150 lines
6.3 KiB
Python
150 lines
6.3 KiB
Python
from typing import List, Dict, Any, Optional
|
|
import pandas as pd
|
|
from sqlalchemy import inspect, text
|
|
from app.core.database import db_connector
|
|
from app.core.logging import migration_logger
|
|
|
|
|
|
class SchemaManager:
|
|
"""Управление схемой таблиц"""
|
|
|
|
def get_mssql_columns(self, table_name: str) -> List[Dict[str, Any]]:
|
|
"""Получить структуру колонок из MSSQL"""
|
|
try:
|
|
query = f"""
|
|
SELECT
|
|
COLUMN_NAME,
|
|
DATA_TYPE,
|
|
IS_NULLABLE,
|
|
CHARACTER_MAXIMUM_LENGTH
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_NAME = '{table_name}'
|
|
ORDER BY ORDINAL_POSITION
|
|
"""
|
|
|
|
df = pd.read_sql_query(query, db_connector.src_engine)
|
|
return df.to_dict('records')
|
|
except Exception as e:
|
|
migration_logger.error(f"Ошибка получения колонок из MSSQL для {table_name}: {e}")
|
|
return []
|
|
|
|
def get_postgres_columns(self, table_name: str) -> List[Dict[str, Any]]:
|
|
"""Получить структуру колонок из PostgreSQL"""
|
|
try:
|
|
inspector = inspect(db_connector.dst_engine)
|
|
columns = inspector.get_columns(table_name.lower())
|
|
return [{'name': c['name'], 'type': str(c['type']), 'nullable': c['nullable']} for c in columns]
|
|
except Exception as e:
|
|
migration_logger.error(f"Ошибка получения колонок из PG для {table_name}: {e}")
|
|
return []
|
|
|
|
def detect_new_columns(self, table_name: str) -> List[Dict[str, Any]]:
|
|
"""Обнаружить новые колонки в MSSQL, которых нет в PostgreSQL"""
|
|
mssql_cols = self.get_mssql_columns(table_name)
|
|
pg_cols = self.get_postgres_columns(table_name)
|
|
|
|
pg_col_names = {c['name'] for c in pg_cols}
|
|
|
|
new_columns = []
|
|
for col in mssql_cols:
|
|
if col['COLUMN_NAME'] not in pg_col_names:
|
|
new_columns.append({
|
|
'name': col['COLUMN_NAME'],
|
|
'type': self._mssql_to_postgres_type(col['DATA_TYPE'], col['CHARACTER_MAXIMUM_LENGTH']),
|
|
'nullable': True # Всегда создаем как NULL для новых колонок
|
|
})
|
|
|
|
if new_columns:
|
|
migration_logger.info(f"🔍 Обнаружено {len(new_columns)} новых колонок в {table_name}: {[c['name'] for c in new_columns]}")
|
|
|
|
return new_columns
|
|
|
|
def _mssql_to_postgres_type(self, mssql_type: str, max_length: Optional[int]) -> str:
|
|
"""Конвертация типа MSSQL в PostgreSQL"""
|
|
mssql_type = mssql_type.lower()
|
|
|
|
type_map = {
|
|
'int': 'INTEGER',
|
|
'bigint': 'BIGINT',
|
|
'smallint': 'SMALLINT',
|
|
'tinyint': 'SMALLINT',
|
|
'bit': 'BOOLEAN',
|
|
'float': 'DOUBLE PRECISION',
|
|
'real': 'REAL',
|
|
'decimal': 'NUMERIC',
|
|
'numeric': 'NUMERIC',
|
|
'datetime': 'TIMESTAMP',
|
|
'datetime2': 'TIMESTAMP',
|
|
'date': 'DATE',
|
|
'time': 'TIME',
|
|
'char': f'CHAR({max_length})' if max_length else 'CHAR(1)',
|
|
'nchar': f'CHAR({max_length})' if max_length else 'CHAR(1)',
|
|
'varchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT',
|
|
'nvarchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT',
|
|
'text': 'TEXT',
|
|
'ntext': 'TEXT',
|
|
'uniqueidentifier': 'UUID',
|
|
}
|
|
|
|
return type_map.get(mssql_type, 'TEXT')
|
|
|
|
def add_new_columns(self, table_name: str, new_columns: List[Dict[str, Any]]):
|
|
"""Добавить новые колонки в PostgreSQL"""
|
|
if not new_columns:
|
|
return
|
|
|
|
pg_table = table_name.lower()
|
|
|
|
for col in new_columns:
|
|
try:
|
|
alter_sql = f'ALTER TABLE "{pg_table}" ADD COLUMN "{col["name"]}" {col["type"]} NULL'
|
|
|
|
with db_connector.dst_connection() as conn:
|
|
conn.execute(text(alter_sql))
|
|
conn.commit()
|
|
|
|
migration_logger.info(f"Добавлена новая колонка {col['name']} ({col['type']}) в {pg_table}")
|
|
|
|
except Exception as e:
|
|
migration_logger.error(f"Ошибка добавления колонки {col['name']} в {table_name}: {e}")
|
|
|
|
def check_foreign_keys_exist(self, table_name: str) -> List[Dict[str, Any]]:
|
|
"""Проверить, какие внешние ключи уже существуют в PostgreSQL"""
|
|
try:
|
|
query = f"""
|
|
SELECT
|
|
tc.constraint_name,
|
|
kcu.column_name,
|
|
ccu.table_name AS foreign_table_name,
|
|
ccu.column_name AS foreign_column_name
|
|
FROM
|
|
information_schema.table_constraints AS tc
|
|
JOIN information_schema.key_column_usage AS kcu
|
|
ON tc.constraint_name = kcu.constraint_name
|
|
AND tc.table_schema = kcu.table_schema
|
|
JOIN information_schema.constraint_column_usage AS ccu
|
|
ON ccu.constraint_name = tc.constraint_name
|
|
AND ccu.table_schema = tc.table_schema
|
|
WHERE tc.constraint_type = 'FOREIGN KEY'
|
|
AND tc.table_name = '{table_name.lower()}'
|
|
"""
|
|
|
|
df = pd.read_sql_query(query, db_connector.dst_engine)
|
|
|
|
existing_fks = []
|
|
for _, row in df.iterrows():
|
|
existing_fks.append({
|
|
'name': row['constraint_name'],
|
|
'column': row['column_name'],
|
|
'references_table': row['foreign_table_name'],
|
|
'references_column': row['foreign_column_name']
|
|
})
|
|
|
|
return existing_fks
|
|
|
|
except Exception as e:
|
|
migration_logger.error(f"Ошибка проверки FK в PostgreSQL для {table_name}: {e}")
|
|
return []
|
|
|
|
|
|
schema_manager = SchemaManager() |