import re from typing import Optional, List, Dict, Any import traceback from datetime import datetime import pandas as pd from app.models.replication import ReplicationSchedule from app.services.replication_state import replication_state from app.services.data_reader import data_reader from app.services.data_writer import data_writer from app.services.schema_manager import schema_manager from app.utils.index_helpers import get_primary_key, get_max_id_from_postgres, get_foreign_keys, get_indexes from app.utils.email_sender import email_sender from app.core.logging import migration_logger from app.core.config import settings from app.core.database import db_connector class DatabaseMigrator: """Мигратор данных по ID и по Life-таблицам""" def __init__(self): self.state = replication_state self.reader = data_reader self.writer = data_writer self.schema = schema_manager self.is_running = False self.current_table = None self.start_time = None self.all_foreign_keys = {} self.errors = [] # Таблицы, которые используют Life-механизм self.life_tables = getattr(settings, 'LIFE_TABLES', []) # Карта соответствия: основная таблица -> Life-таблица self.life_mapping = {} def _parse_table_name(self, table_name: str) -> Dict[str, str]: """ Парсит имя таблицы и возвращает компоненты. Примеры: "oms_kl_VisitResult" -> { 'schema': 'oms', 'basename': 'kl_VisitResult', 'full_name': 'oms_kl_VisitResult' } "stt_MedicalHistory" -> { 'schema': 'stt', 'basename': 'MedicalHistory', 'full_name': 'stt_MedicalHistory' } """ # Ищем префикс (oms_, stt_, и т.д.) match = re.match(r'^([A-Za-z]+)_(.*)$', table_name) if match: schema = match.group(1) basename = match.group(2) return { 'schema': schema, 'basename': basename, 'full_name': table_name } else: # Если нет префикса return { 'schema': '', 'basename': table_name, 'full_name': table_name } def _get_life_table_name(self, table_name: str) -> Optional[str]: """Получает имя Life-таблицы для основной таблицы""" parsed = self._parse_table_name(table_name) if parsed['schema']: return f"Life_{parsed['schema']}_{parsed['basename']}" else: return f"Life_{table_name}" def _get_life_id_field(self, table_name: str) -> str: """Получает имя LifeID поля""" parsed = self._parse_table_name(table_name) return f"{parsed['basename']}LifeID" def _get_base_id_field(self, table_name: str) -> str: """Получает имя базового ID поля""" parsed = self._parse_table_name(table_name) return f"{parsed['basename']}ID" def migrate_table_by_time(self, table_name: str, life_table_name: str, last_sync_time: datetime) -> Dict[str, int]: """Миграция таблицы через Life-механизм по времени""" base_id_field = self._get_base_id_field(table_name) life_id_field = self._get_life_id_field(table_name) migration_logger.info(f"Миграция {table_name} через {life_table_name} с {last_sync_time}") stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0} try: # Получаем последние версии из Life-таблицы query = f""" WITH LatestLife AS ( SELECT {base_id_field}, MAX({life_id_field}) as MaxLifeID FROM {life_table_name} WHERE x_DateTime > CAST(? AS datetime) GROUP BY {base_id_field} ) SELECT dl.* FROM {life_table_name} dl INNER JOIN LatestLife ll ON dl.{life_id_field} = ll.MaxLifeID """ # Читаем данные чанками chunk_size = getattr(settings, 'CHUNK_SIZE', 1000) for chunk in self.reader.read_custom_query_chunked(query, params=(last_sync_time,), chunksize=chunk_size): if chunk.empty: continue # Разделяем по операциям inserts = chunk[chunk['x_Operation'] == 'i'] updates = chunk[chunk['x_Operation'] == 'u'] deletes = chunk[chunk['x_Operation'] == 'd'] # Обрабатываем вставки if not inserts.empty: inserts_to_write = self._prepare_data_for_write(inserts, table_name) if not inserts_to_write.empty: self.state.log_operation( table_name=table_name, operation='INSERT', records_count=len(inserts) ) self.writer.upsert_data(table_name, inserts_to_write, base_id_field) stats['inserted'] += len(inserts) # Обрабатываем обновления if not updates.empty: updates_to_write = self._prepare_data_for_write(updates, table_name) if not updates_to_write.empty: self.state.log_operation( table_name=table_name, operation='UPDATE', records_count=len(updates) ) self.writer.upsert_data(table_name, updates_to_write, base_id_field) stats['updated'] += len(updates) # Обрабатываем удаления if not deletes.empty: self.state.log_operation( table_name=table_name, operation='DELETE', records_count=len(deletes) ) delete_ids = deletes[base_id_field].tolist() self.writer.delete_data(table_name, base_id_field, delete_ids) stats['deleted'] += len(deletes) stats['total'] += len(chunk) migration_logger.info(f" Чанк: +{len(inserts)} вставок, ~{len(updates)} обновлений, -{len(deletes)} удалений") if stats['total'] > 0: migration_logger.info(f"{table_name}: +{stats['inserted']} вставок, ~{stats['updated']} обновлений, -{stats['deleted']} удалений") else: migration_logger.info(f"ℹ️ {table_name}: изменений нет") except Exception as e: error_msg = f"Ошибка при миграции {table_name} через Life: {e}" migration_logger.error(error_msg) migration_logger.error(e.args) self.state.log_operation( table_name=table_name, operation='ERROR', records_count=0, status='ERROR', error_message=str(e)[:500] ) self.errors.append({ 'table': table_name, 'error': error_msg, 'traceback': traceback.format_exc(), 'time': datetime.now() }) raise return stats def _prepare_data_for_write(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame: """Подготавливает данные из Life-таблицы для записи в основную таблицу""" # Исключаем служебные поля exclude_fields = {'x_Operation', 'x_DateTime', 'x_Seance', 'x_User'} # Определяем, какие поля оставить fields_to_keep = [] for col in df.columns: if col not in exclude_fields and not col.endswith('LifeID'): fields_to_keep.append(col) result = df[fields_to_keep].copy() # Убеждаемся, что нет дубликатов по ID base_id_field = self._get_base_id_field(table_name) result = result.drop_duplicates(subset=[base_id_field]) return result def migrate_table(self, table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str], uses_life: bool = False, full_reload: bool = False) -> bool: """Миграция одной таблицы (поддерживает и ID, и Life)""" migration_logger.table_start(table_name) self.current_table = table_name try: # Получаем ID колонку для статистики id_column = get_primary_key(table_name) if uses_life and not full_reload and life_table_name: # МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ last_sync = self.state.get_table_last_sync(metadata_id) if last_sync: stats = self.migrate_table_by_time(table_name, life_table_name, last_sync) # Обновляем время синхронизации self.state.update_table_sync_time(schedule_id) # Обновляем статистику if id_column: self._update_table_statistics(table_name, id_column) migration_logger.table_success(table_name, stats['total']) return True else: # Если синхронизации не было - делаем полную загрузку migration_logger.info(f"Первая синхронизация {table_name} - полная загрузка") full_reload = True if full_reload: # ПОЛНАЯ ПЕРЕЗАГРУЗКА (по ID) result = self._full_reload_by_id(table_name) # Обновляем статистику после полной загрузки if result and id_column: self._update_table_statistics(table_name, id_column) return result else: # ИНКРЕМЕНТАЛЬНАЯ ПО ID (для таблиц без Life) result = self._incremental_by_id(table_name) # Обновляем статистику после инкрементальной загрузки if result and id_column: self._update_table_statistics(table_name, id_column) return result except Exception as e: error_msg = f"Критическая ошибка при обработке {table_name}: {e}" migration_logger.error(error_msg) self.errors.append({ 'table': table_name, 'error': error_msg, 'traceback': traceback.format_exc(), 'time': datetime.now() }) return False finally: self.current_table = None def _full_reload_by_id(self, table_name: str) -> bool: """Полная перезагрузка таблицы по ID""" migration_logger.info(f"Полная загрузка {table_name} по ID") try: # Получаем ID колонку id_column = get_primary_key(table_name) if not id_column: error_msg = f"Не могу найти ID колонку для {table_name}" migration_logger.error(error_msg) self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()}) return False # Получаем метаданные foreign_keys = get_foreign_keys(table_name) indexes = get_indexes(table_name) # Загружаем данные чанками first_chunk = True total_rows = 0 for chunk in self.reader.read_by_id_chunked(table_name, id_column, None): if first_chunk: self.writer.create_table(table_name, chunk) first_chunk = False else: self.writer.append_data(table_name, chunk) total_rows += len(chunk) if total_rows == 0: migration_logger.warning(f"Таблица {table_name} пуста") return True # Создаем индексы if indexes: self.writer.create_indexes(table_name, indexes) migration_logger.info(f"📇 Создано {len(indexes)} индексов") # Сохраняем информацию о внешних ключах if foreign_keys: self.all_foreign_keys[table_name] = foreign_keys migration_logger.info(f"🔗 Сохранено {len(foreign_keys)} внешних ключей") # Обновляем last_id и время синхронизации #max_id = self._get_max_id(table_name, id_column) #self.state.update_last_id(table_name, max_id) #self.state.update_table_sync_time(table_name) #self.state.update_table_stats(table_name, total_rows) return True except Exception as e: error_msg = f"Ошибка при полной загрузке {table_name}: {e}" migration_logger.error(error_msg) self.errors.append({ 'table': table_name, 'error': error_msg, 'traceback': traceback.format_exc(), 'time': datetime.now() }) return False def _incremental_by_id(self, table_name: str, metadata) -> bool: """Инкрементальная загрузка по ID (для таблиц без Life)""" migration_logger.info(f"Инкрементальная загрузка {table_name} по ID") try: id_column = get_primary_key(table_name) if not id_column: error_msg = f"Не могу найти ID колонку для {table_name}" migration_logger.error(error_msg) self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()}) return False foreign_keys = get_foreign_keys(table_name) indexes = get_indexes(table_name) # Проверяем новые колонки new_columns = self.schema.detect_new_columns(table_name) if new_columns: self.schema.add_new_columns(table_name, new_columns) # Получаем последний ID last_id = self.state.get_last_id(table_name) migration_logger.info(f"last_id из состояния: {last_id}") if last_id is None: last_id = get_max_id_from_postgres(table_name, id_column) migration_logger.info(f"Последний ID в PG: {last_id}") # Загружаем новые данные total_loaded = 0 first_chunk = True for chunk in self.reader.read_by_id_chunked(table_name, id_column, last_id): if first_chunk: # Проверяем структуру pg_cols = {c['name'] for c in self.schema.get_postgres_columns(table_name)} if not pg_cols.issubset(set(chunk.columns)): missing = pg_cols - set(chunk.columns) migration_logger.warning(f"В PG есть колонки, которых нет в чанке: {missing}") first_chunk = False self.writer.append_data(table_name, chunk) total_loaded += len(chunk) #if total_loaded > 0: # # Обновляем последний ID и время синхронизации # max_id = self._get_max_id(table_name, id_column) # self.state.update_last_id(table_name, max_id) # self.state.update_table_sync_time(table_name) # self.state.update_table_stats(table_name, total_loaded) # Сохраняем FK для создания позже if foreign_keys: if table_name not in self.all_foreign_keys: self.all_foreign_keys[table_name] = [] self.all_foreign_keys[table_name].extend(foreign_keys) return True except Exception as e: error_msg = f"Ошибка при инкрементальной загрузке {table_name}: {e}" migration_logger.error(error_msg) self.errors.append({ 'table': table_name, 'error': error_msg, 'traceback': traceback.format_exc(), 'time': datetime.now() }) return False def _get_max_id(self, table_name: str, id_column: str) -> int: """Получает максимальный ID из источника""" max_id_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}" max_df = pd.read_sql_query(max_id_query, db_connector.src_engine) return int(max_df.iloc[0]['max_id']) if not max_df.empty else 0 def create_all_foreign_keys(self): """Создать все внешние ключи после завершения миграции""" if not self.all_foreign_keys: migration_logger.info("Нет внешних ключей для создания") return migration_logger.info("="*60) migration_logger.info("СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ") migration_logger.info("="*60) for table_name, foreign_keys in self.all_foreign_keys.items(): try: existing = self.schema.check_foreign_keys_exist(table_name) existing_names = {f['name'] for f in existing} to_create = [fk for fk in foreign_keys if fk['name'] not in existing_names] if to_create: self.writer.create_foreign_keys(table_name, to_create) else: migration_logger.info(f"Все внешние ключи для {table_name} уже существуют") except Exception as e: error_msg = f"Ошибка создания FK для {table_name}: {e}" migration_logger.error(error_msg) self.errors.append({ 'table': table_name, 'error': error_msg, 'traceback': traceback.format_exc(), 'time': datetime.now() }) def run_migration( self, table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str] = None, uses_life: bool = False, full_reload: bool = False, send_email: bool = True ): """Запуск миграции таблицы""" self.is_running = True self.start_time = datetime.now() self.all_foreign_keys = {} self.errors = [] last_replication = self.state.get_last_replication_time() migration_logger.info("="*70) migration_logger.info("НАЧАЛО МИГРАЦИИ") migration_logger.info(f"Время старта: {self.start_time}") if last_replication: migration_logger.info(f"Последняя миграция: {last_replication}") migration_logger.info(f"Таблица для обработки: {table_name}") migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}") migration_logger.info("="*70) if not self.is_running: migration_logger.warning("Миграция остановлена пользователем") return migration_logger.info(f"Обработка таблицы {table_name}") results = self.migrate_table(table_name, schedule_id, metadata_id, life_table_name, uses_life, full_reload) # Создаем внешние ключи после всех таблиц self.create_all_foreign_keys() total_time = (datetime.now() - self.start_time).total_seconds() stats = self.state.get_all_stats() self._log_final_stats(results, stats, total_time) # Отправляем уведомление if send_email: self._send_notification(results, stats, total_time) self.is_running = False return results def _log_final_stats(self, has_migrated: bool, stats: dict, total_time: float): """Логирует финальную статистику""" migration_logger.info("="*70) migration_logger.info("ИТОГОВАЯ СТАТИСТИКА") migration_logger.info("="*70) migration_logger.info(f"Ошибок: {len(self.errors)}") migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}") migration_logger.info(f"Общее время: {total_time:.1f}с") migration_logger.info("="*70) def _send_notification(self, has_migrated: bool, stats: dict, total_time: float): """Отправляет уведомление о результате""" if self.errors: error_body = self._build_error_email_body(has_migrated, stats, total_time) email_sender.send_email( subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}", body=error_body ) # else: # email_sender.send_success_notification(stats, total_time) def _build_error_email_body(self, has_migrated: bool, stats: dict, total_time: float) -> str: """Строит тело письма с ошибками""" body = f""" МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ {'='*60} Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Длительность: {total_time:.1f} сек СТАТИСТИКА: {'='*40} Ошибок: {len(self.errors)} Всего строк: {stats.get('total_rows', 0)} СПИСОК ОШИБОК: {'='*40} """ for i, err in enumerate(self.errors, 1): body += f"\n{i}. Таблица: {err.get('table', 'N/A')}\n" body += f" Ошибка: {err['error']}\n" body += f" Время: {err['time'].strftime('%H:%M:%S') if 'time' in err else 'N/A'}\n" return body def stop_migration(self): self.is_running = False migration_logger.warning("Миграция остановлена") email_sender.send_email( subject=f"МИГРАЦИЯ ОСТАНОВЛЕНА - {datetime.now().strftime('%Y-%m-%d %H:%M')}", body=f"Миграция была остановлена пользователем в {datetime.now().strftime('%H:%M:%S')}" ) def get_status(self) -> dict: if not self.is_running: return { 'is_running': False, 'last_errors': len(self.errors), 'last_replication': self.state.get_last_replication_info() } elapsed = (datetime.now() - self.start_time).total_seconds() if self.start_time else 0 return { 'is_running': True, 'current_table': self.current_table, 'elapsed_seconds': elapsed, 'errors_count': len(self.errors) } def _update_table_statistics(self, table_name: str, id_column: str): """ Обновляет статистику таблицы на основе реальных данных в PostgreSQL Вызывается сразу после миграции таблицы """ try: migration_logger.info(f"Обновление статистики для {table_name}...") # Получаем реальную статистику из PostgreSQL dst_stats = self.reader.get_table_stats(table_name, id_column) # Обновляем в метаданных self.state.update_last_id(table_name, dst_stats['max_id']) # Для total_rows нужно установить точное значение, а не добавлять # Поэтому используем отдельный метод для установки replication_state._set_table_total_rows(table_name, dst_stats['total_rows']) migration_logger.info(f"Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}") # Логируем операцию self.state.log_operation( table_name=table_name, operation='STATS_UPDATE', records_count=dst_stats['total_rows'], status='SUCCESS' ) except Exception as e: migration_logger.error(f"Ошибка обновления статистики для {table_name}: {e}") self.state.log_operation( table_name=table_name, operation='STATS_UPDATE', records_count=0, status='ERROR', error_message=str(e)[:500] ) migrator = DatabaseMigrator()