# Listing metadata: 153 lines, 5.9 KiB, Python
from typing import Optional, List, Dict, Any
|
||
import pandas as pd
|
||
from app.core.database import db_connector
|
||
from app.core.logging import migration_logger
|
||
|
||
|
||
def get_primary_key(table_name: str) -> Optional[str]:
    """Return the name of the column to use as the key of ``table_name``.

    Lookup order, all against ``db_connector.src_engine``:

    1. The leading column (lowest ``key_ordinal``) of the table's
       PRIMARY KEY index.
    2. Otherwise, the first column whose name contains one of the
       substrings ``ID``, ``Id``, ``id``, ``Code`` or ``KEY``.
    3. Otherwise, the first column of the table.

    Args:
        table_name: Table name; it is embedded in ``OBJECT_ID('...')`` and
            in an ``INFORMATION_SCHEMA.COLUMNS`` filter.

    Returns:
        The chosen column name, or ``None`` when no columns are found or
        when any exception is raised (the exception is logged, not
        propagated).
    """
    try:
        # The name ends up inside T-SQL string literals, so double any
        # single quote to keep the statement well-formed.
        safe_name = table_name.replace("'", "''")

        # ORDER BY key_ordinal makes TOP 1 deterministic: for a composite
        # primary key the leading key column is always the one returned.
        query = f"""
        SELECT TOP 1
            c.name as column_name
        FROM sys.indexes i
        INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
        INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
        WHERE i.object_id = OBJECT_ID('{safe_name}')
        AND i.is_primary_key = 1
        ORDER BY ic.key_ordinal
        """

        pk_df = pd.read_sql_query(query, db_connector.src_engine)

        if not pk_df.empty:
            pk_column = pk_df.iloc[0]['column_name']
            migration_logger.info(f"PRIMARY KEY для {table_name}: {pk_column}")
            return pk_column

        migration_logger.warning(f"В {table_name} нет PRIMARY KEY, ищем ID колонку...")

        # Explicit ordering so that "first matching column" and "first
        # column" below follow the table definition order.
        # NOTE(review): TABLE_NAME holds the unqualified table name, so a
        # schema-qualified ``table_name`` will not match here - confirm how
        # callers pass names.
        columns_query = f"""
        SELECT COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = '{safe_name}'
        ORDER BY TABLE_SCHEMA, ORDINAL_POSITION
        """

        columns_df = pd.read_sql_query(columns_query, db_connector.src_engine)

        # Heuristic: case-sensitive substring match, so e.g. "Width" also
        # matches through "id".
        id_keywords = ('ID', 'Id', 'id', 'Code', 'KEY')
        for col in columns_df['COLUMN_NAME']:
            if any(keyword in col for keyword in id_keywords):
                migration_logger.info(f"ID колонка для {table_name}: {col}")
                return col

        if not columns_df.empty:
            first_col = columns_df.iloc[0]['COLUMN_NAME']
            migration_logger.warning(f"Используем первую колонку для {table_name}: {first_col}")
            return first_col

        return None

    except Exception as e:
        # Best-effort lookup: report the failure and let the caller handle None.
        migration_logger.error(f"Ошибка поиска PK для {table_name}: {e}")
        return None
|
||
|
||
|
||
def get_foreign_keys(table_name: str) -> List[Dict[str, str]]:
    """Return the foreign keys declared on ``table_name``.

    The query runs against ``db_connector.src_engine``. It does not check
    whether the referenced tables exist on the destination side.

    Args:
        table_name: Table name; it is embedded in ``OBJECT_ID('...')``.

    Returns:
        One dict per foreign-key constraint, ordered by constraint name,
        with the keys ``name``, ``parent_column``, ``referenced_table`` and
        ``referenced_column``. For a multi-column foreign key only the
        first column pair (lowest ``constraint_column_id``) is reported.
        An empty list is returned when there are no foreign keys or when
        any exception is raised (the exception is logged, not propagated).
    """
    try:
        # The name ends up inside a T-SQL string literal, so double any
        # single quote to keep the statement well-formed.
        safe_name = table_name.replace("'", "''")

        query = f"""
        SELECT
            fk.name as fk_name,
            pc.name as parent_column,
            rc.name as referenced_column,
            rt.name as referenced_table
        FROM sys.foreign_keys fk
        INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
        INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id
        INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id
        INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id
        WHERE fk.parent_object_id = OBJECT_ID('{safe_name}')
        ORDER BY fk.name, fkc.constraint_column_id
        """

        fk_df = pd.read_sql_query(query, db_connector.src_engine)

        if fk_df.empty:
            return []

        # Rows arrive ordered by constraint and column position; setdefault
        # keeps only the first row seen for each constraint name.
        foreign_keys: Dict[str, Dict[str, str]] = {}
        for _, row in fk_df.iterrows():
            foreign_keys.setdefault(
                row['fk_name'],
                {
                    'name': row['fk_name'],
                    'parent_column': row['parent_column'],
                    'referenced_table': row['referenced_table'],
                    'referenced_column': row['referenced_column'],
                },
            )

        result = list(foreign_keys.values())
        migration_logger.info(f"Найдено {len(result)} внешних ключей для {table_name}")
        return result

    except Exception as e:
        # Best-effort lookup: report the failure and return "no foreign keys".
        migration_logger.error(f"Ошибка получения внешних ключей для {table_name}: {e}")
        return []
|
||
|
||
|
||
def get_indexes(table_name: str) -> List[Dict[str, Any]]:
    """Return the non-primary-key indexes of ``table_name``.

    The query runs against ``db_connector.src_engine``.

    Args:
        table_name: Table name; it is embedded in ``OBJECT_ID('...')``.

    Returns:
        One dict per index, ordered by index name, with the keys ``name``
        (index name), ``unique`` (bool) and ``columns`` (key column names
        in key order). An empty list is returned when there are no such
        indexes or when any exception is raised (the exception is logged,
        not propagated).
    """
    try:
        # The name ends up inside a T-SQL string literal, so double any
        # single quote to keep the statement well-formed.
        safe_name = table_name.replace("'", "''")

        # is_included_column = 0 keeps only real key columns. INCLUDE
        # columns have key_ordinal 0, so without the filter they would sort
        # ahead of the key columns and be reported as part of the key.
        query = f"""
        SELECT
            i.name as index_name,
            i.is_unique,
            c.name as column_name
        FROM sys.indexes i
        INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
        INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
        WHERE i.object_id = OBJECT_ID('{safe_name}')
        AND i.is_primary_key = 0
        AND ic.is_included_column = 0
        ORDER BY i.name, ic.key_ordinal
        """

        idx_df = pd.read_sql_query(query, db_connector.src_engine)

        if idx_df.empty:
            return []

        # Group the per-column rows into one entry per index; row order
        # (index name, then key ordinal) determines the column order.
        indexes: Dict[str, Dict[str, Any]] = {}
        for _, row in idx_df.iterrows():
            idx_name = row['index_name']
            entry = indexes.setdefault(
                idx_name,
                {
                    'name': idx_name,
                    'unique': bool(row['is_unique']),
                    'columns': [],
                },
            )
            entry['columns'].append(row['column_name'])

        result = list(indexes.values())
        migration_logger.info(f"Найдено {len(result)} индексов для {table_name}")
        return result

    except Exception as e:
        # Best-effort lookup: report the failure and return "no indexes".
        migration_logger.error(f"Ошибка получения индексов для {table_name}: {e}")
        return []
|
||
|
||
|
||
def get_max_id_from_postgres(table_name: str, id_column: str) -> Optional[int]:
    """Return the largest value of ``id_column`` in the destination table.

    The query runs against ``db_connector.dst_engine``. The table name is
    lower-cased before use; the column name is used as given.

    Args:
        table_name: Destination table name.
        id_column: Name of the column whose maximum is requested.

    Returns:
        ``MAX(id_column)`` converted to ``int``. ``0`` is returned when the
        result is NULL or when any exception is raised (the exception is
        logged, not propagated).
    """
    try:
        # Identifiers are wrapped in double quotes, so an embedded double
        # quote must be doubled to stay inside the identifier.
        quoted_table = table_name.lower().replace('"', '""')
        quoted_column = id_column.replace('"', '""')

        query = f'SELECT MAX("{quoted_column}") as max_id FROM "{quoted_table}"'
        df = pd.read_sql_query(query, db_connector.dst_engine)

        if df.empty:
            return 0

        max_id = df.iloc[0]['max_id']
        # A NULL aggregate can surface as None or as NaN depending on the
        # column dtype pandas infers; pd.isna covers both, so the NULL case
        # never reaches int() and is not misreported as an error.
        if pd.isna(max_id):
            return 0

        return int(max_id)

    except Exception as e:
        # Best-effort lookup: report the failure and fall back to 0.
        migration_logger.error(f"Ошибка получения max ID из PG для {table_name}: {e}")
        return 0