Files
replicator/app/services/schema_manager.py
2026-03-08 20:21:15 +09:00

150 lines
6.3 KiB
Python

from typing import List, Dict, Any, Optional
import pandas as pd
from sqlalchemy import inspect, text
from app.core.database import db_connector
from app.core.logging import migration_logger
class SchemaManager:
"""Управление схемой таблиц"""
def get_mssql_columns(self, table_name: str) -> List[Dict[str, Any]]:
"""Получить структуру колонок из MSSQL"""
try:
query = f"""
SELECT
COLUMN_NAME,
DATA_TYPE,
IS_NULLABLE,
CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}'
ORDER BY ORDINAL_POSITION
"""
df = pd.read_sql_query(query, db_connector.src_engine)
return df.to_dict('records')
except Exception as e:
migration_logger.error(f"Ошибка получения колонок из MSSQL для {table_name}: {e}")
return []
def get_postgres_columns(self, table_name: str) -> List[Dict[str, Any]]:
"""Получить структуру колонок из PostgreSQL"""
try:
inspector = inspect(db_connector.dst_engine)
columns = inspector.get_columns(table_name.lower())
return [{'name': c['name'], 'type': str(c['type']), 'nullable': c['nullable']} for c in columns]
except Exception as e:
migration_logger.error(f"Ошибка получения колонок из PG для {table_name}: {e}")
return []
def detect_new_columns(self, table_name: str) -> List[Dict[str, Any]]:
"""Обнаружить новые колонки в MSSQL, которых нет в PostgreSQL"""
mssql_cols = self.get_mssql_columns(table_name)
pg_cols = self.get_postgres_columns(table_name)
pg_col_names = {c['name'] for c in pg_cols}
new_columns = []
for col in mssql_cols:
if col['COLUMN_NAME'] not in pg_col_names:
new_columns.append({
'name': col['COLUMN_NAME'],
'type': self._mssql_to_postgres_type(col['DATA_TYPE'], col['CHARACTER_MAXIMUM_LENGTH']),
'nullable': True # Всегда создаем как NULL для новых колонок
})
if new_columns:
migration_logger.info(f"🔍 Обнаружено {len(new_columns)} новых колонок в {table_name}: {[c['name'] for c in new_columns]}")
return new_columns
def _mssql_to_postgres_type(self, mssql_type: str, max_length: Optional[int]) -> str:
"""Конвертация типа MSSQL в PostgreSQL"""
mssql_type = mssql_type.lower()
type_map = {
'int': 'INTEGER',
'bigint': 'BIGINT',
'smallint': 'SMALLINT',
'tinyint': 'SMALLINT',
'bit': 'BOOLEAN',
'float': 'DOUBLE PRECISION',
'real': 'REAL',
'decimal': 'NUMERIC',
'numeric': 'NUMERIC',
'datetime': 'TIMESTAMP',
'datetime2': 'TIMESTAMP',
'date': 'DATE',
'time': 'TIME',
'char': f'CHAR({max_length})' if max_length else 'CHAR(1)',
'nchar': f'CHAR({max_length})' if max_length else 'CHAR(1)',
'varchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT',
'nvarchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT',
'text': 'TEXT',
'ntext': 'TEXT',
'uniqueidentifier': 'UUID',
}
return type_map.get(mssql_type, 'TEXT')
def add_new_columns(self, table_name: str, new_columns: List[Dict[str, Any]]):
"""Добавить новые колонки в PostgreSQL"""
if not new_columns:
return
pg_table = table_name.lower()
for col in new_columns:
try:
alter_sql = f'ALTER TABLE "{pg_table}" ADD COLUMN "{col["name"]}" {col["type"]} NULL'
with db_connector.dst_connection() as conn:
conn.execute(text(alter_sql))
conn.commit()
migration_logger.info(f"Добавлена новая колонка {col['name']} ({col['type']}) в {pg_table}")
except Exception as e:
migration_logger.error(f"Ошибка добавления колонки {col['name']} в {table_name}: {e}")
def check_foreign_keys_exist(self, table_name: str) -> List[Dict[str, Any]]:
"""Проверить, какие внешние ключи уже существуют в PostgreSQL"""
try:
query = f"""
SELECT
tc.constraint_name,
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = '{table_name.lower()}'
"""
df = pd.read_sql_query(query, db_connector.dst_engine)
existing_fks = []
for _, row in df.iterrows():
existing_fks.append({
'name': row['constraint_name'],
'column': row['column_name'],
'references_table': row['foreign_table_name'],
'references_column': row['foreign_column_name']
})
return existing_fks
except Exception as e:
migration_logger.error(f"Ошибка проверки FK в PostgreSQL для {table_name}: {e}")
return []
schema_manager = SchemaManager()