import json from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional from app.models.replication import ReplicationSchedule from app.repository.replication_metadata_repo import replication_metadata_repo from app.core.logging import migration_logger from app.core.config import settings class ReplicationState: """Хранение состояния репликации по ID""" def __init__(self): self.replication_repo = replication_metadata_repo self.replication_repo.init_metadata_table() def get_last_replication_time(self) -> Optional[datetime]: """Получает время последней репликации (максимальное по всем таблицам)""" stats = self.replication_repo.get_all_stats() if stats['tables']: times = [t['last_sync'] for t in stats['tables'] if t['last_sync']] return max(times) if times else None return None def get_last_replication_info(self) -> dict: """Получает информацию о последней репликации""" stats = self.replication_repo.get_all_stats() return { 'last_replication': self.get_last_replication_time(), 'total_rows': stats['total_rows'], 'tables_count': stats['tables_count'] } def get_table_last_sync(self, metadata_id: int) -> Optional[datetime]: """Получает время последней синхронизации таблицы""" return self.replication_repo.get_last_sync_time(metadata_id) def get_last_id(self, table_name: str) -> Optional[int]: """Получает последний обработанный ID для таблицы""" return self.replication_repo.get_last_id(table_name) def update_last_id(self, table_name: str, last_id: int): """Обновляет последний обработанный ID""" self.replication_repo.update_last_id(table_name, last_id) def update_table_sync_time(self, schedule_id: int): """Обновляет время синхронизации таблицы""" self.replication_repo.update_sync_time(schedule_id) def update_table_stats(self, table_name: str, added_rows: int): """Обновляет статистику таблицы""" self.replication_repo.update_table_stats(table_name, added_rows) def get_all_stats(self) -> dict: """Получает статистику по всем таблицам""" return self.replication_repo.get_all_stats() def log_operation(self, table_name: str, operation: str, records_count: int, status: str = "SUCCESS", error_message: Optional[str] = None): """Логирует операцию""" self.replication_repo.log_operation(table_name, operation, records_count, status, error_message) def _set_table_total_rows(self, table_name: str, total_rows: int): """ Устанавливает точное количество строк для таблицы (в отличие от update_table_stats, который добавляет) """ from app.models.replication import ReplicationMetadata session = self.replication_repo.get_session() try: metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name ).all() if metadatas: for metadata in metadatas: metadata.total_rows = total_rows metadata.updated_at = datetime.now() session.commit() migration_logger.debug(f"Установлено total_rows={total_rows} для {table_name}") else: migration_logger.warning(f"Метаданные для {table_name} не найдены") except Exception as e: session.rollback() migration_logger.error(f"Ошибка установки total_rows для {table_name}: {e}") finally: session.close() replication_state = ReplicationState()