from typing import List, Dict, Any, Optional import pandas as pd from sqlalchemy import inspect, text from app.core.database import db_connector from app.core.logging import migration_logger class SchemaManager: """Управление схемой таблиц""" def get_mssql_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получить структуру колонок из MSSQL""" try: query = f""" SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' ORDER BY ORDINAL_POSITION """ df = pd.read_sql_query(query, db_connector.src_engine) return df.to_dict('records') except Exception as e: migration_logger.error(f"Ошибка получения колонок из MSSQL для {table_name}: {e}") return [] def get_postgres_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получить структуру колонок из PostgreSQL""" try: inspector = inspect(db_connector.dst_engine) columns = inspector.get_columns(table_name.lower()) return [{'name': c['name'], 'type': str(c['type']), 'nullable': c['nullable']} for c in columns] except Exception as e: migration_logger.error(f"Ошибка получения колонок из PG для {table_name}: {e}") return [] def detect_new_columns(self, table_name: str) -> List[Dict[str, Any]]: """Обнаружить новые колонки в MSSQL, которых нет в PostgreSQL""" mssql_cols = self.get_mssql_columns(table_name) pg_cols = self.get_postgres_columns(table_name) pg_col_names = {c['name'] for c in pg_cols} new_columns = [] for col in mssql_cols: if col['COLUMN_NAME'] not in pg_col_names: new_columns.append({ 'name': col['COLUMN_NAME'], 'type': self._mssql_to_postgres_type(col['DATA_TYPE'], col['CHARACTER_MAXIMUM_LENGTH']), 'nullable': True # Всегда создаем как NULL для новых колонок }) if new_columns: migration_logger.info(f"🔍 Обнаружено {len(new_columns)} новых колонок в {table_name}: {[c['name'] for c in new_columns]}") return new_columns def _mssql_to_postgres_type(self, mssql_type: str, max_length: Optional[int]) -> str: """Конвертация типа MSSQL в PostgreSQL""" mssql_type = mssql_type.lower() type_map = { 'int': 'INTEGER', 'bigint': 'BIGINT', 'smallint': 'SMALLINT', 'tinyint': 'SMALLINT', 'bit': 'BOOLEAN', 'float': 'DOUBLE PRECISION', 'real': 'REAL', 'decimal': 'NUMERIC', 'numeric': 'NUMERIC', 'datetime': 'TIMESTAMP', 'datetime2': 'TIMESTAMP', 'date': 'DATE', 'time': 'TIME', 'char': f'CHAR({max_length})' if max_length else 'CHAR(1)', 'nchar': f'CHAR({max_length})' if max_length else 'CHAR(1)', 'varchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', 'nvarchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', 'text': 'TEXT', 'ntext': 'TEXT', 'uniqueidentifier': 'UUID', } return type_map.get(mssql_type, 'TEXT') def add_new_columns(self, table_name: str, new_columns: List[Dict[str, Any]]): """Добавить новые колонки в PostgreSQL""" if not new_columns: return pg_table = table_name.lower() for col in new_columns: try: alter_sql = f'ALTER TABLE "{pg_table}" ADD COLUMN "{col["name"]}" {col["type"]} NULL' with db_connector.dst_connection() as conn: conn.execute(text(alter_sql)) conn.commit() migration_logger.info(f"Добавлена новая колонка {col['name']} ({col['type']}) в {pg_table}") except Exception as e: migration_logger.error(f"Ошибка добавления колонки {col['name']} в {table_name}: {e}") def check_foreign_keys_exist(self, table_name: str) -> List[Dict[str, Any]]: """Проверить, какие внешние ключи уже существуют в PostgreSQL""" try: query = f""" SELECT tc.constraint_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = '{table_name.lower()}' """ df = pd.read_sql_query(query, db_connector.dst_engine) existing_fks = [] for _, row in df.iterrows(): existing_fks.append({ 'name': row['constraint_name'], 'column': row['column_name'], 'references_table': row['foreign_table_name'], 'references_column': row['foreign_column_name'] }) return existing_fks except Exception as e: migration_logger.error(f"Ошибка проверки FK в PostgreSQL для {table_name}: {e}") return [] schema_manager = SchemaManager()