Files
replicator/app/services/migrator.py
2026-03-08 20:21:15 +09:00

607 lines
27 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import Optional, List, Dict, Any
import traceback
from datetime import datetime
import pandas as pd
from app.services.replication_state import replication_state
from app.services.data_reader import data_reader
from app.services.data_writer import data_writer
from app.services.schema_manager import schema_manager
from app.utils.index_helpers import get_primary_key, get_max_id_from_postgres, get_foreign_keys, get_indexes
from app.utils.email_sender import email_sender
from app.core.logging import migration_logger
from app.core.config import settings
from app.core.database import db_connector
class DatabaseMigrator:
"""Мигратор данных по ID и по Life-таблицам"""
def __init__(self):
self.state = replication_state
self.reader = data_reader
self.writer = data_writer
self.schema = schema_manager
self.is_running = False
self.current_table = None
self.start_time = None
self.all_foreign_keys = {}
self.errors = []
# Таблицы, которые используют Life-механизм
self.life_tables = getattr(settings, 'LIFE_TABLES', [])
# Карта соответствия: основная таблица -> Life-таблица
self.life_mapping = {}
def _parse_table_name(self, table_name: str) -> Dict[str, str]:
"""
Парсит имя таблицы и возвращает компоненты.
Примеры:
"oms_kl_VisitResult" -> {
'schema': 'oms',
'basename': 'kl_VisitResult',
'full_name': 'oms_kl_VisitResult'
}
"stt_MedicalHistory" -> {
'schema': 'stt',
'basename': 'MedicalHistory',
'full_name': 'stt_MedicalHistory'
}
"""
# Ищем префикс (oms_, stt_, и т.д.)
match = re.match(r'^([A-Za-z]+)_(.*)$', table_name)
if match:
schema = match.group(1)
basename = match.group(2)
return {
'schema': schema,
'basename': basename,
'full_name': table_name
}
else:
# Если нет префикса
return {
'schema': '',
'basename': table_name,
'full_name': table_name
}
def _get_life_table_name(self, table_name: str) -> Optional[str]:
"""Получает имя Life-таблицы для основной таблицы"""
parsed = self._parse_table_name(table_name)
if parsed['schema']:
return f"Life_{parsed['schema']}_{parsed['basename']}"
else:
return f"Life_{table_name}"
def _get_life_id_field(self, table_name: str) -> str:
"""Получает имя LifeID поля"""
parsed = self._parse_table_name(table_name)
return f"{parsed['basename']}LifeID"
def _get_base_id_field(self, table_name: str) -> str:
"""Получает имя базового ID поля"""
parsed = self._parse_table_name(table_name)
return f"{parsed['basename']}ID"
def migrate_table_by_time(self, table_name: str, last_sync_time: datetime) -> Dict[str, int]:
"""Миграция таблицы через Life-механизм по времени"""
life_table = self._get_life_table_name(table_name)
base_id_field = self._get_base_id_field(table_name)
life_id_field = self._get_life_id_field(table_name)
migration_logger.info(f"Миграция {table_name} через {life_table} с {last_sync_time}")
stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0}
try:
# Получаем последние версии из Life-таблицы
query = f"""
WITH LatestLife AS (
SELECT
{base_id_field},
MAX({life_id_field}) as MaxLifeID
FROM {life_table}
WHERE x_DateTime > CAST(? AS datetime)
GROUP BY {base_id_field}
)
SELECT dl.*
FROM {life_table} dl
INNER JOIN LatestLife ll
ON dl.{life_id_field} = ll.MaxLifeID
"""
# Читаем данные чанками
chunk_size = getattr(settings, 'CHUNK_SIZE', 1000)
for chunk in self.reader.read_custom_query_chunked(query, params=(last_sync_time,), chunksize=chunk_size):
if chunk.empty:
continue
# Разделяем по операциям
inserts = chunk[chunk['x_Operation'] == 'i']
updates = chunk[chunk['x_Operation'] == 'u']
deletes = chunk[chunk['x_Operation'] == 'd']
# Обрабатываем вставки
if not inserts.empty:
inserts_to_write = self._prepare_data_for_write(inserts, table_name)
if not inserts_to_write.empty:
self.state.log_operation(
table_name=table_name,
operation='INSERT',
records_count=len(inserts)
)
self.writer.upsert_data(table_name, inserts_to_write, base_id_field)
stats['inserted'] += len(inserts)
# Обрабатываем обновления
if not updates.empty:
updates_to_write = self._prepare_data_for_write(updates, table_name)
if not updates_to_write.empty:
self.state.log_operation(
table_name=table_name,
operation='UPDATE',
records_count=len(updates)
)
self.writer.upsert_data(table_name, updates_to_write, base_id_field)
stats['updated'] += len(updates)
# Обрабатываем удаления
if not deletes.empty:
self.state.log_operation(
table_name=table_name,
operation='DELETE',
records_count=len(deletes)
)
delete_ids = deletes[base_id_field].tolist()
self.writer.delete_data(table_name, base_id_field, delete_ids)
stats['deleted'] += len(deletes)
stats['total'] += len(chunk)
migration_logger.info(f" Чанк: +{len(inserts)} вставок, ~{len(updates)} обновлений, -{len(deletes)} удалений")
if stats['total'] > 0:
migration_logger.info(f"{table_name}: +{stats['inserted']} вставок, ~{stats['updated']} обновлений, -{stats['deleted']} удалений")
else:
migration_logger.info(f" {table_name}: изменений нет")
except Exception as e:
error_msg = f"Ошибка при миграции {table_name} через Life: {e}"
migration_logger.error(error_msg)
migration_logger.error(e.args)
self.state.log_operation(
table_name=table_name,
operation='ERROR',
records_count=0,
status='ERROR',
error_message=str(e)[:500]
)
self.errors.append({
'table': table_name,
'error': error_msg,
'traceback': traceback.format_exc(),
'time': datetime.now()
})
raise
return stats
def _prepare_data_for_write(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
"""Подготавливает данные из Life-таблицы для записи в основную таблицу"""
# Исключаем служебные поля
exclude_fields = {'x_Operation', 'x_DateTime', 'x_Seance', 'x_User'}
# Определяем, какие поля оставить
fields_to_keep = []
for col in df.columns:
if col not in exclude_fields and not col.endswith('LifeID'):
fields_to_keep.append(col)
result = df[fields_to_keep].copy()
# Убеждаемся, что нет дубликатов по ID
base_id_field = self._get_base_id_field(table_name)
result = result.drop_duplicates(subset=[base_id_field])
return result
def migrate_table(self, table_name: str, full_reload: bool = False) -> bool:
"""Миграция одной таблицы (поддерживает и ID, и Life)"""
migration_logger.table_start(table_name)
self.current_table = table_name
table_start_time = datetime.now()
try:
# Получаем ID колонку для статистики
id_column = get_primary_key(table_name)
# Определяем, использует ли таблица Life-механизм
uses_life = table_name in self.life_tables
if uses_life and not full_reload:
# МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ
last_sync = self.state.get_table_last_sync(table_name)
if last_sync:
stats = self.migrate_table_by_time(table_name, last_sync)
# Обновляем время синхронизации
self.state.update_table_sync_time(table_name)
# Обновляем статистику
if id_column:
self._update_table_statistics(table_name, id_column)
migration_logger.table_success(table_name, stats['total'])
return True
else:
# Если синхронизации не было - делаем полную загрузку
migration_logger.info(f"Первая синхронизация {table_name} - полная загрузка")
full_reload = True
if full_reload:
# ПОЛНАЯ ПЕРЕЗАГРУЗКА (по ID)
result = self._full_reload_by_id(table_name)
# Обновляем статистику после полной загрузки
if result and id_column:
self._update_table_statistics(table_name, id_column)
return result
else:
# ИНКРЕМЕНТАЛЬНАЯ ПО ID (для таблиц без Life)
result = self._incremental_by_id(table_name)
# Обновляем статистику после инкрементальной загрузки
if result and id_column:
self._update_table_statistics(table_name, id_column)
return result
except Exception as e:
error_msg = f"Критическая ошибка при обработке {table_name}: {e}"
migration_logger.error(error_msg)
self.errors.append({
'table': table_name,
'error': error_msg,
'traceback': traceback.format_exc(),
'time': datetime.now()
})
return False
finally:
self.current_table = None
def _full_reload_by_id(self, table_name: str) -> bool:
"""Полная перезагрузка таблицы по ID"""
migration_logger.info(f"Полная загрузка {table_name} по ID")
try:
# Получаем ID колонку
id_column = get_primary_key(table_name)
if not id_column:
error_msg = f"Не могу найти ID колонку для {table_name}"
migration_logger.error(error_msg)
self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()})
return False
# Получаем метаданные
foreign_keys = get_foreign_keys(table_name)
indexes = get_indexes(table_name)
# Загружаем данные чанками
first_chunk = True
total_rows = 0
for chunk in self.reader.read_by_id_chunked(table_name, id_column, None):
if first_chunk:
self.writer.create_table(table_name, chunk)
first_chunk = False
else:
self.writer.append_data(table_name, chunk)
total_rows += len(chunk)
if total_rows == 0:
migration_logger.warning(f"Таблица {table_name} пуста")
return True
# Создаем индексы
if indexes:
self.writer.create_indexes(table_name, indexes)
migration_logger.info(f"📇 Создано {len(indexes)} индексов")
# Сохраняем информацию о внешних ключах
if foreign_keys:
self.all_foreign_keys[table_name] = foreign_keys
migration_logger.info(f"🔗 Сохранено {len(foreign_keys)} внешних ключей")
# Обновляем last_id и время синхронизации
#max_id = self._get_max_id(table_name, id_column)
#self.state.update_last_id(table_name, max_id)
#self.state.update_table_sync_time(table_name)
#self.state.update_table_stats(table_name, total_rows)
return True
except Exception as e:
error_msg = f"Ошибка при полной загрузке {table_name}: {e}"
migration_logger.error(error_msg)
self.errors.append({
'table': table_name,
'error': error_msg,
'traceback': traceback.format_exc(),
'time': datetime.now()
})
return False
def _incremental_by_id(self, table_name: str) -> bool:
"""Инкрементальная загрузка по ID (для таблиц без Life)"""
migration_logger.info(f"Инкрементальная загрузка {table_name} по ID")
try:
id_column = get_primary_key(table_name)
if not id_column:
error_msg = f"Не могу найти ID колонку для {table_name}"
migration_logger.error(error_msg)
self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()})
return False
foreign_keys = get_foreign_keys(table_name)
indexes = get_indexes(table_name)
# Проверяем новые колонки
new_columns = self.schema.detect_new_columns(table_name)
if new_columns:
self.schema.add_new_columns(table_name, new_columns)
# Получаем последний ID
last_id = self.state.get_last_id(table_name)
migration_logger.info(f"last_id из состояния: {last_id}")
if last_id is None:
last_id = get_max_id_from_postgres(table_name, id_column)
migration_logger.info(f"Последний ID в PG: {last_id}")
# Загружаем новые данные
total_loaded = 0
first_chunk = True
for chunk in self.reader.read_by_id_chunked(table_name, id_column, last_id):
if first_chunk:
# Проверяем структуру
pg_cols = {c['name'] for c in self.schema.get_postgres_columns(table_name)}
if not pg_cols.issubset(set(chunk.columns)):
missing = pg_cols - set(chunk.columns)
migration_logger.warning(f"В PG есть колонки, которых нет в чанке: {missing}")
first_chunk = False
self.writer.append_data(table_name, chunk)
total_loaded += len(chunk)
#if total_loaded > 0:
# # Обновляем последний ID и время синхронизации
# max_id = self._get_max_id(table_name, id_column)
# self.state.update_last_id(table_name, max_id)
# self.state.update_table_sync_time(table_name)
# self.state.update_table_stats(table_name, total_loaded)
# Сохраняем FK для создания позже
if foreign_keys:
if table_name not in self.all_foreign_keys:
self.all_foreign_keys[table_name] = []
self.all_foreign_keys[table_name].extend(foreign_keys)
return True
except Exception as e:
error_msg = f"Ошибка при инкрементальной загрузке {table_name}: {e}"
migration_logger.error(error_msg)
self.errors.append({
'table': table_name,
'error': error_msg,
'traceback': traceback.format_exc(),
'time': datetime.now()
})
return False
def _get_max_id(self, table_name: str, id_column: str) -> int:
"""Получает максимальный ID из источника"""
max_id_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}"
max_df = pd.read_sql_query(max_id_query, db_connector.src_engine)
return int(max_df.iloc[0]['max_id']) if not max_df.empty else 0
def create_all_foreign_keys(self):
"""Создать все внешние ключи после завершения миграции"""
if not self.all_foreign_keys:
migration_logger.info(" Нет внешних ключей для создания")
return
migration_logger.info("="*60)
migration_logger.info("🔗 СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ")
migration_logger.info("="*60)
for table_name, foreign_keys in self.all_foreign_keys.items():
try:
existing = self.schema.check_foreign_keys_exist(table_name)
existing_names = {f['name'] for f in existing}
to_create = [fk for fk in foreign_keys if fk['name'] not in existing_names]
if to_create:
self.writer.create_foreign_keys(table_name, to_create)
else:
migration_logger.info(f"Все внешние ключи для {table_name} уже существуют")
except Exception as e:
error_msg = f"Ошибка создания FK для {table_name}: {e}"
migration_logger.error(error_msg)
self.errors.append({
'table': table_name,
'error': error_msg,
'traceback': traceback.format_exc(),
'time': datetime.now()
})
def run_migration(self, tables: Optional[List[str]] = None, full_reload: bool = False, send_email: bool = True):
"""Запуск миграции для всех таблиц"""
self.is_running = True
self.start_time = datetime.now()
self.all_foreign_keys = {}
self.errors = []
if tables is None:
tables = settings.TABLES_TO_COPY
last_replication = self.state.get_last_replication_time()
migration_logger.info("="*70)
migration_logger.info("НАЧАЛО МИГРАЦИИ")
migration_logger.info(f"Время старта: {self.start_time}")
if last_replication:
migration_logger.info(f"Последняя миграция: {last_replication}")
migration_logger.info(f"Таблиц для обработки: {len(tables)}")
migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}")
migration_logger.info(f"Таблицы с Life-механизмом: {self.life_tables}")
migration_logger.info("="*70)
results = {}
for i, table_name in enumerate(tables, 1):
if not self.is_running:
migration_logger.warning("Миграция остановлена пользователем")
break
migration_logger.info(f"\n[{i}/{len(tables)}] Обработка таблицы {table_name}")
results[table_name] = self.migrate_table(table_name, full_reload)
# Создаем внешние ключи после всех таблиц
self.create_all_foreign_keys()
total_time = (datetime.now() - self.start_time).total_seconds()
stats = self.state.get_all_stats()
self._log_final_stats(results, stats, total_time)
# Отправляем уведомление
if send_email:
self._send_notification(results, stats, total_time)
self.is_running = False
return results
def _log_final_stats(self, results: dict, stats: dict, total_time: float):
"""Логирует финальную статистику"""
migration_logger.info("="*70)
migration_logger.info("ИТОГОВАЯ СТАТИСТИКА")
migration_logger.info("="*70)
migration_logger.info(f"Успешно: {sum(1 for r in results.values() if r)}/{len(results)}")
migration_logger.info(f"Ошибок: {len(self.errors)}")
migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}")
migration_logger.info(f"Общее время: {total_time:.1f}с")
migration_logger.info("="*70)
def _send_notification(self, results: dict, stats: dict, total_time: float):
"""Отправляет уведомление о результате"""
if self.errors:
error_body = self._build_error_email_body(results, stats, total_time)
email_sender.send_email(
subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
body=error_body
)
# else:
# email_sender.send_success_notification(stats, total_time)
def _build_error_email_body(self, results: dict, stats: dict, total_time: float) -> str:
"""Строит тело письма с ошибками"""
body = f"""
🚨 МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ
{'='*60}
Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Длительность: {total_time:.1f} сек
СТАТИСТИКА:
{'='*40}
Успешно: {sum(1 for r in results.values() if r)}/{len(results)}
Ошибок: {len(self.errors)}
Всего строк: {stats.get('total_rows', 0)}
СПИСОК ОШИБОК:
{'='*40}
"""
for i, err in enumerate(self.errors, 1):
body += f"\n{i}. Таблица: {err.get('table', 'N/A')}\n"
body += f" Ошибка: {err['error']}\n"
body += f" Время: {err['time'].strftime('%H:%M:%S') if 'time' in err else 'N/A'}\n"
return body
def stop_migration(self):
self.is_running = False
migration_logger.warning("Миграция остановлена")
email_sender.send_email(
subject=f"МИГРАЦИЯ ОСТАНОВЛЕНА - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
body=f"Миграция была остановлена пользователем в {datetime.now().strftime('%H:%M:%S')}"
)
def get_status(self) -> dict:
if not self.is_running:
return {
'is_running': False,
'last_errors': len(self.errors),
'last_replication': self.state.get_last_replication_info()
}
elapsed = (datetime.now() - self.start_time).total_seconds() if self.start_time else 0
return {
'is_running': True,
'current_table': self.current_table,
'elapsed_seconds': elapsed,
'errors_count': len(self.errors)
}
def _update_table_statistics(self, table_name: str, id_column: str):
"""
Обновляет статистику таблицы на основе реальных данных в PostgreSQL
Вызывается сразу после миграции таблицы
"""
try:
migration_logger.info(f"Обновление статистики для {table_name}...")
# Получаем реальную статистику из PostgreSQL
dst_stats = self.reader.get_table_stats(table_name, id_column)
# Обновляем в метаданных
self.state.update_last_id(table_name, dst_stats['max_id'])
# Для total_rows нужно установить точное значение, а не добавлять
# Поэтому используем отдельный метод для установки
replication_state._set_table_total_rows(table_name, dst_stats['total_rows'])
migration_logger.info(f" Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}")
# Логируем операцию
self.state.log_operation(
table_name=table_name,
operation='STATS_UPDATE',
records_count=dst_stats['total_rows'],
status='SUCCESS'
)
except Exception as e:
migration_logger.error(f"Ошибка обновления статистики для {table_name}: {e}")
self.state.log_operation(
table_name=table_name,
operation='STATS_UPDATE',
records_count=0,
status='ERROR',
error_message=str(e)[:500]
)
migrator = DatabaseMigrator()