Files
replicator/app/utils/index_helpers.py
2026-03-08 20:21:15 +09:00

153 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional, List, Dict, Any
import pandas as pd
from app.core.database import db_connector
from app.core.logging import migration_logger
def get_primary_key(table_name: str) -> Optional[str]:
    """Return the column to use as the key of a source (SQL Server) table.

    Resolution order:

    1. The leading column of the table's PRIMARY KEY index.
    2. The first column whose name contains one of the substrings
       ``ID``, ``Id``, ``id``, ``Code`` or ``KEY`` (case-sensitive).
    3. The first column of the table.

    Both lookups are ordered explicitly (``key_ordinal`` and
    ``ORDINAL_POSITION``) because SQL Server gives no row-order guarantee
    without ``ORDER BY``. For a composite primary key only the leading key
    column is returned.

    Args:
        table_name: Name of the table in the source database. It is
            interpolated directly into the SQL text, so it must come from a
            trusted source, never from user input.

    Returns:
        The chosen column name, or ``None`` when no columns are found for
        the table or any error occurs (the error is logged, not raised).
    """
    try:
        pk_query = f"""
            SELECT TOP 1
                c.name as column_name
            FROM sys.indexes i
            INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
            INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
            WHERE i.object_id = OBJECT_ID('{table_name}')
                AND i.is_primary_key = 1
            ORDER BY ic.key_ordinal
        """
        pk_df = pd.read_sql_query(pk_query, db_connector.src_engine)
        if not pk_df.empty:
            pk_column = pk_df.iloc[0]['column_name']
            migration_logger.info(f"PRIMARY KEY для {table_name}: {pk_column}")
            return pk_column

        migration_logger.warning(f"В {table_name} нет PRIMARY KEY, ищем ID колонку...")
        # Ordered by declaration position so that "first matching column"
        # and "first column" mean the same thing on every run.
        columns_query = f"""
            SELECT COLUMN_NAME
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = '{table_name}'
            ORDER BY TABLE_SCHEMA, ORDINAL_POSITION
        """
        columns_df = pd.read_sql_query(columns_query, db_connector.src_engine)
        column_names = list(columns_df['COLUMN_NAME'])

        # Heuristic: a plain case-sensitive substring match on the column name.
        id_keywords = ['ID', 'Id', 'id', 'Code', 'KEY']
        for col in column_names:
            if any(keyword in col for keyword in id_keywords):
                migration_logger.info(f"ID колонка для {table_name}: {col}")
                return col

        # Last resort: fall back to the first declared column.
        if column_names:
            first_col = column_names[0]
            migration_logger.warning(f"Используем первую колонку для {table_name}: {first_col}")
            return first_col
        return None
    except Exception as e:
        migration_logger.error(f"Ошибка поиска PK для {table_name}: {e}")
        return None
def get_foreign_keys(table_name: str) -> List[Dict[str, str]]:
    """Return the foreign keys declared on a source table.

    The referenced tables are not checked for existence. Each constraint
    yields one dict with the keys ``name``, ``parent_column``,
    ``referenced_table`` and ``referenced_column``. For a multi-column
    constraint only the first column pair (lowest ``constraint_column_id``)
    is reported.

    Args:
        table_name: Name of the table in the source database; it is
            interpolated directly into the SQL text.

    Returns:
        One dict per constraint, in constraint-name order. An empty list is
        returned when there are no foreign keys or when any error occurs
        (the error is logged, not raised).
    """
    try:
        fk_sql = f"""
            SELECT
                fk.name as fk_name,
                pc.name as parent_column,
                rc.name as referenced_column,
                rt.name as referenced_table
            FROM sys.foreign_keys fk
            INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
            INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id
            INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id
            INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id
            WHERE fk.parent_object_id = OBJECT_ID('{table_name}')
            ORDER BY fk.name, fkc.constraint_column_id
        """
        fk_rows = pd.read_sql_query(fk_sql, db_connector.src_engine)
        if fk_rows.empty:
            return []

        # setdefault keeps the first row seen for each constraint name, so
        # later columns of a composite key are ignored.
        by_constraint = {}
        for _, rec in fk_rows.iterrows():
            constraint = rec['fk_name']
            by_constraint.setdefault(
                constraint,
                {
                    'name': constraint,
                    'parent_column': rec['parent_column'],
                    'referenced_table': rec['referenced_table'],
                    'referenced_column': rec['referenced_column'],
                },
            )

        found = list(by_constraint.values())
        migration_logger.info(f"Найдено {len(found)} внешних ключей для {table_name}")
        return found
    except Exception as exc:
        migration_logger.error(f"Ошибка получения внешних ключей для {table_name}: {exc}")
        return []
def get_indexes(table_name: str) -> List[Dict[str, Any]]:
    """Return the non-primary-key indexes of a source (SQL Server) table.

    Only key columns are reported. INCLUDE (non-key) columns are filtered
    out with ``is_included_column = 0``: they have ``key_ordinal = 0``, so
    they would otherwise sort ahead of the real key columns and be treated
    as part of the index key. As a consequence, indexes that have no key
    columns at all are not returned.

    Args:
        table_name: Name of the table in the source database; it is
            interpolated directly into the SQL text.

    Returns:
        One dict per index, in index-name order, with the keys ``name``
        (str), ``unique`` (bool) and ``columns`` (key column names in key
        order). An empty list is returned when there are no such indexes
        or when any error occurs (the error is logged, not raised).
    """
    try:
        query = f"""
            SELECT
                i.name as index_name,
                i.is_unique,
                c.name as column_name
            FROM sys.indexes i
            INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
            INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
            WHERE i.object_id = OBJECT_ID('{table_name}')
                AND i.is_primary_key = 0
                AND ic.is_included_column = 0
            ORDER BY i.name, ic.key_ordinal
        """
        idx_df = pd.read_sql_query(query, db_connector.src_engine)
        if idx_df.empty:
            return []

        # Rows arrive ordered by (index name, key position); group them
        # into one entry per index while preserving that column order.
        indexes: Dict[str, Dict[str, Any]] = {}
        for _, row in idx_df.iterrows():
            idx_name = row['index_name']
            entry = indexes.setdefault(
                idx_name,
                {
                    'name': idx_name,
                    'unique': bool(row['is_unique']),
                    'columns': [],
                },
            )
            entry['columns'].append(row['column_name'])

        result = list(indexes.values())
        migration_logger.info(f"Найдено {len(result)} индексов для {table_name}")
        return result
    except Exception as e:
        migration_logger.error(f"Ошибка получения индексов для {table_name}: {e}")
        return []
def get_max_id_from_postgres(table_name: str, id_column: str) -> Optional[int]:
    """Return the largest value of ``id_column`` in the destination table.

    The table name is lower-cased before quoting; the column name is quoted
    as given. Both are interpolated directly into the SQL text.

    Args:
        table_name: Table name; queried as ``table_name.lower()``.
        id_column: Column whose maximum is requested.

    Returns:
        The maximum converted with ``int()``. ``0`` is returned when the
        query yields no row, when the maximum is NULL (e.g. empty table),
        or when any error occurs (the error is logged, not raised). Despite
        the ``Optional`` annotation, ``None`` is never returned.
    """
    try:
        query = f'SELECT MAX("{id_column}") as max_id FROM "{table_name.lower()}"'
        df = pd.read_sql_query(query, db_connector.dst_engine)
        if df.empty:
            return 0

        max_id = df.iloc[0]['max_id']
        # SQL NULL may surface as None, NaN or pd.NA depending on the column
        # dtype; pd.isna covers all of them, so an empty table does not
        # reach int(nan) and log a spurious error.
        if pd.isna(max_id):
            return 0
        return int(max_id)
    except Exception as e:
        migration_logger.error(f"Ошибка получения max ID из PG для {table_name}: {e}")
        return 0