from typing import Optional, List, Dict, Any

import pandas as pd

from app.core.database import db_connector
from app.core.logging import migration_logger


def _tsql_literal(value: str) -> str:
    """Escape *value* for embedding inside a single-quoted T-SQL literal."""
    return value.replace("'", "''")


def _pg_identifier(name: str) -> str:
    """Return *name* wrapped as a double-quoted PostgreSQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def get_primary_key(table_name: str) -> Optional[str]:
    """Return the name of a column to use as the row key of a source table.

    Resolution order:

    1. The leading column of the table's PRIMARY KEY, read from the
       ``sys.indexes`` / ``sys.index_columns`` catalog views.
    2. The first column (in ordinal order) whose name contains one of the
       case-sensitive substrings ``ID``, ``Id``, ``id``, ``Code`` or ``KEY``.
       This is a plain substring heuristic, so a name such as ``Width``
       also matches.
    3. The first column of the table.

    Args:
        table_name: Table name as accepted by ``OBJECT_ID``.

    Returns:
        The chosen column name, or ``None`` when the table has no columns
        or when any error occurs (errors are logged, never raised).
    """
    try:
        source_table = _tsql_literal(table_name)

        # ORDER BY key_ordinal makes TOP 1 deterministic: for a composite
        # primary key the leading key column is always the one returned.
        query = f"""
            SELECT TOP 1 c.name as column_name
            FROM sys.indexes i
            INNER JOIN sys.index_columns ic
                ON i.object_id = ic.object_id AND i.index_id = ic.index_id
            INNER JOIN sys.columns c
                ON ic.object_id = c.object_id AND ic.column_id = c.column_id
            WHERE i.object_id = OBJECT_ID('{source_table}')
              AND i.is_primary_key = 1
            ORDER BY ic.key_ordinal
        """
        pk_df = pd.read_sql_query(query, db_connector.src_engine)
        if not pk_df.empty:
            pk_column = pk_df.iloc[0]['column_name']
            migration_logger.info(f"PRIMARY KEY для {table_name}: {pk_column}")
            return pk_column

        migration_logger.warning(f"В {table_name} нет PRIMARY KEY, ищем ID колонку...")

        # Without an explicit ORDER BY the server may return columns in any
        # order, which would make both the keyword scan and the
        # "first column" fallback below non-deterministic.
        columns_query = f"""
            SELECT COLUMN_NAME
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = '{source_table}'
            ORDER BY TABLE_SCHEMA, ORDINAL_POSITION
        """
        columns_df = pd.read_sql_query(columns_query, db_connector.src_engine)

        id_keywords = ('ID', 'Id', 'id', 'Code', 'KEY')
        for col in columns_df['COLUMN_NAME']:
            if any(keyword in col for keyword in id_keywords):
                migration_logger.info(f"ID колонка для {table_name}: {col}")
                return col

        if columns_df.empty:
            return None

        first_col = columns_df.iloc[0]['COLUMN_NAME']
        migration_logger.warning(f"Используем первую колонку для {table_name}: {first_col}")
        return first_col
    except Exception as e:
        # Best-effort lookup: callers receive None instead of an exception.
        migration_logger.error(f"Ошибка поиска PK для {table_name}: {e}")
        return None


def get_foreign_keys(table_name: str) -> List[Dict[str, str]]:
    """Return the foreign keys declared on a source table.

    Referenced tables are not checked for existence or migration state.

    Args:
        table_name: Table name as accepted by ``OBJECT_ID``.

    Returns:
        One dict per constraint with the keys ``name``, ``parent_column``,
        ``referenced_table`` and ``referenced_column``. For a multi-column
        constraint only the first column pair (lowest
        ``constraint_column_id``) is reported. An empty list is returned
        when there are no foreign keys or when an error occurs (errors are
        logged, never raised).
    """
    try:
        source_table = _tsql_literal(table_name)
        query = f"""
            SELECT
                fk.name as fk_name,
                pc.name as parent_column,
                rc.name as referenced_column,
                rt.name as referenced_table
            FROM sys.foreign_keys fk
            INNER JOIN sys.foreign_key_columns fkc
                ON fk.object_id = fkc.constraint_object_id
            INNER JOIN sys.columns pc
                ON fkc.parent_object_id = pc.object_id
               AND fkc.parent_column_id = pc.column_id
            INNER JOIN sys.columns rc
                ON fkc.referenced_object_id = rc.object_id
               AND fkc.referenced_column_id = rc.column_id
            INNER JOIN sys.tables rt
                ON fkc.referenced_object_id = rt.object_id
            WHERE fk.parent_object_id = OBJECT_ID('{source_table}')
            ORDER BY fk.name, fkc.constraint_column_id
        """
        fk_df = pd.read_sql_query(query, db_connector.src_engine)
        if fk_df.empty:
            return []

        # Rows arrive ordered by constraint column, so the first row seen
        # for each constraint name is its leading column pair; later rows
        # of the same constraint are ignored.
        foreign_keys: Dict[str, Dict[str, str]] = {}
        for _, row in fk_df.iterrows():
            fk_name = row['fk_name']
            if fk_name in foreign_keys:
                continue
            foreign_keys[fk_name] = {
                'name': fk_name,
                'parent_column': row['parent_column'],
                'referenced_table': row['referenced_table'],
                'referenced_column': row['referenced_column'],
            }

        result = list(foreign_keys.values())
        migration_logger.info(f"Найдено {len(result)} внешних ключей для {table_name}")
        return result
    except Exception as e:
        migration_logger.error(f"Ошибка получения внешних ключей для {table_name}: {e}")
        return []


def get_indexes(table_name: str) -> List[Dict[str, Any]]:
    """Return the non-primary-key indexes of a source table.

    Args:
        table_name: Table name as accepted by ``OBJECT_ID``.

    Returns:
        One dict per index with the keys ``name`` (str), ``unique`` (bool)
        and ``columns`` (column names ordered by ``key_ordinal``). Columns
        flagged ``is_included_column`` are not listed. An empty list is
        returned when nothing is found or when an error occurs (errors are
        logged, never raised).
    """
    try:
        source_table = _tsql_literal(table_name)

        # sys.index_columns also lists INCLUDE columns, which carry
        # key_ordinal = 0. Left unfiltered they sort ahead of the real key
        # columns and would be reported as if they were part of the index
        # key, so only non-included columns are selected.
        query = f"""
            SELECT
                i.name as index_name,
                i.is_unique,
                c.name as column_name
            FROM sys.indexes i
            INNER JOIN sys.index_columns ic
                ON i.object_id = ic.object_id AND i.index_id = ic.index_id
            INNER JOIN sys.columns c
                ON ic.object_id = c.object_id AND ic.column_id = c.column_id
            WHERE i.object_id = OBJECT_ID('{source_table}')
              AND i.is_primary_key = 0
              AND ic.is_included_column = 0
            ORDER BY i.name, ic.key_ordinal
        """
        idx_df = pd.read_sql_query(query, db_connector.src_engine)
        if idx_df.empty:
            return []

        indexes: Dict[str, Dict[str, Any]] = {}
        for _, row in idx_df.iterrows():
            idx_name = row['index_name']
            entry = indexes.get(idx_name)
            if entry is None:
                entry = {
                    'name': idx_name,
                    'unique': bool(row['is_unique']),
                    'columns': [],
                }
                indexes[idx_name] = entry
            entry['columns'].append(row['column_name'])

        result = list(indexes.values())
        migration_logger.info(f"Найдено {len(result)} индексов для {table_name}")
        return result
    except Exception as e:
        migration_logger.error(f"Ошибка получения индексов для {table_name}: {e}")
        return []


def get_max_id_from_postgres(table_name: str, id_column: str) -> Optional[int]:
    """Return the largest value of ``id_column`` in the destination table.

    The table name is lower-cased before it is quoted; the column name is
    quoted exactly as given.

    Args:
        table_name: Table name; its lower-cased form is queried.
        id_column: Column whose maximum is requested.

    Returns:
        The maximum as an ``int``, or ``0`` when the query yields no value
        (for example an empty table). ``0`` is also returned when the query
        fails, so the return value alone does not distinguish an empty
        table from an error; failures are logged.
    """
    try:
        query = (
            f'SELECT MAX({_pg_identifier(id_column)}) as max_id '
            f'FROM {_pg_identifier(table_name.lower())}'
        )
        df = pd.read_sql_query(query, db_connector.dst_engine)
        if df.empty:
            return 0

        max_id = df.iloc[0]['max_id']
        # A NULL aggregate can surface as None, NaN or pd.NA depending on
        # the column dtype pandas infers; pd.isna covers all of them,
        # whereas int(NaN) would raise and be logged as a spurious error.
        if pd.isna(max_id):
            return 0
        return int(max_id)
    except Exception as e:
        migration_logger.error(f"Ошибка получения max ID из PG для {table_name}: {e}")
        return 0