import logging import os import json import traceback from datetime import datetime from typing import Dict, Any, Optional, List from pathlib import Path from app.core.config import settings class MigrationLogger: """Класс для логирования процесса миграции""" def __init__(self): self.start_time = datetime.now() self.timestamp = self.start_time.strftime("%Y%m%d_%H%M%S") # Создаем директорию для логов log_dir = Path(settings.LOG_DIR) log_dir.mkdir(exist_ok=True) self.log_file = log_dir / f"migration_log_{self.timestamp}.log" self.stats = { 'total_tables': len(settings.TABLES_TO_COPY), 'copied_tables': [], 'failed_tables': [], 'schema_changes': [], 'total_rows': 0, 'start_time': self.start_time, 'end_time': None, 'errors': [] } # Настраиваем logging self._setup_logging() def _setup_logging(self): """Настройка системы логирования""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(self.log_file, encoding='utf-8'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) def info(self, message: str): """Логирование информационного сообщения""" self.logger.info(message) def error(self, message: str, exception: Optional[Exception] = None): """Логирование ошибки""" error_details = { 'message': message, 'exception': str(exception) if exception else None, 'traceback': traceback.format_exc() if exception else None, 'timestamp': datetime.now().isoformat() } self.stats['errors'].append(error_details) self.logger.error(f"{message}. Ошибка: {exception}") def warning(self, message: str): """Логирование предупреждения""" self.logger.warning(message) def debug(self, message: str): """Логирование отладочной информации""" self.logger.debug(message) def table_start(self, table_name: str): """Логирование начала обработки таблицы""" self.logger.info(f"{'='*60}") self.logger.info(f"Начало обработки таблицы: {table_name}") self.logger.info(f"{'='*60}") def table_success(self, table_name: str, row_count: int): """Логирование успешного копирования таблицы""" self.stats['copied_tables'].append({ 'name': table_name, 'row_count': row_count, 'timestamp': datetime.now().isoformat() }) self.stats['total_rows'] += row_count self.logger.info(f"Таблица {table_name} успешно скопирована ({row_count} строк)") def table_failure(self, table_name: str, error: str): """Логирование ошибки при копировании таблицы""" self.stats['failed_tables'].append({ 'name': table_name, 'error': error, 'timestamp': datetime.now().isoformat() }) self.logger.error(f"Ошибка при копировании таблицы {table_name}: {error}") def progress(self, table_name: str, chunk_num: int, total_rows: int): """Логирование прогресса загрузки""" self.logger.info(f"Таблица {table_name}: загружено чанков {chunk_num}, строк {total_rows}") def schema_change(self, table_name: str, new_columns: List[str]): """Логирование изменения схемы""" self.stats['schema_changes'].append({ 'table': table_name, 'new_columns': new_columns, 'timestamp': datetime.now().isoformat() }) self.logger.info(f"В таблице {table_name} обнаружены новые колонки: {new_columns}") def generate_report(self) -> Dict[str, Any]: """Генерация итогового отчета""" self.stats['end_time'] = datetime.now() duration = self.stats['end_time'] - self.stats['start_time'] self.stats['duration_seconds'] = duration.total_seconds() self.stats['duration_human'] = str(duration) report = { 'summary': { 'total_tables': self.stats['total_tables'], 'successful_tables': len(self.stats['copied_tables']), 'failed_tables': len(self.stats['failed_tables']), 'schema_changes': len(self.stats['schema_changes']), 'success_rate': (len(self.stats['copied_tables']) / self.stats['total_tables'] * 100) if self.stats['total_tables'] > 0 else 0, 'total_rows': self.stats['total_rows'], 'start_time': self.stats['start_time'].isoformat(), 'end_time': self.stats['end_time'].isoformat(), 'duration': self.stats['duration_human'] }, 'successful_tables': [ {'name': t['name'], 'rows': t['row_count']} for t in self.stats['copied_tables'] ], 'failed_tables': [ {'name': t['name'], 'error': t['error']} for t in self.stats['failed_tables'] ], 'schema_changes': self.stats['schema_changes'], 'errors': self.stats['errors'] } # Сохраняем отчет в JSON report_file = Path(settings.LOG_DIR) / f"migration_report_{self.timestamp}.json" with open(report_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2, default=str) return report def get_log_content(self) -> str: """Получение содержимого лог-файла""" try: with open(self.log_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: return f"Ошибка при чтении лог-файла: {e}" # Глобальный экземпляр логгера migration_logger = MigrationLogger()