# app/services/replication_metadata_repo.py from typing import Optional, List from datetime import datetime from fastapi import HTTPException from sqlalchemy import select from sqlalchemy.orm import Session, selectinload from app.core.database import db_connector from app.models.replication import ReplicationMetadata, ReplicationLog, ReplicationSchedule from app.core.logging import migration_logger class ReplicationMetadataRepo: """Репозиторий для работы с метаданными репликации""" def __init__(self): self.engine = db_connector.dst_engine self.SessionLocal = db_connector.dst_session def init_metadata_table(self): """Создает таблицу метаданных, если её нет""" from app.models.replication import Base Base.metadata.create_all(self.engine) migration_logger.info("Таблица replication_metadata создана/проверена") def get_table_metadata(self, metadata_id: int) -> Optional[ReplicationMetadata]: """Получает метаданные для таблицы""" session = self.SessionLocal() try: metadata = session.query(ReplicationMetadata).filter_by( id=metadata_id ).first() # Если нет, создаем # if not metadata: # metadata = ReplicationMetadata.create_if_not_exists(session, table_name) return metadata finally: session.close() def get_last_sync_time(self, metadata_id: int) -> Optional[datetime]: """Получает время последней синхронизации таблицы""" metadata = self.get_table_metadata(metadata_id) return metadata.last_sync_time if metadata else None def get_last_id(self, table_name: str) -> Optional[int]: """Получает последний обработанный ID для таблицы""" metadata = self.get_table_metadata(table_name) return metadata.last_id if metadata else None def update_sync_time(self, schedule_id: int) -> bool: """Обновляет время синхронизации таблицы""" session = self.SessionLocal() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if not schedule: return False else: datenow = datetime.now() schedule.last_run = datenow metadata = schedule.table metadata.last_sync_time = datenow metadata.updated_at = datenow session.commit() migration_logger.debug(f"Updated sync time for {metadata.table_name} to {metadata.last_sync_time}") return True except Exception as e: session.rollback() migration_logger.error(f"Ошибка при обновлении времени синхронизации распиания с ID: {schedule_id}: {e}") return False finally: session.close() def update_last_id(self, table_name: str, last_id: int) -> bool: """Обновляет последний обработанный ID""" session = self.SessionLocal() try: metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name ).all() if not metadatas: return False else: for metadata in metadatas: metadata.last_id = last_id metadata.updated_at = datetime.now() session.commit() migration_logger.debug(f"Updated last_id for {table_name} to {last_id}") return True except Exception as e: session.rollback() migration_logger.error(f"Error updating last_id for {table_name}: {e}") return False finally: session.close() def update_table_stats(self, table_name: str, added_rows: int) -> bool: """Обновляет статистику таблицы""" session = self.SessionLocal() try: metadata = session.query(ReplicationMetadata).filter_by( table_name=table_name ).first() if not metadata: metadata = ReplicationMetadata( table_name=table_name, last_sync_time=datetime.now(), total_rows=added_rows ) session.add(metadata) else: metadata.total_rows += added_rows metadata.updated_at = datetime.now() session.commit() return True except Exception as e: session.rollback() migration_logger.error(f"Error updating stats for {table_name}: {e}") return False finally: session.close() def log_operation(self, table_name: str, operation: str, records_count: int, status: str = "SUCCESS", error_message: Optional[str] = None): """Логирует операцию репликации""" session = self.SessionLocal() try: log = ReplicationLog( table_name=table_name, operation=operation, records_count=records_count, status=status, error_message=error_message ) session.add(log) session.commit() except Exception as e: session.rollback() migration_logger.error(f"Error logging operation: {e}") finally: session.close() def get_all_stats(self) -> dict: """Получает статистику по всем таблицам""" session = self.SessionLocal() try: metadata_list = session.query(ReplicationMetadata).all() # Обрабатываем None значения в total_rows total_rows = sum(m.total_rows for m in metadata_list if m.total_rows is not None) active_tables = sum(1 for m in metadata_list if m.is_active) return { 'total_rows': total_rows, 'tables_count': len(metadata_list), 'active_tables': active_tables, 'tables': [ { 'name': m.table_name, 'last_sync': m.last_sync_time, 'last_id': m.last_id, 'rows': m.total_rows, 'active': m.is_active } for m in metadata_list ] } finally: session.close() def get_session(self) -> Session: return self.SessionLocal() # ========== Методы для расписаний ========== def init_default_schedules(self, metadatas: List[ReplicationMetadata]): """Инициализирует расписания по умолчанию для списка таблиц""" session = self.get_session() try: init_count = 0 for metadata in metadatas: # Проверяем, есть ли уже расписание query = session.query(ReplicationSchedule).filter_by( metadata_id=metadata.id ) schedule_exist = session.query(query.exists()) if not schedule_exist: job_name = f"{metadata.table_name} - расписание по умолчанию" # Создаем расписание по умолчанию (каждый день в 00:00) schedule = ReplicationSchedule( metadata_id=metadata.id, name=job_name, schedule_time=datetime.strptime("00:00", "%H:%M").time(), days=[], # Пустой список = все дни full_reload=False, enabled=False ) session.add(schedule) migration_logger.debug(f"Создано расписание по умолчанию для {metadata.table_name}") init_count += 1 session.commit() migration_logger.info(f"Инициализированы расписания по умолчанию для {init_count} таблиц") except Exception as e: session.rollback() migration_logger.error(f"Ошибка инициализации расписаний: {e}") finally: session.close() def add_metadata(self, table_name: str, life_table_name: Optional[str] = None, description: Optional[str] = None, enabled: Optional[bool] = False) -> Optional[ReplicationMetadata]: """Добавить метадату""" session = self.get_session() try: metadata_exists = session.query(ReplicationMetadata).filter_by( table_name=table_name ).first() if metadata_exists: raise HTTPException(status_code=400, detail=f"Метадата с таблицей {metadata_exists.table_name} существует") metadata = ReplicationMetadata( table_name=table_name, life_table_name=life_table_name, description=description, is_active=enabled ) session.add(metadata) session.commit() migration_logger.info(f"Добавлена новая метадата для {metadata.table_name}") return metadata except Exception as e: session.rollback() migration_logger.error(f"Ошибка добавления расписания: {e}") return None finally: session.close() def add_schedule(self, metadata_id: int, schedule_time: str, days: Optional[List[str]] = None, full_reload: bool = False, enabled: bool = True, name: Optional[str] = None, description: Optional[str] = None) -> Optional[ReplicationSchedule]: """Добавить НОВОЕ расписание для таблицы""" session = self.get_session() try: # Проверяем, существует ли метаданные metadata = session.query(ReplicationMetadata).filter_by( id=metadata_id ).first() if not metadata: raise HTTPException(status_code=404, detail=f"Метадата с ID {metadata_id} не найдена") # Парсим время time_obj = datetime.strptime(schedule_time, "%H:%M").time() # Создаем новое расписание schedule = ReplicationSchedule( metadata_id=metadata_id, schedule_time=time_obj, days=days if days else [], full_reload=full_reload, enabled=enabled, name=name or f"{metadata.table_name} - {'полная' if full_reload else 'инкремент'}", description=description ) session.add(schedule) session.commit() migration_logger.info(f"Добавлено новое расписание для {metadata.table_name} в {schedule_time}") return schedule.to_dict() except Exception as e: session.rollback() migration_logger.error(f"Ошибка добавления расписания: {e}") return None finally: session.close() def delete_schedule(self, schedule_id: int) -> bool: """Удалить расписание по ID""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if schedule: session.delete(schedule) session.commit() migration_logger.info(f"Удалено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: session.rollback() migration_logger.error(f"Ошибка удаления расписания: {e}") return False finally: session.close() def update_schedule(self, schedule_id: int, **kwargs) -> bool: """Обновить существующее расписание по ID""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if schedule: for key, value in kwargs.items(): if hasattr(schedule, key) and value is not None: if key == 'schedule_time' and isinstance(value, str): value = datetime.strptime(value, "%H:%M").time() setattr(schedule, key, value) schedule.updated_at = datetime.now() session.commit() migration_logger.info(f"Обновлено расписание ID={schedule_id}") return True return False except Exception as e: session.rollback() migration_logger.error(f"Ошибка обновления расписания: {e}") return False finally: session.close() def get_schedule(self, metadata_id: int) -> List[ReplicationSchedule]: """Получает расписание для таблицы""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( metadata_id=metadata_id ).all() return schedule finally: session.close() def get_all_schedules(self) -> List[ReplicationSchedule]: """Получает все расписания""" session = self.get_session() try: schedules = session.execute( select(ReplicationSchedule) .options(selectinload(ReplicationSchedule.table)) ).scalars().all() # Явно пометить сессию как не expiring (чтобы объекты не expire после commit) for schedule in schedules: session.expunge(schedule) # Удалить из сессии, но сохранить данные return schedules finally: session.close() def get_all_metadata(self) -> List[ReplicationMetadata]: """Получает все метадаты""" session = self.get_session() try: return session.query(ReplicationMetadata).all() finally: session.close() def get_due_schedules(self, current_time: Optional[datetime] = None) -> List[ReplicationSchedule]: """Получает расписания, которые должны запуститься сейчас""" if current_time is None: current_time = datetime.now() current_time_obj = current_time.time() current_weekday = current_time.weekday() session = self.get_session() try: schedules = session.query(ReplicationSchedule).filter_by( enabled=True ).all() due = [] for schedule in schedules: # Проверяем время и день time_diff = abs( (schedule.schedule_time.hour * 60 + schedule.schedule_time.minute) - (current_time_obj.hour * 60 + current_time_obj.minute) ) if time_diff <= 1 and current_weekday in schedule.days_list: due.append(schedule) return due finally: session.close() def update_schedule_last_run(self, schedule_id: int) -> bool: """Обновляет время последнего запуска расписания""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if schedule: schedule.last_run = datetime.now() schedule.updated_at = datetime.now() session.commit() return True return False except Exception as e: session.rollback() migration_logger.error(f"Ошибка обновления last_run: {e}") return False finally: session.close() def disable_schedule(self, schedule_id: int) -> bool: """Отключает расписание""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if schedule: schedule.enabled = False schedule.updated_at = datetime.now() session.commit() migration_logger.info(f"Отключено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: session.rollback() migration_logger.error(f"Ошибка отключения расписания: {e}") return False finally: session.close() def enable_schedule(self, schedule_id: int) -> bool: """Включает расписание""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( id=schedule_id ).first() if schedule: schedule.enabled = True schedule.updated_at = datetime.now() session.commit() migration_logger.info(f"Включено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: session.rollback() migration_logger.error(f"Ошибка включения расписания: {e}") return False finally: session.close() # ========== Методы для статистики ========== def sync_table_stats(self, table_name: str, id_column: str): """ Синхронизирует статистику таблицы с реальными данными в PostgreSQL """ from app.services.data_reader import data_reader stats = data_reader.get_table_stats(table_name, id_column) session = self.get_session() try: metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name ).all() if metadatas: for metadata in metadatas: metadata.total_rows = stats['total_rows'] metadata.last_id = stats['max_id'] metadata.updated_at = datetime.now() session.commit() migration_logger.info(f"Синхронизирована статистика {table_name}: {stats['total_rows']} строк, max_id={stats['max_id']}") else: migration_logger.warning(f"Метаданные для {table_name} не найдены") finally: session.close() def sync_all_tables_stats(self, table_names: List[str], id_columns: dict): """ Синхронизирует статистику для всех таблиц id_columns: словарь {table_name: id_column_name} """ migration_logger.info("Синхронизация статистики всех таблиц...") for table_name in table_names: id_column = id_columns.get(table_name, f"{table_name.split('_')[-1]}ID") self.sync_table_stats(table_name, id_column) migration_logger.info("Синхронизация статистики завершена") # Глобальный экземпляр replication_metadata_repo = ReplicationMetadataRepo()