commit c201d36ae63e887ac3432476d1f0d7ce4be19173 Author: brusnitsyn Date: Sun Mar 8 20:21:15 2026 +0900 first commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c6acd2a --- /dev/null +++ b/.env.example @@ -0,0 +1,25 @@ +# MSSQL +MSSQL_SERVER= +MSSQL_DATABASE= +MSSQL_USERNAME= +MSSQL_PASSWORD= + +# PostgreSQL +POSTGRES_HOST= +POSTGRES_PORT= +POSTGRES_DATABASE= +POSTGRES_USERNAME= +POSTGRES_PASSWORD= + +# Email +EMAIL_HOST= +EMAIL_PORT= +EMAIL_USER= +EMAIL_PASSWORD= +EMAIL_FROM= +EMAIL_TO= + +# Migration +CHUNK_SIZE=5000 +BATCH_SIZE=10 +DEBUG=False \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c5ba498 --- /dev/null +++ b/.gitignore @@ -0,0 +1,177 @@ +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +.vscode/ \ No newline at end of file diff --git a/app/api/routes.py b/app/api/routes.py new file mode 100644 index 0000000..986c368 --- /dev/null +++ b/app/api/routes.py @@ -0,0 +1,421 @@ +from datetime import datetime +from typing import Optional +from fastapi import APIRouter, HTTPException, BackgroundTasks, Query +from app.models.replication import ReplicationMetadata +from app.repository.replication_metadata_repo import replication_metadata_repo +from app.services.scheduler import scheduler +from app.services.migrator import migrator +from app.services.replication_state import replication_state +from app.core.logging import migration_logger +from app.core.config import settings +from app.utils.email_sender import email_sender +from app.taskiq.broker import refresh_schedules + +router = APIRouter(prefix="/api/v1") + + +@router.post("/migrate/start") +async def start_migration(background_tasks: BackgroundTasks, full_reload: bool = False): + """Запуск миграции""" + if migrator.is_running: + raise HTTPException(status_code=400, detail="Миграция уже выполняется") + + background_tasks.add_task(run_migration_task, full_reload) + return {"message": "Миграция запущена", "full_reload": full_reload} + + +@router.post("/migrate/stop") +async def stop_migration(): + migrator.stop_migration() + return {"message": "Миграция останавливается"} + + +@router.get("/migrate/status") +async def get_status(): + return migrator.get_status() + +@router.get("/replication/last") +async def get_last_replication(): + """Получить информацию о последней репликации (максимальное время по всем таблицам)""" + return replication_state.get_last_replication_info() + + +@router.get("/replication/tables") +async def get_tables_status(): + """Получить статус всех таблиц (из replication_metadata)""" + stats = replication_state.get_all_stats() + + # Форматируем для API + result = [] + for table in stats['tables']: + result.append({ + "table": table['name'], + "last_id": table['last_id'], + "rows_count": table['rows'], + "last_sync": table['last_sync'].isoformat() if table['last_sync'] else None, + "active": table['active'] + }) + + return { + "total_rows": stats['total_rows'], + "tables_count": stats['tables_count'], + "active_tables": stats['active_tables'], + "tables": result + } + + +@router.get("/replication/tables/{table_name}") +async def get_table_status(table_name: str): + """Получить статус конкретной таблицы""" + from app.repository.replication_metadata_repo import replication_metadata_repo + + metadata = 
replication_metadata_repo.get_table_metadata(table_name) + if not metadata: + raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") + + return { + "table": metadata.table_name, + "last_id": metadata.last_id, + "last_sync_time": metadata.last_sync_time.isoformat() if metadata.last_sync_time else None, + "total_rows": metadata.total_rows, + "is_active": metadata.is_active, + "created_at": metadata.created_at.isoformat() if metadata.created_at else None, + "updated_at": metadata.updated_at.isoformat() if metadata.updated_at else None, + "last_error": metadata.last_error + } + + +@router.post("/replication/tables/{table_name}/reset") +async def reset_table(table_name: str): + """Сбросить состояние таблицы (обнулить last_id и last_sync_time)""" + from app.repository.replication_metadata_repo import replication_metadata_repo + + session = replication_metadata_repo.get_session() + try: + metadata = session.query(ReplicationMetadata).filter_by(table_name=table_name).first() + if metadata: + metadata.last_id = 0 + metadata.last_sync_time = datetime(1900, 1, 1) + metadata.total_rows = 0 + metadata.last_error = None + metadata.updated_at = datetime.now() + session.commit() + + migration_logger.info(f"Сброшено состояние таблицы {table_name}") + return {"message": f"Состояние таблицы {table_name} сброшено"} + else: + raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") + finally: + session.close() + + +@router.get("/replication/logs") +async def get_replication_logs( + table_name: Optional[str] = None, + limit: int = Query(100, ge=1, le=1000), + status: Optional[str] = None +): + """Получить логи репликации""" + from app.repository.replication_metadata_repo import replication_metadata_repo + from app.models.replication import ReplicationLog + + session = replication_metadata_repo.get_session() + try: + query = session.query(ReplicationLog).order_by(ReplicationLog.created_at.desc()) + + if table_name: + query = query.filter(ReplicationLog.table_name == table_name) + + if status: + query = query.filter(ReplicationLog.status == status.upper()) + + logs = query.limit(limit).all() + + return [ + { + "id": log.id, + "table_name": log.table_name, + "operation": log.operation, + "records_count": log.records_count, + "status": log.status, + "error_message": log.error_message, + "created_at": log.created_at.isoformat() + } + for log in logs + ] + finally: + session.close() + +@router.post("/test-email") +async def test_email(): + """Тест отправки email""" + success = email_sender.send_email( + subject="Тестовое письмо", + body=f"Это тестовое письмо от Migration Service\n\nВремя: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + if success: + return {"message": "Тестовое письмо отправлено"} + else: + raise HTTPException(status_code=500, detail="Ошибка отправки письма") + + +def run_migration_task(full_reload: bool): + try: + migrator.run_migration(full_reload=full_reload) + except Exception as e: + migration_logger.error("Ошибка в фоновой задаче", e) + + +# ==================== РАСПИСАНИЯ ==================== + +@router.get("/schedules") +async def get_schedules(): + """Получить все расписания миграций""" + schedules = replication_metadata_repo.get_all_schedules() + return [s.to_dict() for s in schedules] + + +@router.get("/schedules/next-runs") +async def get_next_runs(limit: int = 10): + """Получить следующие запуски""" + from datetime import timedelta + + now = datetime.now() + runs = [] + + # Получаем все расписания + schedules = 
replication_metadata_repo.get_all_schedules() + + for minute_offset in range(60 * 24 * 7): # Проверяем на неделю вперед + check_time = now + timedelta(minutes=minute_offset) + check_time_obj = check_time.time() + check_weekday = check_time.weekday() + + for schedule in schedules: + if not schedule.enabled: + continue + + # Проверяем совпадение времени и дня + time_diff = abs( + (schedule.schedule_time.hour * 60 + schedule.schedule_time.minute) - + (check_time_obj.hour * 60 + check_time_obj.minute) + ) + + if time_diff <= 1 and check_weekday in schedule.days_list: + # Получаем статистику таблицы + metadata = replication_metadata_repo.get_table_metadata(schedule.table_name) + + runs.append({ + 'table': schedule.table_name, + 'time': check_time.strftime('%Y-%m-%d %H:%M'), + 'day': check_time.strftime('%A'), + 'days_schedule': schedule.days_display, + 'full_reload': schedule.full_reload, + 'rows_count': metadata.total_rows if metadata else 0, + 'last_sync': metadata.last_sync_time.isoformat() if metadata and metadata.last_sync_time else None + }) + + if len(runs) >= limit: + break + + if len(runs) >= limit: + break + + return runs[:limit] + + +@router.post("/schedules/run-now") +async def run_scheduled_now(background_tasks: BackgroundTasks): + """Принудительно запустить все запланированные на текущее время миграции""" + due = scheduler.get_due_tables() + if not due: + return {'message': 'Нет таблиц для миграции в текущее время и день'} + + for schedule in due: + background_tasks.add_task( + run_scheduled_migration, + schedule.table_name, + schedule.full_reload + ) + + return { + 'message': f'Запущено {len(due)} миграций', + 'tables': [ + { + 'name': s.table_name, + 'time': s.schedule_time.strftime("%H:%M"), + 'days': s.days_display, + 'full_reload': s.full_reload + } + for s in due + ] + } + + +@router.post("/schedules/{table_name}") +async def set_schedule( + table_name: str, + schedule_time: str = Query("00:00", description="Время в формате HH:MM"), + days: Optional[str] = Query(None, description="Дни недели через запятую: пн,вт,ср,чт,пт,сб,вс"), + full_reload: bool = Query(False, description="Полная перезагрузка"), + enabled: bool = Query(True, description="Включено"), + name: Optional[str] = Query(None, description="Название расписания"), + description: Optional[str] = Query(None, description="Описание") +): + """Добавить новое расписание для таблицы""" + try: + if table_name not in settings.TABLES_TO_COPY: + raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") + + days_list = None + if days: + days_list = [d.strip() for d in days.split(',')] + + from app.repository.replication_metadata_repo import replication_metadata_repo + + schedule = replication_metadata_repo.add_schedule( + table_name=table_name, + schedule_time=schedule_time, + days=days_list, + full_reload=full_reload, + enabled=enabled, + name=name, + description=description + ) + + if schedule: + await refresh_schedules() + return { + "message": f"Расписание добавлено для {table_name} в {schedule_time}", + "schedule": schedule + } + else: + raise HTTPException(status_code=500, detail="Ошибка добавления расписания") + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + +@router.get("/schedules/{table_name}") +async def get_table_schedule(table_name: str): + """Получить расписание для конкретной таблицы""" + schedule = replication_metadata_repo.get_schedule(table_name) + if not schedule: + raise HTTPException(status_code=404, detail="Расписание не найдено") + + # Получаем 
статистику таблицы (get_schedule() возвращает список: у таблицы может быть несколько расписаний)
+    metadata = replication_metadata_repo.get_table_metadata(table_name)
+
+    table_stats = None
+    if metadata:
+        table_stats = {
+            'rows_count': metadata.total_rows,
+            'last_sync': metadata.last_sync_time.isoformat() if metadata.last_sync_time else None,
+            'last_id': metadata.last_id
+        }
+
+    result = []
+    for s in schedule:
+        item = s.to_dict()
+        if table_stats:
+            item['table_stats'] = table_stats
+        result.append(item)
+
+    return result
+
+@router.put("/schedules/{schedule_id}")
+async def update_schedule(
+    schedule_id: int,
+    schedule_time: Optional[str] = Query(None, description="Время в формате HH:MM"),
+    days: Optional[str] = Query(None, description="Дни недели через запятую"),
+    full_reload: Optional[bool] = Query(None, description="Полная перезагрузка"),
+    enabled: Optional[bool] = Query(None, description="Включено"),
+    name: Optional[str] = Query(None, description="Название"),
+    description: Optional[str] = Query(None, description="Описание")
+):
+    """Обновить существующее расписание по ID"""
+    update_kwargs = {}
+    if schedule_time:
+        update_kwargs['schedule_time'] = schedule_time
+    if days:
+        update_kwargs['days'] = [d.strip() for d in days.split(',')]
+    if full_reload is not None:
+        update_kwargs['full_reload'] = full_reload
+    if enabled is not None:
+        update_kwargs['enabled'] = enabled
+    if name:
+        update_kwargs['name'] = name
+    if description:
+        update_kwargs['description'] = description
+
+    success = replication_metadata_repo.update_schedule(schedule_id, **update_kwargs)
+
+    if success:
+        await refresh_schedules()
+        return {"message": f"Расписание {schedule_id} обновлено"}
+    else:
+        raise HTTPException(status_code=404, detail=f"Расписание {schedule_id} не найдено")
+
+@router.post("/schedules/{table_name}/disable")
+async def disable_schedule(table_name: str):
+    """Отключить расписание"""
+    success = replication_metadata_repo.disable_schedule(table_name)
+    if success:
+        await refresh_schedules()
+        return {'message': f'Расписание для {table_name} отключено'}
+    else:
+        raise HTTPException(status_code=404, detail=f"Расписание для {table_name} не найдено")
+
+
+@router.post("/schedules/{table_name}/enable")
+async def enable_schedule(table_name: str):
+    """Включить расписание"""
+    success = replication_metadata_repo.enable_schedule(table_name)
+    if success:
+        await refresh_schedules()
+        return {'message': f'Расписание для {table_name} включено'}
+    else:
+        raise HTTPException(status_code=404, detail=f"Расписание для {table_name} не найдено")
+
+# ==================== Фоновые задачи ====================
+# (run_migration_task определена выше, рядом с /migrate/start)
+
+def run_scheduled_migration(table_name: str, full_reload: bool):
+    """Фоновая задача для запланированной миграции одной таблицы"""
+    try:
+        migration_logger.info(f"Запуск запланированной миграции для {table_name}")
+
+        migrator.run_migration(
+            tables=[table_name],
+            full_reload=full_reload,
+            send_email=True
+        )
+
+        # Обновляем время последнего запуска в расписании
+        replication_metadata_repo.update_schedule_last_run(table_name)
+
+        # Логируем успешный запуск
+        replication_metadata_repo.log_operation(
+            table_name=table_name,
+            operation='SCHEDULED',
+            records_count=0,
+            status='SUCCESS'
+        )
+
+        migration_logger.info(f"Запланированная миграция для {table_name} завершена")
+
+    except Exception as e:
+        error_msg = f"Ошибка в
запланированной миграции для {table_name}: {e}" + migration_logger.error(error_msg) + migration_logger.exception(e) + + # Логируем ошибку + replication_metadata_repo.log_operation( + table_name=table_name, + operation='SCHEDULED', + records_count=0, + status='ERROR', + error_message=str(e)[:500] + ) \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..c7d2121 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,85 @@ +import logging +import os +from typing import List, Literal, Optional, Dict +from urllib.parse import quote_plus +from pydantic_settings import BaseSettings +from pydantic import Field +from dotenv import load_dotenv + +load_dotenv() + + +class Settings(BaseSettings): + """Конфигурация приложения из переменных окружения""" + + # Настройки MSSQL + MSSQL_SERVER: str = Field(..., env='MSSQL_SERVER') + MSSQL_DATABASE: str = Field(..., env='MSSQL_DATABASE') + MSSQL_USERNAME: str = Field(..., env='MSSQL_USERNAME') + MSSQL_PASSWORD: str = Field(..., env='MSSQL_PASSWORD') + MSSQL_DRIVER: str = 'pymssql' + + # Настройки PostgreSQL + POSTGRES_HOST: str = Field(..., env='POSTGRES_HOST') + POSTGRES_PORT: int = Field(5432, env='POSTGRES_PORT') + POSTGRES_DATABASE: str = Field(..., env='POSTGRES_DATABASE') + POSTGRES_USERNAME: str = Field(..., env='POSTGRES_USERNAME') + POSTGRES_PASSWORD: str = Field(..., env='POSTGRES_PASSWORD') + + # Настройки email + EMAIL_HOST: str = Field(..., env='EMAIL_HOST') + EMAIL_PORT: int = Field(465, env='EMAIL_PORT') + EMAIL_USER: str = Field(..., env='EMAIL_USER') + EMAIL_PASSWORD: str = Field(..., env='EMAIL_PASSWORD') + EMAIL_FROM: str = Field(..., env='EMAIL_FROM') + EMAIL_TO: List[str] = Field(default_factory=lambda: ['andrew.brusnitsyn@gmail.com']) + EMAIL_SUBJECT: str = 'Результат миграции данных MSSQL → PostgreSQL' + + # Настройки миграции + CHUNK_SIZE: int = Field(1000, env='CHUNK_SIZE') + BATCH_SIZE: int = Field(10, env='BATCH_SIZE') + TABLES_TO_COPY: List[str] = Field( + default_factory=lambda: ['oms_Department'] + ) + LIFE_TABLES: List[str] = Field( + default_factory=lambda: ['oms_Department'] + ) + + # Колонки дат для инкрементальной загрузки + DEFAULT_DATE_COLUMNS: List[str] = Field( + default_factory=lambda: [ + 'DateExtract', 'DateDirection', 'DateRecipient', 'CreateDate', + 'UpdateDate', 'ModifiedDate', 'ChangeDate', 'LastModified' + ] + ) + + # Файлы состояния + LAST_REPLICATION_FILE: str = 'last_replication.json' + LOG_DIR: str = 'logs' + + # Настройки API + API_V1_PREFIX: str = '/api/v1' + DEBUG: bool = Field(False, env='DEBUG') + + @property + def MSSQL_CONNECTION_STRING(self) -> str: + """Формирование строки подключения к MSSQL""" + # Для pymssql с Windows аутентификацией используем формат: + # mssql+pymssql://domain\\username:password@server:port/database + # ИЛИ для trusted connection: + # mssql+pymssql://username:password@server:port/database?charset=utf8 + + return rf'mssql+{self.MSSQL_DRIVER}://{self.MSSQL_USERNAME}:{self.MSSQL_PASSWORD}@{self.MSSQL_SERVER}/{self.MSSQL_DATABASE}' + + @property + def POSTGRES_CONNECTION_STRING(self) -> str: + """Формирование строки подключения к PostgreSQL""" + return rf'postgresql://{self.POSTGRES_USERNAME}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DATABASE}' + + class Config: + env_file = '.env' + case_sensitive = False + + +# Глобальный экземпляр настроек +settings = Settings() \ No newline at end of file diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 
0000000..70d5cb5 --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,68 @@ +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine +from sqlalchemy.orm import sessionmaker +from contextlib import contextmanager +from typing import Optional +from app.core.config import settings + + +class DatabaseConnector: + """Управление подключениями к базам данных""" + + def __init__(self): + self._src_engine: Optional[Engine] = None + self._dst_engine: Optional[Engine] = None + self.dst_session = None + self.schedule_session = None + + @property + def src_engine(self) -> Engine: + """Подключение к MSSQL""" + if not self._src_engine: + self._src_engine = create_engine( + settings.MSSQL_CONNECTION_STRING, + pool_pre_ping=True, + echo=settings.DEBUG + ) + return self._src_engine + + @property + def dst_engine(self) -> Engine: + """Подключение к PostgreSQL (основная БД)""" + if not self._dst_engine: + self._dst_engine = create_engine( + settings.POSTGRES_CONNECTION_STRING, + pool_pre_ping=True, + echo=settings.DEBUG + ) + self.dst_session = sessionmaker(bind=self._dst_engine) + return self._dst_engine + + @contextmanager + def src_connection(self): + """Контекстный менеджер для MSSQL соединения""" + conn = self.src_engine.connect() + try: + yield conn + finally: + conn.close() + + @contextmanager + def dst_connection(self): + """Контекстный менеджер для PostgreSQL соединения""" + conn = self.dst_engine.connect() + try: + yield conn + finally: + conn.close() + + def dispose_engines(self): + """Закрытие всех соединений""" + if self._src_engine: + self._src_engine.dispose() + if self._dst_engine: + self._dst_engine.dispose() + + +# Глобальный экземпляр подключений +db_connector = DatabaseConnector() \ No newline at end of file diff --git a/app/core/logging.py b/app/core/logging.py new file mode 100644 index 0000000..964d8c3 --- /dev/null +++ b/app/core/logging.py @@ -0,0 +1,159 @@ +import logging +import os +import json +import traceback +from datetime import datetime +from typing import Dict, Any, Optional, List +from pathlib import Path +from app.core.config import settings + + +class MigrationLogger: + """Класс для логирования процесса миграции""" + + def __init__(self): + self.start_time = datetime.now() + self.timestamp = self.start_time.strftime("%Y%m%d_%H%M%S") + + # Создаем директорию для логов + log_dir = Path(settings.LOG_DIR) + log_dir.mkdir(exist_ok=True) + + self.log_file = log_dir / f"migration_log_{self.timestamp}.log" + self.stats = { + 'total_tables': len(settings.TABLES_TO_COPY), + 'copied_tables': [], + 'failed_tables': [], + 'schema_changes': [], + 'total_rows': 0, + 'start_time': self.start_time, + 'end_time': None, + 'errors': [] + } + + # Настраиваем logging + self._setup_logging() + + def _setup_logging(self): + """Настройка системы логирования""" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(self.log_file, encoding='utf-8'), + logging.StreamHandler() + ] + ) + self.logger = logging.getLogger(__name__) + + def info(self, message: str): + """Логирование информационного сообщения""" + self.logger.info(message) + + def error(self, message: str, exception: Optional[Exception] = None): + """Логирование ошибки""" + error_details = { + 'message': message, + 'exception': str(exception) if exception else None, + 'traceback': traceback.format_exc() if exception else None, + 'timestamp': datetime.now().isoformat() + } + self.stats['errors'].append(error_details) + 
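+        # Full error details (including the traceback) are kept in stats['errors'] for the final JSON report; only the short line below goes to the live log.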
self.logger.error(f"{message}. Ошибка: {exception}") + + def warning(self, message: str): + """Логирование предупреждения""" + self.logger.warning(message) + + def debug(self, message: str): + """Логирование отладочной информации""" + self.logger.debug(message) + + def table_start(self, table_name: str): + """Логирование начала обработки таблицы""" + self.logger.info(f"{'='*60}") + self.logger.info(f"Начало обработки таблицы: {table_name}") + self.logger.info(f"{'='*60}") + + def table_success(self, table_name: str, row_count: int): + """Логирование успешного копирования таблицы""" + self.stats['copied_tables'].append({ + 'name': table_name, + 'row_count': row_count, + 'timestamp': datetime.now().isoformat() + }) + self.stats['total_rows'] += row_count + self.logger.info(f"Таблица {table_name} успешно скопирована ({row_count} строк)") + + def table_failure(self, table_name: str, error: str): + """Логирование ошибки при копировании таблицы""" + self.stats['failed_tables'].append({ + 'name': table_name, + 'error': error, + 'timestamp': datetime.now().isoformat() + }) + self.logger.error(f"Ошибка при копировании таблицы {table_name}: {error}") + + def progress(self, table_name: str, chunk_num: int, total_rows: int): + """Логирование прогресса загрузки""" + self.logger.info(f"Таблица {table_name}: загружено чанков {chunk_num}, строк {total_rows}") + + def schema_change(self, table_name: str, new_columns: List[str]): + """Логирование изменения схемы""" + self.stats['schema_changes'].append({ + 'table': table_name, + 'new_columns': new_columns, + 'timestamp': datetime.now().isoformat() + }) + self.logger.info(f"В таблице {table_name} обнаружены новые колонки: {new_columns}") + + def generate_report(self) -> Dict[str, Any]: + """Генерация итогового отчета""" + self.stats['end_time'] = datetime.now() + duration = self.stats['end_time'] - self.stats['start_time'] + self.stats['duration_seconds'] = duration.total_seconds() + self.stats['duration_human'] = str(duration) + + report = { + 'summary': { + 'total_tables': self.stats['total_tables'], + 'successful_tables': len(self.stats['copied_tables']), + 'failed_tables': len(self.stats['failed_tables']), + 'schema_changes': len(self.stats['schema_changes']), + 'success_rate': (len(self.stats['copied_tables']) / self.stats['total_tables'] * 100) + if self.stats['total_tables'] > 0 else 0, + 'total_rows': self.stats['total_rows'], + 'start_time': self.stats['start_time'].isoformat(), + 'end_time': self.stats['end_time'].isoformat(), + 'duration': self.stats['duration_human'] + }, + 'successful_tables': [ + {'name': t['name'], 'rows': t['row_count']} + for t in self.stats['copied_tables'] + ], + 'failed_tables': [ + {'name': t['name'], 'error': t['error']} + for t in self.stats['failed_tables'] + ], + 'schema_changes': self.stats['schema_changes'], + 'errors': self.stats['errors'] + } + + # Сохраняем отчет в JSON + report_file = Path(settings.LOG_DIR) / f"migration_report_{self.timestamp}.json" + with open(report_file, 'w', encoding='utf-8') as f: + json.dump(report, f, ensure_ascii=False, indent=2, default=str) + + return report + + def get_log_content(self) -> str: + """Получение содержимого лог-файла""" + try: + with open(self.log_file, 'r', encoding='utf-8') as f: + return f.read() + except Exception as e: + return f"Ошибка при чтении лог-файла: {e}" + + +# Глобальный экземпляр логгера +migration_logger = MigrationLogger() \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..542e7a8 --- /dev/null +++ 
b/app/main.py @@ -0,0 +1,28 @@ +from datetime import datetime, time +import os +from fastapi import FastAPI +from contextlib import asynccontextmanager +from app.api.routes import router +from app.core.database import db_connector +from app.core.logging import migration_logger +from app.services.replication_state import replication_state + + +@asynccontextmanager +async def lifespan(app: FastAPI): + migration_logger.info("Запуск приложения") + yield + migration_logger.info("Завершение работы") + db_connector.dispose_engines() + + +app = FastAPI(title="Сервис репликации", lifespan=lifespan) +app.include_router(router) + +@app.get("/") +async def root(): + return { + "service": "Сервис репликации", + "status": "онлайн", + "last_replication": replication_state.get_last_replication_info() + } \ No newline at end of file diff --git a/app/models/migration.py b/app/models/migration.py new file mode 100644 index 0000000..af3d087 --- /dev/null +++ b/app/models/migration.py @@ -0,0 +1,68 @@ +from pydantic import BaseModel, Field +from datetime import datetime +from typing import List, Dict, Any, Optional + + +class MigrationStatus(BaseModel): + """Статус миграции""" + is_running: bool = False + current_table: Optional[str] = None + progress: float = 0.0 + start_time: Optional[datetime] = None + estimated_completion: Optional[datetime] = None + + +class TableInfo(BaseModel): + """Информация о таблице""" + name: str + row_count: int + last_replication: Optional[datetime] = None + columns: List[str] = Field(default_factory=list) + has_pk: bool = False + has_date_column: bool = False + + +class MigrationRequest(BaseModel): + """Запрос на миграцию""" + tables: Optional[List[str]] = None + full_reload: bool = False + send_email: bool = True + parallel_workers: int = 1 + + +class MigrationResponse(BaseModel): + """Ответ на запрос миграции""" + success: bool + message: str + task_id: Optional[str] = None + start_time: datetime = Field(default_factory=datetime.now) + + +class MigrationReport(BaseModel): + """Отчет о миграции""" + summary: Dict[str, Any] + successful_tables: List[Dict[str, Any]] + failed_tables: List[Dict[str, Any]] + schema_changes: List[Dict[str, Any]] + errors: List[Dict[str, Any]] + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + + +class ColumnInfo(BaseModel): + """Информация о колонке""" + name: str + type: str + nullable: bool + max_length: Optional[int] = None + default: Optional[str] = None + + +class SchemaDiff(BaseModel): + """Различия в схемах""" + new_columns: List[ColumnInfo] = Field(default_factory=list) + missing_columns: List[str] = Field(default_factory=list) + type_mismatches: List[Dict[str, str]] = Field(default_factory=list) \ No newline at end of file diff --git a/app/models/replication.py b/app/models/replication.py new file mode 100644 index 0000000..2d3c22c --- /dev/null +++ b/app/models/replication.py @@ -0,0 +1,187 @@ +# app/models/replication.py + +from typing import List, Optional +from sqlalchemy import JSON, Column, ForeignKey, String, DateTime, BigInteger, Integer, Boolean, Time +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from datetime import datetime + +Base = declarative_base() + + +class ReplicationMetadata(Base): + """Модель для хранения метаданных репликации""" + + __tablename__ = 'replication_metadata' + + table_name = Column(String(100), primary_key=True, nullable=False) + last_sync_time = Column(DateTime, nullable=True) + last_id = Column(BigInteger, nullable=True) + 
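+    # last_sync_time and last_id act as the replication watermark: incremental runs read only source rows with an ID greater than last_id.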
+    total_rows = Column(BigInteger, default=0)
+    last_error = Column(String(500), nullable=True)
+    is_active = Column(Boolean, default=True)
+    created_at = Column(DateTime, default=datetime.now)
+    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
+
+    schedule = relationship("ReplicationSchedule", back_populates="table", cascade="all, delete-orphan")
+
+    def __repr__(self):
+        return f"<ReplicationMetadata(table_name='{self.table_name}', last_id={self.last_id}, total_rows={self.total_rows})>"
+
+    @classmethod
+    def create_if_not_exists(cls, session, table_name: str):
+        """Создает запись метаданных, если её нет"""
+        record = session.query(cls).filter_by(table_name=table_name).first()
+        if not record:
+            record = cls(
+                table_name=table_name,
+                last_sync_time=datetime(1900, 1, 1),  # Начало времен
+                last_id=0,
+                total_rows=0
+            )
+            session.add(record)
+            session.commit()
+        return record
+
+    def update_sync_time(self, session):
+        """Обновляет время последней синхронизации"""
+        self.last_sync_time = datetime.now()
+        self.updated_at = datetime.now()
+        session.commit()
+
+    def update_last_id(self, session, last_id: int):
+        """Обновляет последний обработанный ID"""
+        self.last_id = last_id
+        self.updated_at = datetime.now()
+        session.commit()
+
+    def increment_total_rows(self, session, count: int = 1):
+        """Увеличивает счетчик общего количества строк"""
+        self.total_rows += count
+        self.updated_at = datetime.now()
+        session.commit()
+
+class ReplicationSchedule(Base):
+    """Модель для расписания миграции таблицы"""
+
+    __tablename__ = 'replication_schedules'
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    table_name = Column(String(100), ForeignKey('replication_metadata.table_name', ondelete='CASCADE'), nullable=False)
+    schedule_time = Column(Time, nullable=False, default=datetime.strptime("00:00", "%H:%M").time())
+    days = Column(JSON, nullable=False, default=list)  # Храним список дней как JSON
+    full_reload = Column(Boolean, default=False)
+    enabled = Column(Boolean, default=True)
+    last_run = Column(DateTime, nullable=True)
+    created_at = Column(DateTime, default=datetime.now)
+    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
+
+    name = Column(String(100), nullable=True)
+    description = Column(String(500), nullable=True)
+
+    # Связь с метаданными
+    table = relationship("ReplicationMetadata", back_populates="schedule")
+
+    # Маппинг дней для обратной совместимости
+    DAYS_MAP = {
+        'monday': 0, 'mon': 0, 'пн': 0, 'понедельник': 0,
+        'tuesday': 1, 'tue': 1, 'вт': 1, 'вторник': 1,
+        'wednesday': 2, 'wed': 2, 'ср': 2, 'среда': 2,
+        'thursday': 3, 'thu': 3, 'чт': 3, 'четверг': 3,
+        'friday': 4, 'fri': 4, 'пт': 4, 'пятница': 4,
+        'saturday': 5, 'sat': 5, 'сб': 5, 'суббота': 5,
+        'sunday': 6, 'sun': 6, 'вс': 6, 'воскресенье': 6
+    }
+
+    @property
+    def days_list(self) -> List[int]:
+        """Получить список дней как числа (0-6)"""
+        if not self.days:
+            return list(range(7))  # Все дни по умолчанию
+
+        # Преобразуем названия дней в числа; числовые значения 0-6 принимаем как есть
+        result = []
+        for day in self.days:
+            if isinstance(day, int) and 0 <= day <= 6:
+                result.append(day)
+                continue
+            day_lower = str(day).lower().strip()
+            if day_lower in self.DAYS_MAP:
+                result.append(self.DAYS_MAP[day_lower])
+        return result if result else list(range(7))
+
+    @property
+    def days_display(self) -> List[str]:
+        """Получить отображаемые названия дней"""
+        days_list = []
+        for day_num in sorted(self.days_list):
+            # Ищем русское название
+            for name, num in self.DAYS_MAP.items():
+                if num == day_num and name in ['пн', 'вт', 'ср', 'чт', 'пт', 'сб', 'вс']:
+                    days_list.append(name)
+                    break
+            else:
+                days_list.append(str(day_num))
+        return days_list
+
+    def should_run_today(self, check_date: Optional[datetime] = None) -> bool:
+        """Проверить, должна ли таблица запускаться сегодня"""
+        if check_date is None:
+            check_date = datetime.now()
+
+        today = check_date.weekday()
+        return today in self.days_list
+
+    def to_dict(self) -> dict:
+        """Безопасное преобразование в словарь (работает и с отсоединенными объектами)"""
+        data = {
+            'id': self.id,
+            'table_name': self.table_name,
+            'schedule_time': self.schedule_time.strftime("%H:%M") if self.schedule_time else "00:00",
+            'days': self.days,  # Сохраняем как есть, а days_display вычислим отдельно
+            'full_reload': self.full_reload,
+            'enabled': self.enabled,
+            'last_run': self.last_run.isoformat() if self.last_run else None,
+            'name': self.name,
+            'description': self.description,
+            'created_at': self.created_at.isoformat() if self.created_at else None,
+            'updated_at': self.updated_at.isoformat() if self.updated_at else None
+        }
+
+        # Вычисляем days_display по нормализованному списку номеров дней
+        # (в self.days могут лежать и названия, и числа)
+        data['days_display'] = self._days_to_str_from_list(self.days_list) if self.days else []
+
+        return data
+
+    def _days_to_str_from_list(self, days_list):
+        """Вычисляет отображаемые дни из списка номеров (0-6)"""
+        result = []
+        for day_num in sorted(days_list if days_list else []):
+            for name, num in self.DAYS_MAP.items():
+                if num == day_num and name in ['пн', 'вт', 'ср', 'чт', 'пт', 'сб', 'вс']:
+                    result.append(name)
+                    break
+        return result
+
+    def __repr__(self):
+        return f"<ReplicationSchedule(id={self.id}, table_name='{self.table_name}', time={self.schedule_time}, enabled={self.enabled})>"
+
+class ReplicationLog(Base):
+    """Модель для логирования операций репликации"""
+
+    __tablename__ = 'replication_logs'
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    table_name = Column(String(100), nullable=False)
+    operation = Column(String(20), nullable=False)  # INSERT, UPDATE, DELETE
+    records_count = Column(Integer, default=0)
+    status = Column(String(20), nullable=False)  # SUCCESS, ERROR
+    error_message = Column(String(500), nullable=True)
+    created_at = Column(DateTime, default=datetime.now)
+
+    def __repr__(self):
+        return f"<ReplicationLog(table_name='{self.table_name}', operation='{self.operation}', status='{self.status}')>"
\ No newline at end of file
diff --git a/app/models/schemas.py b/app/models/schemas.py
new file mode 100644
index 0000000..f728d46
--- /dev/null
+++ b/app/models/schemas.py
@@ -0,0 +1,42 @@
+from pydantic import BaseModel, ConfigDict
+from typing import Optional, Any
+from datetime import datetime
+
+
+class MedicalHistoryBase(BaseModel):
+    """Базовая схема для stt_medicalhistory"""
+    Address: Optional[str] = None
+    Allergy: Optional[str] = None
+    BD: Optional[datetime] = None
+    BloodRhGroupCheked: Optional[bool] = None
+    CauseDeath: Optional[str] = None
+    DateDirection: Optional[datetime] = None
+    DateExtract: Optional[datetime] = None
+    DateRecipient: Optional[datetime] = None
+    DateRecipientHS: Optional[datetime] = None
+    DurationHosp: Optional[int] = None
+    FAMILY: Optional[str] = None
+    Flag: Optional[int] = None
+    GestationalAge: Optional[int] = None
+    InspectedAIDS: Optional[int] = None
+    InspectedRW: Optional[int] = None
+    isWorker: Optional[bool] = None
+    LiveAddress: Optional[str] = None
+    MedCardNum: Optional[str] = None
+    MedicalHistoryID: int
+    # ...
остальные поля + + +class MedicalHistoryCreate(MedicalHistoryBase): + """Схема для создания записи""" + pass + + +class MedicalHistoryUpdate(MedicalHistoryBase): + """Схема для обновления записи""" + pass + + +class MedicalHistoryInDB(MedicalHistoryBase): + """Схема для записи из БД""" + model_config = ConfigDict(from_attributes=True) \ No newline at end of file diff --git a/app/repository/replication_metadata_repo.py b/app/repository/replication_metadata_repo.py new file mode 100644 index 0000000..ed18fa8 --- /dev/null +++ b/app/repository/replication_metadata_repo.py @@ -0,0 +1,461 @@ +# app/services/replication_metadata_repo.py + +from typing import Optional, List +from datetime import datetime +from sqlalchemy.orm import Session +from app.core.database import db_connector +from app.models.replication import ReplicationMetadata, ReplicationLog, ReplicationSchedule +from app.core.logging import migration_logger + + +class ReplicationMetadataRepo: + """Репозиторий для работы с метаданными репликации""" + + def __init__(self): + self.engine = db_connector.dst_engine + self.SessionLocal = db_connector.dst_session + + def init_metadata_table(self): + """Создает таблицу метаданных, если её нет""" + from app.models.replication import Base + Base.metadata.create_all(self.engine) + migration_logger.info("Таблица replication_metadata создана/проверена") + + def get_table_metadata(self, table_name: str) -> Optional[ReplicationMetadata]: + """Получает метаданные для таблицы""" + session = self.SessionLocal() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + # Если нет, создаем + if not metadata: + metadata = ReplicationMetadata.create_if_not_exists(session, table_name) + + return metadata + finally: + session.close() + + def get_last_sync_time(self, table_name: str) -> Optional[datetime]: + """Получает время последней синхронизации таблицы""" + metadata = self.get_table_metadata(table_name) + return metadata.last_sync_time if metadata else None + + def get_last_id(self, table_name: str) -> Optional[int]: + """Получает последний обработанный ID для таблицы""" + metadata = self.get_table_metadata(table_name) + return metadata.last_id if metadata else None + + def update_sync_time(self, table_name: str) -> bool: + """Обновляет время синхронизации таблицы""" + session = self.SessionLocal() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if not metadata: + metadata = ReplicationMetadata( + table_name=table_name, + last_sync_time=datetime.now(), + last_id=0 + ) + session.add(metadata) + else: + metadata.last_sync_time = datetime.now() + metadata.updated_at = datetime.now() + + session.commit() + migration_logger.debug(f"Updated sync time for {table_name} to {metadata.last_sync_time}") + return True + except Exception as e: + session.rollback() + migration_logger.error(f"Error updating sync time for {table_name}: {e}") + return False + finally: + session.close() + + def update_last_id(self, table_name: str, last_id: int) -> bool: + """Обновляет последний обработанный ID""" + session = self.SessionLocal() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if not metadata: + metadata = ReplicationMetadata( + table_name=table_name, + last_sync_time=datetime.now(), + last_id=last_id + ) + session.add(metadata) + else: + metadata.last_id = last_id + metadata.updated_at = datetime.now() + + session.commit() + migration_logger.debug(f"Updated 
last_id for {table_name} to {last_id}") + return True + except Exception as e: + session.rollback() + migration_logger.error(f"Error updating last_id for {table_name}: {e}") + return False + finally: + session.close() + + def update_table_stats(self, table_name: str, added_rows: int) -> bool: + """Обновляет статистику таблицы""" + session = self.SessionLocal() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if not metadata: + metadata = ReplicationMetadata( + table_name=table_name, + last_sync_time=datetime.now(), + total_rows=added_rows + ) + session.add(metadata) + else: + metadata.total_rows += added_rows + metadata.updated_at = datetime.now() + + session.commit() + return True + except Exception as e: + session.rollback() + migration_logger.error(f"Error updating stats for {table_name}: {e}") + return False + finally: + session.close() + + def log_operation(self, table_name: str, operation: str, records_count: int, + status: str = "SUCCESS", error_message: Optional[str] = None): + """Логирует операцию репликации""" + session = self.SessionLocal() + try: + log = ReplicationLog( + table_name=table_name, + operation=operation, + records_count=records_count, + status=status, + error_message=error_message + ) + session.add(log) + session.commit() + except Exception as e: + session.rollback() + migration_logger.error(f"Error logging operation: {e}") + finally: + session.close() + + def get_all_stats(self) -> dict: + """Получает статистику по всем таблицам""" + session = self.SessionLocal() + try: + metadata_list = session.query(ReplicationMetadata).all() + + # Обрабатываем None значения в total_rows + total_rows = sum(m.total_rows for m in metadata_list if m.total_rows is not None) + active_tables = sum(1 for m in metadata_list if m.is_active) + + return { + 'total_rows': total_rows, + 'tables_count': len(metadata_list), + 'active_tables': active_tables, + 'tables': [ + { + 'name': m.table_name, + 'last_sync': m.last_sync_time, + 'last_id': m.last_id, + 'rows': m.total_rows, + 'active': m.is_active + } + for m in metadata_list + ] + } + finally: + session.close() + + def get_session(self) -> Session: + return self.SessionLocal() + + # ========== Методы для расписаний ========== + + def init_default_schedules(self, table_names: List[str]): + """Инициализирует расписания по умолчанию для списка таблиц""" + session = self.get_session() + try: + for table_name in table_names: + # Проверяем, есть ли уже расписание + schedule = session.query(ReplicationSchedule).filter_by( + table_name=table_name + ).first() + + if not schedule: + # Создаем расписание по умолчанию (каждый день в 00:00) + schedule = ReplicationSchedule( + table_name=table_name, + schedule_time=datetime.strptime("00:00", "%H:%M").time(), + days=[], # Пустой список = все дни + full_reload=False, + enabled=True + ) + session.add(schedule) + migration_logger.debug(f"Создано расписание по умолчанию для {table_name}") + + session.commit() + migration_logger.info(f"Инициализированы расписания по умолчанию для {len(table_names)} таблиц") + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка инициализации расписаний: {e}") + finally: + session.close() + + def add_schedule(self, table_name: str, schedule_time: str, days: Optional[List[str]] = None, + full_reload: bool = False, enabled: bool = True, + name: Optional[str] = None, description: Optional[str] = None) -> Optional[ReplicationSchedule]: + """Добавить НОВОЕ расписание для таблицы""" + session = 
self.get_session() + try: + # Проверяем, существует ли метаданные + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if not metadata: + metadata = self._create_metadata(session, table_name) + + # Парсим время + time_obj = datetime.strptime(schedule_time, "%H:%M").time() + + # Создаем новое расписание + schedule = ReplicationSchedule( + table_name=table_name, + schedule_time=time_obj, + days=days if days else [], + full_reload=full_reload, + enabled=enabled, + name=name or f"{schedule_time} - {'полная' if full_reload else 'инкремент'}", + description=description + ) + session.add(schedule) + session.commit() + + migration_logger.info(f"Добавлено новое расписание для {table_name} в {schedule_time}") + return schedule.to_dict() + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка добавления расписания для {table_name}: {e}") + return None + finally: + session.close() + + def delete_schedule(self, schedule_id: int) -> bool: + """Удалить расписание по ID""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + id=schedule_id + ).first() + + if schedule: + session.delete(schedule) + session.commit() + migration_logger.info(f"Удалено расписание ID={schedule_id} для {schedule.table_name}") + return True + return False + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка удаления расписания: {e}") + return False + finally: + session.close() + + def update_schedule(self, schedule_id: int, **kwargs) -> bool: + """Обновить существующее расписание по ID""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + id=schedule_id + ).first() + + if schedule: + for key, value in kwargs.items(): + if hasattr(schedule, key) and value is not None: + if key == 'schedule_time' and isinstance(value, str): + value = datetime.strptime(value, "%H:%M").time() + setattr(schedule, key, value) + + schedule.updated_at = datetime.now() + session.commit() + migration_logger.info(f"Обновлено расписание ID={schedule_id}") + return True + return False + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка обновления расписания: {e}") + return False + finally: + session.close() + + def get_schedule(self, table_name: str) -> List[ReplicationSchedule]: + """Получает расписание для таблицы""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + table_name=table_name + ).all() + return schedule + finally: + session.close() + + def get_all_schedules(self) -> List[ReplicationSchedule]: + """Получает все расписания""" + session = self.get_session() + try: + return session.query(ReplicationSchedule).all() + finally: + session.close() + + def get_due_schedules(self, current_time: Optional[datetime] = None) -> List[ReplicationSchedule]: + """Получает расписания, которые должны запуститься сейчас""" + if current_time is None: + current_time = datetime.now() + + current_time_obj = current_time.time() + current_weekday = current_time.weekday() + + session = self.get_session() + try: + schedules = session.query(ReplicationSchedule).filter_by( + enabled=True + ).all() + + due = [] + for schedule in schedules: + # Проверяем время и день + time_diff = abs( + (schedule.schedule_time.hour * 60 + schedule.schedule_time.minute) - + (current_time_obj.hour * 60 + current_time_obj.minute) + ) + + if time_diff <= 1 and current_weekday in schedule.days_list: + due.append(schedule) + + return due 
+ finally: + session.close() + + def update_schedule_last_run(self, table_name: str) -> bool: + """Обновляет время последнего запуска расписания""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + table_name=table_name + ).first() + + if schedule: + schedule.last_run = datetime.now() + schedule.updated_at = datetime.now() + session.commit() + return True + return False + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка обновления last_run для {table_name}: {e}") + return False + finally: + session.close() + + def disable_schedule(self, table_name: str) -> bool: + """Отключает расписание""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + table_name=table_name + ).first() + + if schedule: + schedule.enabled = False + schedule.updated_at = datetime.now() + session.commit() + migration_logger.info(f"Отключено расписание для {table_name}") + return True + return False + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка отключения расписания для {table_name}: {e}") + return False + finally: + session.close() + + def enable_schedule(self, table_name: str) -> bool: + """Включает расписание""" + session = self.get_session() + try: + schedule = session.query(ReplicationSchedule).filter_by( + table_name=table_name + ).first() + + if schedule: + schedule.enabled = True + schedule.updated_at = datetime.now() + session.commit() + migration_logger.info(f"Включено расписание для {table_name}") + return True + return False + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка включения расписания для {table_name}: {e}") + return False + finally: + session.close() + + # ========== Методы для статистики ========== + + def sync_table_stats(self, table_name: str, id_column: str): + """ + Синхронизирует статистику таблицы с реальными данными в PostgreSQL + """ + from app.services.data_reader import data_reader + + stats = data_reader.get_table_stats(table_name, id_column) + + session = self.get_session() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if metadata: + metadata.total_rows = stats['total_rows'] + metadata.last_id = stats['max_id'] + metadata.updated_at = datetime.now() + session.commit() + + migration_logger.info(f"Синхронизирована статистика {table_name}: {stats['total_rows']} строк, max_id={stats['max_id']}") + else: + migration_logger.warning(f"Метаданные для {table_name} не найдены") + finally: + session.close() + + def sync_all_tables_stats(self, table_names: List[str], id_columns: dict): + """ + Синхронизирует статистику для всех таблиц + id_columns: словарь {table_name: id_column_name} + """ + migration_logger.info("Синхронизация статистики всех таблиц...") + + for table_name in table_names: + id_column = id_columns.get(table_name, f"{table_name.split('_')[-1]}ID") + self.sync_table_stats(table_name, id_column) + + migration_logger.info("Синхронизация статистики завершена") + +# Глобальный экземпляр +replication_metadata_repo = ReplicationMetadataRepo() \ No newline at end of file diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 0000000..47dc5d5 --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,73 @@ +import asyncio +import redis.asyncio as redis +from taskiq import ScheduledTask + +from app.taskiq.broker import broker, scheduler, schedule_source, migrate_table_task +from app.core.logging import migration_logger + +REDIS_URL = 
"redis://127.0.0.1:6379" +PREFIX = "migration_schedule" + +async def sync_schedules_to_redis(): + """Полная перезапись расписаний в Redis на основе данных из БД.""" + migration_logger.info("Синхронизация расписаний с Redis...") + + # 1. Получаем все активные расписания из БД через database.py + from app.core.database import db_connector + from app.models.replication import ReplicationSchedule + + # Инициализируем engine и session (обращение к свойству создает sessionmaker) + _ = db_connector.dst_engine + SessionLocal = db_connector.dst_session + session = SessionLocal() + + try: + schedules = session.query(ReplicationSchedule).filter( + ReplicationSchedule.enabled == True + ).all() + + # 2. Очищаем все старые ключи расписаний в Redis + r = await redis.from_url(REDIS_URL) + keys = await r.keys(f"{PREFIX}:*") + if keys: + await r.delete(*keys) + await r.close() + + # 3. Добавляем каждое расписание через add_schedule + added = 0 + for schedule in schedules: + hour = schedule.schedule_time.hour + minute = schedule.schedule_time.minute + days_list = schedule.days_list # список чисел 0..6 + + # Формируем cron-выражение + if days_list: + cron_expr = f"{minute} {hour} * * {','.join(map(str, days_list))}" + else: + cron_expr = f"{minute} {hour} * * *" + + # Используем timezone string для cron_offset (UTC+9 = Asia/Tokyo) + task = ScheduledTask( + task_name="migrate_table_task", + labels={}, + cron=cron_expr, + cron_offset='Asia/Tokyo', # Timezone name для UTC+9 + args=[schedule.table_name], + kwargs={"full_reload": schedule.full_reload} + ) + + # Добавляем в Redis через источник + await schedule_source.add_schedule(task) + added += 1 + migration_logger.info(f"Добавлено: {schedule.table_name} в {hour:02d}:{minute:02d} (cron: {cron_expr})") + + migration_logger.info(f"Синхронизировано {added} активных расписаний") + finally: + session.close() + + +async def on_scheduler_startup(): + """Хук для синхронизации расписаний при запуске планировщика.""" + migration_logger.info("Запуск планировщика taskiq...") + await sync_schedules_to_redis() + migration_logger.info("Планировщик готов к работе") \ No newline at end of file diff --git a/app/services/data_reader.py b/app/services/data_reader.py new file mode 100644 index 0000000..ea5b5ad --- /dev/null +++ b/app/services/data_reader.py @@ -0,0 +1,184 @@ +import pandas as pd +from typing import Any, Iterator, List, Optional, Generator + +from sqlalchemy import text +from app.core.database import db_connector +from app.core.logging import migration_logger +from app.core.config import settings + + +class DataReader: + """Чтение данных по ID с поддержкой чанков""" + + def read_by_id_chunked(self, table_name: str, id_column: str, last_id: Optional[int]) -> Generator[pd.DataFrame, None, None]: + """ + Чтение записей порциями (чанками) для больших таблиц + Возвращает генератор DataFrame'ов + """ + try: + if last_id is None: + migration_logger.info(f"Первая загрузка {table_name}") + # Получаем минимальный ID + min_max_query = f"SELECT MIN({id_column}) as min_id, MAX({id_column}) as max_id, COUNT(*) as total FROM {table_name}" + stats_df = pd.read_sql_query(min_max_query, db_connector.src_engine) + + if stats_df.empty or stats_df.iloc[0]['total'] == 0: + migration_logger.warning(f"Таблица {table_name} пуста") + return + + min_id = stats_df.iloc[0]['min_id'] or 0 + max_id = stats_df.iloc[0]['max_id'] + total = stats_df.iloc[0]['total'] + + migration_logger.info(f"Статистика {table_name}: ID от {min_id} до {max_id}, всего {total} строк") + + # Загружаем порциями + 
current_id = min_id + chunk_num = 0 + + while current_id <= max_id: + chunk_num += 1 + chunk_query = f""" + SELECT * FROM {table_name} + WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE} + ORDER BY {id_column} ASC + """ + + chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine) + + if not chunk_df.empty: + migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (ID: {current_id} - {current_id + settings.CHUNK_SIZE})") + yield chunk_df + + current_id += settings.CHUNK_SIZE + + if chunk_num % settings.BATCH_SIZE == 0: + migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк") + + # Небольшая задержка для снижения нагрузки + if chunk_num % 10 == 0: + import time + time.sleep(0.1) + + else: + # Инкрементальная загрузка - тоже порциями + migration_logger.info(f"📖 Инкрементальная загрузка {table_name} с ID > {last_id}") + + # Получаем максимальный ID для оценки прогресса + max_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}" + max_df = pd.read_sql_query(max_query, db_connector.src_engine) + max_id = max_df.iloc[0]['max_id'] if not max_df.empty else last_id + + current_id = last_id + 1 + chunk_num = 0 + total_loaded = 0 + + while current_id <= max_id: + chunk_num += 1 + chunk_query = f""" + SELECT * FROM {table_name} + WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE} + ORDER BY {id_column} ASC + """ + + chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine) + + if not chunk_df.empty: + yield chunk_df + total_loaded += len(chunk_df) + migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (всего {total_loaded})") + + current_id += settings.CHUNK_SIZE + + if chunk_num % settings.BATCH_SIZE == 0: + migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк") + + # Небольшая задержка + if chunk_num % 10 == 0: + import time + time.sleep(0.1) + + migration_logger.info(f"Инкрементальная загрузка завершена, всего {total_loaded} новых строк") + + except Exception as e: + migration_logger.error(f"Ошибка чтения {table_name}: {e}") + raise + + def prepare_query_for_pymssql(self, query: str) -> str: + """Конвертирует ? плейсхолдеры в %s для pymssql""" + return query.replace('?', '%s') + + def read_custom_query_chunked(self, query: str, params: Optional[tuple] = None, + chunksize: int = 5000) -> Iterator[pd.DataFrame]: + """ + Читает данные по произвольному SQL-запросу пачками. + + Args: + query: SQL-запрос с плейсхолдерами (?) 
+    def read_custom_query_chunked(self, query: str, params: Optional[tuple] = None,
+                                  chunksize: int = 5000) -> Iterator[pd.DataFrame]:
+        """
+        Читает данные по произвольному SQL-запросу пачками.
+
+        Args:
+            query: SQL-запрос с плейсхолдерами (?)
+            params: Параметры для запроса (кортеж)
+            chunksize: Размер пачки
+
+        Returns:
+            Iterator[pd.DataFrame]: Итератор по пачкам данных
+        """
+        migration_logger.debug(f"Executing custom query: {query[:200]}...")
+        migration_logger.debug(f"Params: {params}")
+
+        # Используем pandas для чанкованного чтения
+        if params:
+            prepared_query = self.prepare_query_for_pymssql(query)
+            return pd.read_sql_query(prepared_query, db_connector.src_engine, params=params, chunksize=chunksize)
+        else:
+            return pd.read_sql_query(query, db_connector.src_engine, chunksize=chunksize)
+
+    def get_row_count(self, table_name: str) -> int:
+        """Получить количество строк в таблице-источнике (MSSQL)"""
+        try:
+            query = f"SELECT COUNT(*) as cnt FROM {table_name}"
+            df = pd.read_sql_query(query, db_connector.src_engine)
+            return int(df.iloc[0]['cnt']) if not df.empty else 0
+        except Exception as e:
+            migration_logger.error(f"Ошибка подсчета строк в {table_name}: {e}")
+            return 0
+
+    def get_last_id(self, table_name: str, id_column: str) -> Optional[int]:
+        """
+        Получает максимальный ID из таблицы назначения (PostgreSQL)
+        """
+        try:
+            table_name = table_name.lower()
+            query = text(f'SELECT MAX("{id_column}") as max_id FROM {table_name}')
+            with db_connector.dst_engine.connect() as conn:
+                result = conn.execute(query).scalar()
+            return int(result) if result is not None else None
+        except Exception as e:
+            migration_logger.error(f"Ошибка получения MAX ID для {table_name}: {e}")
+            return None
+
+    def get_table_stats(self, table_name: str, id_column: str) -> dict:
+        """
+        Получает статистику по таблице назначения (PostgreSQL)
+        """
+        try:
+            table_name = table_name.lower()
+            query = text(f"""
+                SELECT
+                    COUNT(*) as total_rows,
+                    MIN("{id_column}") as min_id,
+                    MAX("{id_column}") as max_id
+                FROM {table_name}
+            """)
+
+            with db_connector.dst_engine.connect() as conn:
+                result = conn.execute(query).first()
+
+            return {
+                'total_rows': result[0] or 0,
+                'min_id': int(result[1]) if result[1] is not None else None,
+                'max_id': int(result[2]) if result[2] is not None else None
+            }
+        except Exception as e:
+            migration_logger.error(f"Ошибка получения статистики для {table_name} в PG: {e}")
+            return {'total_rows': 0, 'min_id': None, 'max_id': None}
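+# Набросок использования (имена таблицы и ID-колонки здесь условные,
+# предполагается настроенный db_connector из app/core/database):
+#
+#   for chunk in data_reader.read_by_id_chunked("oms_SomeTable", "SomeTableID", None):
+#       print(len(chunk))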
+
+data_reader = DataReader()
\ No newline at end of file
diff --git a/app/services/data_writer.py b/app/services/data_writer.py
new file mode 100644
index 0000000..e507212
--- /dev/null
+++ b/app/services/data_writer.py
@@ -0,0 +1,239 @@
+import pandas as pd
+from sqlalchemy import text, inspect
+from typing import List, Dict, Any, Optional
+from app.core.database import db_connector
+from app.core.logging import migration_logger
+
+
+class DataWriter:
+    """Запись данных в PostgreSQL"""
+
+    def table_exists(self, table_name: str) -> bool:
+        """Проверка существования таблицы"""
+        try:
+            inspector = inspect(db_connector.dst_engine)
+            return inspector.has_table(table_name)
+        except Exception as e:
+            migration_logger.error(f"Ошибка проверки таблицы {table_name}: {e}")
+            return False
+
+    def create_table(self, table_name: str, df: pd.DataFrame):
+        """Создание таблицы из DataFrame"""
+        try:
+            if self.table_exists(table_name.lower()):
+                migration_logger.info(f"Таблица {table_name} существует, удаляем...")
+                with db_connector.dst_connection() as conn:
+                    conn.execute(text(f'DROP TABLE IF EXISTS "{table_name.lower()}" CASCADE'))
+                    conn.commit()
+
+            df.to_sql(
+                table_name.lower(),
+                db_connector.dst_engine,
+                if_exists='replace',
+                index=False,
+                chunksize=10000
+            )
+            migration_logger.info(f"Таблица {table_name} создана, {len(df)} строк")
+
+        except Exception as e:
+            migration_logger.error(f"Ошибка создания таблицы {table_name}: {e}")
+            raise
+
+    def append_data(self, table_name: str, df: pd.DataFrame):
+        """Добавление данных с игнорированием дубликатов"""
+        try:
+            if df.empty:
+                return
+
+            # Отключаем проверку внешних ключей на время вставки
+            # (SET session_replication_role обычно требует прав суперпользователя)
+            with db_connector.dst_engine.connect() as conn:
+                conn.execute(text("SET session_replication_role = 'replica';"))
+
+                df.to_sql(
+                    table_name.lower(),
+                    conn,
+                    if_exists='append',
+                    index=False,
+                    chunksize=10000,
+                    method='multi'
+                )
+
+                conn.execute(text("SET session_replication_role = 'origin';"))
+                conn.commit()
+
+            migration_logger.info(f"Добавлено {len(df)} строк в {table_name}")
+
+        except Exception as e:
+            migration_logger.error(f"Ошибка добавления данных в {table_name}: {e}")
+            raise
+
+    def create_indexes(self, table_name: str, indexes: List[Dict[str, Any]]):
+        """Создание индексов"""
+        if not indexes:
+            return
+
+        pg_table = table_name.lower()
+
+        for index_info in indexes:
+            try:
+                index_name = f"idx_{pg_table}_{index_info['name'].lower()}"
+                index_name = ''.join(c for c in index_name if c.isalnum() or c == '_')
+
+                columns = ', '.join([f'"{col}"' for col in index_info['columns']])
+                unique_str = 'UNIQUE ' if index_info.get('unique') else ''
+
+                sql = f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" ON "{pg_table}" ({columns})'
+
+                with db_connector.dst_connection() as conn:
+                    conn.execute(text(sql))
+                    conn.commit()
+
+                migration_logger.info(f"Создан индекс: {index_name}")
+
+            except Exception as e:
+                migration_logger.error(f"Ошибка создания индекса {index_info.get('name')}: {e}")
+
+    def create_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]):
+        """Создание внешних ключей"""
+        if not foreign_keys:
+            return
+
+        pg_table = table_name.lower()
+
+        for fk_info in foreign_keys:
+            try:
+                referenced_table = fk_info['referenced_table'].lower()
+
+                # Проверяем существование таблицы
+                with db_connector.dst_connection() as conn:
+                    check_table_sql = """
+                        SELECT EXISTS (
+                            SELECT FROM information_schema.tables
+                            WHERE table_schema = 'public'
+                            AND table_name = :table_name
+                        );
+                    """
+                    result = conn.execute(text(check_table_sql), {"table_name": referenced_table})
+                    table_exists = result.scalar()
+
+                if not table_exists:
+                    migration_logger.error(f"Таблица '{referenced_table}' не существует. Пропускаем создание внешнего ключа")
+                    continue
+
+                fk_name = f"fk_{pg_table}_{fk_info['parent_column'].lower()}"
+                fk_name = ''.join(c for c in fk_name if c.isalnum() or c == '_')
+
+                create_sql = f"""
+                    ALTER TABLE "{pg_table}"
+                    ADD CONSTRAINT "{fk_name}"
+                    FOREIGN KEY ("{fk_info['parent_column']}")
+                    REFERENCES "{referenced_table}" ("{fk_info['referenced_column']}")
+                """
+
+                with db_connector.dst_connection() as conn:
+                    conn.execute(text(create_sql))
+                    conn.commit()
+
+                migration_logger.info(f"Создан внешний ключ: {fk_name}")
+
+            except Exception as e:
+                migration_logger.error(f"Ошибка создания внешнего ключа {fk_info.get('name', 'unknown')}: {e}")
+
+    def analyze_table(self, table_name: str):
+        """Выполнить ANALYZE"""
+        try:
+            with db_connector.dst_connection() as conn:
+                conn.execute(text(f'ANALYZE "{table_name.lower()}"'))
+                conn.commit()
+            migration_logger.info(f"ANALYZE выполнен для {table_name}")
+        except Exception as e:
+            migration_logger.error(f"Ошибка ANALYZE для {table_name}: {e}")
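+    # Стратегия upsert ниже: читаем существующие ID из целевой таблицы,
+    # отделяем новые записи (bulk INSERT через to_sql) от существующих
+    # (построчный UPDATE по ID); всё выполняется в одной транзакции engine.begin()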
+    def upsert_data(self, table_name: str, df: pd.DataFrame, id_column: str):
+        """
+        Вставляет или обновляет данные в PostgreSQL.
+
+        Args:
+            table_name: Имя таблицы
+            df: DataFrame с данными
+            id_column: Колонка с ID для определения конфликта
+        """
+        if df.empty:
+            return
+
+        table_name = table_name.lower()
+
+        # Открываем транзакцию на всю пачку
+        with db_connector.dst_engine.begin() as conn:
+            # Получаем существующие ID
+            existing_ids_query = f'SELECT "{id_column}" FROM {table_name}'
+            existing_ids = pd.read_sql_query(existing_ids_query, conn)
+            existing_set = set(existing_ids[id_column].tolist())
+
+            # Разделяем на новые и существующие
+            new_records = df[~df[id_column].isin(existing_set)]
+            update_records = df[df[id_column].isin(existing_set)]
+            migration_logger.debug(f"new_records: {len(new_records)}, update_records: {len(update_records)}")
+
+            # Вставляем новые
+            if not new_records.empty:
+                new_records.to_sql(table_name, conn, if_exists='append', index=False, method='multi')
+                migration_logger.info(f"  Вставлено {len(new_records)} записей")
+
+            # Обновляем существующие
+            if not update_records.empty:
+                migration_logger.debug(f"Обновляем {len(update_records)} записей")
+
+                # Преобразуем DataFrame в список словарей для построчного update
+                records = update_records.to_dict('records')
+
+                # Обновляем каждую запись
+                for record in records:
+                    id_value = record[id_column]
+                    update_dict = {k: v for k, v in record.items() if k != id_column and v is not None and not pd.isna(v)}
+
+                    if update_dict:
+                        set_clause = ', '.join([f'"{k}" = :{k}' for k in update_dict.keys()])
+                        update_query = text(f'UPDATE {table_name} SET {set_clause} WHERE "{id_column}" = :id_value')
+                        params = {**update_dict, 'id_value': id_value}
+                        conn.execute(update_query, params)
+
+                migration_logger.info(f"  Обновлено {len(update_records)} записей")
+
+    def delete_data(self, table_name: str, id_column: str, ids: List[Any]):
+        """Удаляет данные из PostgreSQL по списку ID"""
+        if not ids:
+            return
+
+        table_name = table_name.lower()
+        total_deleted = 0
+
+        # Разбиваем на чанки по 1000 ID для избежания проблем с длиной запроса
+        chunk_size = 1000
+        for i in range(0, len(ids), chunk_size):
+            chunk_ids = ids[i:i+chunk_size]
+
+            # Для одного ID
+            if len(chunk_ids) == 1:
+                delete_query = text(f'DELETE FROM {table_name} WHERE "{id_column}" = :id')
+                params = {"id": chunk_ids[0]}
+                migration_logger.debug(f'DELETE FROM {table_name} WHERE "{id_column}" = :id [{chunk_ids[0]}]')
+
+                with db_connector.dst_engine.begin() as conn:
+                    result = conn.execute(delete_query, params)
+                    total_deleted += result.rowcount
+
+            # Для нескольких ID используем ANY (эффективнее для PostgreSQL)
+            else:
+                delete_query = text(f'DELETE FROM {table_name} WHERE "{id_column}" = ANY(:ids)')
+                params = {"ids": chunk_ids}
+
+                with db_connector.dst_engine.begin() as conn:
+                    result = conn.execute(delete_query, params)
+                    total_deleted += result.rowcount
+
+        migration_logger.info(f"  Всего удалено: {total_deleted}")
+
+data_writer = DataWriter()
\ No newline at end of file
diff --git a/app/services/migrator.py b/app/services/migrator.py
new file mode 100644
index 0000000..77f4364
--- /dev/null
+++ b/app/services/migrator.py
@@ -0,0 +1,607 @@
+import re
+from typing import Optional, List, Dict, Any
+import traceback
+from datetime import datetime
+import pandas as pd
+from app.services.replication_state import replication_state
+from app.services.data_reader import data_reader
+from app.services.data_writer import data_writer
+from app.services.schema_manager import schema_manager
+from app.utils.index_helpers import get_primary_key, get_max_id_from_postgres, get_foreign_keys, get_indexes
+from app.utils.email_sender import email_sender
+from app.core.logging
import migration_logger +from app.core.config import settings +from app.core.database import db_connector + + +class DatabaseMigrator: + """Мигратор данных по ID и по Life-таблицам""" + + def __init__(self): + self.state = replication_state + self.reader = data_reader + self.writer = data_writer + self.schema = schema_manager + self.is_running = False + self.current_table = None + self.start_time = None + self.all_foreign_keys = {} + self.errors = [] + + # Таблицы, которые используют Life-механизм + self.life_tables = getattr(settings, 'LIFE_TABLES', []) + + # Карта соответствия: основная таблица -> Life-таблица + self.life_mapping = {} + + def _parse_table_name(self, table_name: str) -> Dict[str, str]: + """ + Парсит имя таблицы и возвращает компоненты. + + Примеры: + "oms_kl_VisitResult" -> { + 'schema': 'oms', + 'basename': 'kl_VisitResult', + 'full_name': 'oms_kl_VisitResult' + } + "stt_MedicalHistory" -> { + 'schema': 'stt', + 'basename': 'MedicalHistory', + 'full_name': 'stt_MedicalHistory' + } + """ + # Ищем префикс (oms_, stt_, и т.д.) + match = re.match(r'^([A-Za-z]+)_(.*)$', table_name) + if match: + schema = match.group(1) + basename = match.group(2) + return { + 'schema': schema, + 'basename': basename, + 'full_name': table_name + } + else: + # Если нет префикса + return { + 'schema': '', + 'basename': table_name, + 'full_name': table_name + } + + def _get_life_table_name(self, table_name: str) -> Optional[str]: + """Получает имя Life-таблицы для основной таблицы""" + parsed = self._parse_table_name(table_name) + + if parsed['schema']: + return f"Life_{parsed['schema']}_{parsed['basename']}" + else: + return f"Life_{table_name}" + + def _get_life_id_field(self, table_name: str) -> str: + """Получает имя LifeID поля""" + parsed = self._parse_table_name(table_name) + + return f"{parsed['basename']}LifeID" + + def _get_base_id_field(self, table_name: str) -> str: + """Получает имя базового ID поля""" + parsed = self._parse_table_name(table_name) + return f"{parsed['basename']}ID" + + def migrate_table_by_time(self, table_name: str, last_sync_time: datetime) -> Dict[str, int]: + """Миграция таблицы через Life-механизм по времени""" + life_table = self._get_life_table_name(table_name) + base_id_field = self._get_base_id_field(table_name) + life_id_field = self._get_life_id_field(table_name) + + migration_logger.info(f"Миграция {table_name} через {life_table} с {last_sync_time}") + + stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0} + + try: + # Получаем последние версии из Life-таблицы + query = f""" + WITH LatestLife AS ( + SELECT + {base_id_field}, + MAX({life_id_field}) as MaxLifeID + FROM {life_table} + WHERE x_DateTime > CAST(? 
AS datetime) + GROUP BY {base_id_field} + ) + SELECT dl.* + FROM {life_table} dl + INNER JOIN LatestLife ll + ON dl.{life_id_field} = ll.MaxLifeID + """ + + # Читаем данные чанками + chunk_size = getattr(settings, 'CHUNK_SIZE', 1000) + + for chunk in self.reader.read_custom_query_chunked(query, params=(last_sync_time,), chunksize=chunk_size): + if chunk.empty: + continue + + # Разделяем по операциям + inserts = chunk[chunk['x_Operation'] == 'i'] + updates = chunk[chunk['x_Operation'] == 'u'] + deletes = chunk[chunk['x_Operation'] == 'd'] + + # Обрабатываем вставки + if not inserts.empty: + inserts_to_write = self._prepare_data_for_write(inserts, table_name) + if not inserts_to_write.empty: + self.state.log_operation( + table_name=table_name, + operation='INSERT', + records_count=len(inserts) + ) + self.writer.upsert_data(table_name, inserts_to_write, base_id_field) + stats['inserted'] += len(inserts) + + # Обрабатываем обновления + if not updates.empty: + updates_to_write = self._prepare_data_for_write(updates, table_name) + if not updates_to_write.empty: + self.state.log_operation( + table_name=table_name, + operation='UPDATE', + records_count=len(updates) + ) + self.writer.upsert_data(table_name, updates_to_write, base_id_field) + stats['updated'] += len(updates) + + # Обрабатываем удаления + if not deletes.empty: + self.state.log_operation( + table_name=table_name, + operation='DELETE', + records_count=len(deletes) + ) + delete_ids = deletes[base_id_field].tolist() + self.writer.delete_data(table_name, base_id_field, delete_ids) + stats['deleted'] += len(deletes) + + stats['total'] += len(chunk) + + migration_logger.info(f" Чанк: +{len(inserts)} вставок, ~{len(updates)} обновлений, -{len(deletes)} удалений") + + if stats['total'] > 0: + migration_logger.info(f"{table_name}: +{stats['inserted']} вставок, ~{stats['updated']} обновлений, -{stats['deleted']} удалений") + else: + migration_logger.info(f"ℹ️ {table_name}: изменений нет") + + except Exception as e: + error_msg = f"Ошибка при миграции {table_name} через Life: {e}" + migration_logger.error(error_msg) + migration_logger.error(e.args) + self.state.log_operation( + table_name=table_name, + operation='ERROR', + records_count=0, + status='ERROR', + error_message=str(e)[:500] + ) + self.errors.append({ + 'table': table_name, + 'error': error_msg, + 'traceback': traceback.format_exc(), + 'time': datetime.now() + }) + raise + + return stats + + def _prepare_data_for_write(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame: + """Подготавливает данные из Life-таблицы для записи в основную таблицу""" + # Исключаем служебные поля + exclude_fields = {'x_Operation', 'x_DateTime', 'x_Seance', 'x_User'} + + # Определяем, какие поля оставить + fields_to_keep = [] + for col in df.columns: + if col not in exclude_fields and not col.endswith('LifeID'): + fields_to_keep.append(col) + + result = df[fields_to_keep].copy() + + # Убеждаемся, что нет дубликатов по ID + base_id_field = self._get_base_id_field(table_name) + result = result.drop_duplicates(subset=[base_id_field]) + + return result + + def migrate_table(self, table_name: str, full_reload: bool = False) -> bool: + """Миграция одной таблицы (поддерживает и ID, и Life)""" + migration_logger.table_start(table_name) + self.current_table = table_name + table_start_time = datetime.now() + + try: + # Получаем ID колонку для статистики + id_column = get_primary_key(table_name) + + # Определяем, использует ли таблица Life-механизм + uses_life = table_name in self.life_tables + + if uses_life and 
not full_reload: + # МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ + last_sync = self.state.get_table_last_sync(table_name) + + if last_sync: + stats = self.migrate_table_by_time(table_name, last_sync) + + # Обновляем время синхронизации + self.state.update_table_sync_time(table_name) + + # Обновляем статистику + if id_column: + self._update_table_statistics(table_name, id_column) + + migration_logger.table_success(table_name, stats['total']) + return True + else: + # Если синхронизации не было - делаем полную загрузку + migration_logger.info(f"Первая синхронизация {table_name} - полная загрузка") + full_reload = True + + if full_reload: + # ПОЛНАЯ ПЕРЕЗАГРУЗКА (по ID) + result = self._full_reload_by_id(table_name) + + # Обновляем статистику после полной загрузки + if result and id_column: + self._update_table_statistics(table_name, id_column) + + return result + else: + # ИНКРЕМЕНТАЛЬНАЯ ПО ID (для таблиц без Life) + result = self._incremental_by_id(table_name) + + # Обновляем статистику после инкрементальной загрузки + if result and id_column: + self._update_table_statistics(table_name, id_column) + + return result + + except Exception as e: + error_msg = f"Критическая ошибка при обработке {table_name}: {e}" + migration_logger.error(error_msg) + self.errors.append({ + 'table': table_name, + 'error': error_msg, + 'traceback': traceback.format_exc(), + 'time': datetime.now() + }) + return False + finally: + self.current_table = None + + def _full_reload_by_id(self, table_name: str) -> bool: + """Полная перезагрузка таблицы по ID""" + migration_logger.info(f"Полная загрузка {table_name} по ID") + + try: + # Получаем ID колонку + id_column = get_primary_key(table_name) + if not id_column: + error_msg = f"Не могу найти ID колонку для {table_name}" + migration_logger.error(error_msg) + self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()}) + return False + + # Получаем метаданные + foreign_keys = get_foreign_keys(table_name) + indexes = get_indexes(table_name) + + # Загружаем данные чанками + first_chunk = True + total_rows = 0 + + for chunk in self.reader.read_by_id_chunked(table_name, id_column, None): + if first_chunk: + self.writer.create_table(table_name, chunk) + first_chunk = False + else: + self.writer.append_data(table_name, chunk) + + total_rows += len(chunk) + + if total_rows == 0: + migration_logger.warning(f"Таблица {table_name} пуста") + return True + + # Создаем индексы + if indexes: + self.writer.create_indexes(table_name, indexes) + migration_logger.info(f"📇 Создано {len(indexes)} индексов") + + # Сохраняем информацию о внешних ключах + if foreign_keys: + self.all_foreign_keys[table_name] = foreign_keys + migration_logger.info(f"🔗 Сохранено {len(foreign_keys)} внешних ключей") + + # Обновляем last_id и время синхронизации + #max_id = self._get_max_id(table_name, id_column) + #self.state.update_last_id(table_name, max_id) + #self.state.update_table_sync_time(table_name) + #self.state.update_table_stats(table_name, total_rows) + + return True + + except Exception as e: + error_msg = f"Ошибка при полной загрузке {table_name}: {e}" + migration_logger.error(error_msg) + self.errors.append({ + 'table': table_name, + 'error': error_msg, + 'traceback': traceback.format_exc(), + 'time': datetime.now() + }) + return False + + def _incremental_by_id(self, table_name: str) -> bool: + """Инкрементальная загрузка по ID (для таблиц без Life)""" + migration_logger.info(f"Инкрементальная загрузка {table_name} по ID") + + try: + id_column = get_primary_key(table_name) + 
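+            # get_primary_key ищет PK в MSSQL, а при его отсутствии эвристически
+            # подбирает колонку по ключевым словам (ID, Code, KEY);
+            # без ID-колонки инкрементальная загрузка невозможна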
if not id_column: + error_msg = f"Не могу найти ID колонку для {table_name}" + migration_logger.error(error_msg) + self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()}) + return False + + foreign_keys = get_foreign_keys(table_name) + indexes = get_indexes(table_name) + + # Проверяем новые колонки + new_columns = self.schema.detect_new_columns(table_name) + if new_columns: + self.schema.add_new_columns(table_name, new_columns) + + # Получаем последний ID + last_id = self.state.get_last_id(table_name) + migration_logger.info(f"last_id из состояния: {last_id}") + + if last_id is None: + last_id = get_max_id_from_postgres(table_name, id_column) + migration_logger.info(f"Последний ID в PG: {last_id}") + + # Загружаем новые данные + total_loaded = 0 + first_chunk = True + + for chunk in self.reader.read_by_id_chunked(table_name, id_column, last_id): + if first_chunk: + # Проверяем структуру + pg_cols = {c['name'] for c in self.schema.get_postgres_columns(table_name)} + if not pg_cols.issubset(set(chunk.columns)): + missing = pg_cols - set(chunk.columns) + migration_logger.warning(f"В PG есть колонки, которых нет в чанке: {missing}") + first_chunk = False + + self.writer.append_data(table_name, chunk) + total_loaded += len(chunk) + + #if total_loaded > 0: + # # Обновляем последний ID и время синхронизации + # max_id = self._get_max_id(table_name, id_column) + # self.state.update_last_id(table_name, max_id) + # self.state.update_table_sync_time(table_name) + # self.state.update_table_stats(table_name, total_loaded) + + # Сохраняем FK для создания позже + if foreign_keys: + if table_name not in self.all_foreign_keys: + self.all_foreign_keys[table_name] = [] + self.all_foreign_keys[table_name].extend(foreign_keys) + + return True + + except Exception as e: + error_msg = f"Ошибка при инкрементальной загрузке {table_name}: {e}" + migration_logger.error(error_msg) + self.errors.append({ + 'table': table_name, + 'error': error_msg, + 'traceback': traceback.format_exc(), + 'time': datetime.now() + }) + return False + + def _get_max_id(self, table_name: str, id_column: str) -> int: + """Получает максимальный ID из источника""" + max_id_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}" + max_df = pd.read_sql_query(max_id_query, db_connector.src_engine) + return int(max_df.iloc[0]['max_id']) if not max_df.empty else 0 + + def create_all_foreign_keys(self): + """Создать все внешние ключи после завершения миграции""" + if not self.all_foreign_keys: + migration_logger.info("ℹ️ Нет внешних ключей для создания") + return + + migration_logger.info("="*60) + migration_logger.info("🔗 СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ") + migration_logger.info("="*60) + + for table_name, foreign_keys in self.all_foreign_keys.items(): + try: + existing = self.schema.check_foreign_keys_exist(table_name) + existing_names = {f['name'] for f in existing} + + to_create = [fk for fk in foreign_keys if fk['name'] not in existing_names] + + if to_create: + self.writer.create_foreign_keys(table_name, to_create) + else: + migration_logger.info(f"Все внешние ключи для {table_name} уже существуют") + + except Exception as e: + error_msg = f"Ошибка создания FK для {table_name}: {e}" + migration_logger.error(error_msg) + self.errors.append({ + 'table': table_name, + 'error': error_msg, + 'traceback': traceback.format_exc(), + 'time': datetime.now() + }) + + def run_migration(self, tables: Optional[List[str]] = None, full_reload: bool = False, send_email: bool = True): + """Запуск миграции для всех таблиц""" + 
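+        # Порядок работы: таблицы мигрируются по одной (через Life-механизм
+        # или по ID), а внешние ключи создаются одним проходом в самом конце,
+        # чтобы порядок обработки таблиц не зависел от ссылочных связей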
self.is_running = True + self.start_time = datetime.now() + self.all_foreign_keys = {} + self.errors = [] + + if tables is None: + tables = settings.TABLES_TO_COPY + + last_replication = self.state.get_last_replication_time() + + migration_logger.info("="*70) + migration_logger.info("НАЧАЛО МИГРАЦИИ") + migration_logger.info(f"Время старта: {self.start_time}") + if last_replication: + migration_logger.info(f"Последняя миграция: {last_replication}") + migration_logger.info(f"Таблиц для обработки: {len(tables)}") + migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}") + migration_logger.info(f"Таблицы с Life-механизмом: {self.life_tables}") + migration_logger.info("="*70) + + results = {} + for i, table_name in enumerate(tables, 1): + if not self.is_running: + migration_logger.warning("Миграция остановлена пользователем") + break + + migration_logger.info(f"\n[{i}/{len(tables)}] Обработка таблицы {table_name}") + results[table_name] = self.migrate_table(table_name, full_reload) + + # Создаем внешние ключи после всех таблиц + self.create_all_foreign_keys() + + total_time = (datetime.now() - self.start_time).total_seconds() + stats = self.state.get_all_stats() + + self._log_final_stats(results, stats, total_time) + + # Отправляем уведомление + if send_email: + self._send_notification(results, stats, total_time) + + self.is_running = False + return results + + def _log_final_stats(self, results: dict, stats: dict, total_time: float): + """Логирует финальную статистику""" + migration_logger.info("="*70) + migration_logger.info("ИТОГОВАЯ СТАТИСТИКА") + migration_logger.info("="*70) + migration_logger.info(f"Успешно: {sum(1 for r in results.values() if r)}/{len(results)}") + migration_logger.info(f"Ошибок: {len(self.errors)}") + migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}") + migration_logger.info(f"Общее время: {total_time:.1f}с") + migration_logger.info("="*70) + + def _send_notification(self, results: dict, stats: dict, total_time: float): + """Отправляет уведомление о результате""" + if self.errors: + error_body = self._build_error_email_body(results, stats, total_time) + email_sender.send_email( + subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}", + body=error_body + ) + # else: + # email_sender.send_success_notification(stats, total_time) + + def _build_error_email_body(self, results: dict, stats: dict, total_time: float) -> str: + """Строит тело письма с ошибками""" + body = f""" +🚨 МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ +{'='*60} + +Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +Длительность: {total_time:.1f} сек + +СТАТИСТИКА: +{'='*40} +Успешно: {sum(1 for r in results.values() if r)}/{len(results)} +Ошибок: {len(self.errors)} +Всего строк: {stats.get('total_rows', 0)} + +СПИСОК ОШИБОК: +{'='*40} +""" + for i, err in enumerate(self.errors, 1): + body += f"\n{i}. 
Таблица: {err.get('table', 'N/A')}\n" + body += f" Ошибка: {err['error']}\n" + body += f" Время: {err['time'].strftime('%H:%M:%S') if 'time' in err else 'N/A'}\n" + + return body + + def stop_migration(self): + self.is_running = False + migration_logger.warning("Миграция остановлена") + email_sender.send_email( + subject=f"МИГРАЦИЯ ОСТАНОВЛЕНА - {datetime.now().strftime('%Y-%m-%d %H:%M')}", + body=f"Миграция была остановлена пользователем в {datetime.now().strftime('%H:%M:%S')}" + ) + + def get_status(self) -> dict: + if not self.is_running: + return { + 'is_running': False, + 'last_errors': len(self.errors), + 'last_replication': self.state.get_last_replication_info() + } + + elapsed = (datetime.now() - self.start_time).total_seconds() if self.start_time else 0 + + return { + 'is_running': True, + 'current_table': self.current_table, + 'elapsed_seconds': elapsed, + 'errors_count': len(self.errors) + } + + def _update_table_statistics(self, table_name: str, id_column: str): + """ + Обновляет статистику таблицы на основе реальных данных в PostgreSQL + Вызывается сразу после миграции таблицы + """ + try: + migration_logger.info(f"Обновление статистики для {table_name}...") + + # Получаем реальную статистику из PostgreSQL + dst_stats = self.reader.get_table_stats(table_name, id_column) + + # Обновляем в метаданных + self.state.update_last_id(table_name, dst_stats['max_id']) + + # Для total_rows нужно установить точное значение, а не добавлять + # Поэтому используем отдельный метод для установки + replication_state._set_table_total_rows(table_name, dst_stats['total_rows']) + + migration_logger.info(f" Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}") + + # Логируем операцию + self.state.log_operation( + table_name=table_name, + operation='STATS_UPDATE', + records_count=dst_stats['total_rows'], + status='SUCCESS' + ) + + except Exception as e: + migration_logger.error(f"Ошибка обновления статистики для {table_name}: {e}") + self.state.log_operation( + table_name=table_name, + operation='STATS_UPDATE', + records_count=0, + status='ERROR', + error_message=str(e)[:500] + ) + +migrator = DatabaseMigrator() \ No newline at end of file diff --git a/app/services/replication_state.py b/app/services/replication_state.py new file mode 100644 index 0000000..df68c9c --- /dev/null +++ b/app/services/replication_state.py @@ -0,0 +1,89 @@ +import json +from datetime import datetime +from pathlib import Path +from typing import Dict, Any, Optional +from app.repository.replication_metadata_repo import replication_metadata_repo +from app.core.logging import migration_logger +from app.core.config import settings + + +class ReplicationState: + """Хранение состояния репликации по ID""" + + def __init__(self): + self.replication_repo = replication_metadata_repo + self.replication_repo.init_metadata_table() + + def get_last_replication_time(self) -> Optional[datetime]: + """Получает время последней репликации (максимальное по всем таблицам)""" + stats = self.replication_repo.get_all_stats() + if stats['tables']: + times = [t['last_sync'] for t in stats['tables'] if t['last_sync']] + return max(times) if times else None + return None + + def get_last_replication_info(self) -> dict: + """Получает информацию о последней репликации""" + stats = self.replication_repo.get_all_stats() + return { + 'last_replication': self.get_last_replication_time(), + 'total_rows': stats['total_rows'], + 'tables_count': stats['tables_count'] + } + + def get_table_last_sync(self, table_name: str) -> 
Optional[datetime]: + """Получает время последней синхронизации таблицы""" + return self.replication_repo.get_last_sync_time(table_name) + + def get_last_id(self, table_name: str) -> Optional[int]: + """Получает последний обработанный ID для таблицы""" + return self.replication_repo.get_last_id(table_name) + + def update_last_id(self, table_name: str, last_id: int): + """Обновляет последний обработанный ID""" + self.replication_repo.update_last_id(table_name, last_id) + + def update_table_sync_time(self, table_name: str): + """Обновляет время синхронизации таблицы""" + self.replication_repo.update_sync_time(table_name) + + def update_table_stats(self, table_name: str, added_rows: int): + """Обновляет статистику таблицы""" + self.replication_repo.update_table_stats(table_name, added_rows) + + def get_all_stats(self) -> dict: + """Получает статистику по всем таблицам""" + return self.replication_repo.get_all_stats() + + def log_operation(self, table_name: str, operation: str, records_count: int, + status: str = "SUCCESS", error_message: Optional[str] = None): + """Логирует операцию""" + self.replication_repo.log_operation(table_name, operation, records_count, status, error_message) + + def _set_table_total_rows(self, table_name: str, total_rows: int): + """ + Устанавливает точное количество строк для таблицы + (в отличие от update_table_stats, который добавляет) + """ + from app.models.replication import ReplicationMetadata + + session = self.replication_repo.get_session() + try: + metadata = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if metadata: + metadata.total_rows = total_rows + metadata.updated_at = datetime.now() + session.commit() + migration_logger.debug(f" Установлено total_rows={total_rows} для {table_name}") + else: + migration_logger.warning(f"Метаданные для {table_name} не найдены") + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка установки total_rows для {table_name}: {e}") + finally: + session.close() + +replication_state = ReplicationState() \ No newline at end of file diff --git a/app/services/scheduler.py b/app/services/scheduler.py new file mode 100644 index 0000000..b5c1dc1 --- /dev/null +++ b/app/services/scheduler.py @@ -0,0 +1,166 @@ +# app/services/scheduler.py + +import asyncio +from datetime import datetime +from typing import List, Optional +import time as time_module + +from app.core.logging import migration_logger +from app.core.config import settings +from app.services.migrator import migrator +from app.repository.replication_metadata_repo import replication_metadata_repo +from app.utils.email_sender import email_sender + + +class MigrationScheduler: + """Планировщик миграций на базе PostgreSQL""" + + def __init__(self): + self.running = False + self.repo = replication_metadata_repo + + # Инициализируем расписания по умолчанию + self._init_default_schedules() + + def _init_default_schedules(self): + """Инициализация расписаний по умолчанию""" + self.repo.init_default_schedules(settings.TABLES_TO_COPY) + migration_logger.info("Расписания по умолчанию инициализированы") + + def set_schedule(self, table_name: str, schedule_time: str = "00:00", + days: Optional[List[str]] = None, full_reload: bool = False, + enabled: bool = True): + """Установить расписание для таблицы""" + if table_name not in settings.TABLES_TO_COPY: + raise ValueError(f"Таблица {table_name} не найдена в списке для миграции") + + # Валидация формата времени + try: + datetime.strptime(schedule_time, "%H:%M") + except 
ValueError: + raise ValueError(f"Неверный формат времени: {schedule_time}. Используйте HH:MM") + + schedule = self.repo.set_schedule( + table_name=table_name, + schedule_time=schedule_time, + days=days, + full_reload=full_reload, + enabled=enabled + ) + + if schedule: + days_str = ', '.join(schedule.days_display) if days else 'все дни' + migration_logger.info( + f"Установлено расписание для {table_name}: " + f"{schedule_time} [{days_str}] (full_reload={full_reload})" + ) + + def get_schedule(self, table_name: str): + """Получить расписание для таблицы""" + return self.repo.get_schedule(table_name) + + def get_all_schedules(self): + """Получить все расписания""" + return self.repo.get_all_schedules() + + def disable_schedule(self, table_name: str): + """Отключить расписание""" + self.repo.disable_schedule(table_name) + + def enable_schedule(self, table_name: str): + """Включить расписание""" + self.repo.enable_schedule(table_name) + + def delete_schedule(self, table_name: str): + """Удалить расписание""" + self.repo.delete_schedule(table_name) + + def get_due_tables(self, current_time: Optional[datetime] = None) -> List: + """Получить таблицы для запуска сейчас""" + due = self.repo.get_due_schedules(current_time) + return due + + async def run_due_migrations(self): + """Запустить миграции по расписанию""" + due_schedules = self.get_due_tables() + + if not due_schedules: + return + + migration_logger.info(f"Найдено {len(due_schedules)} таблиц для миграции по расписанию") + + for schedule in due_schedules: + try: + days_str = ', '.join(schedule.days_display) + migration_logger.info( + f"Запуск миграции по расписанию для {schedule.table_name} " + f"в {schedule.schedule_time.strftime('%H:%M')} [{days_str}]" + ) + + # Запускаем миграцию + result = await asyncio.to_thread( + migrator.run_migration, + tables=[schedule.table_name], + full_reload=schedule.full_reload, + send_email=True + ) + + # Обновляем время последнего запуска + self.repo.update_schedule_last_run(schedule.table_name) + + # Логируем успешный запуск + self.repo.log_operation( + table_name=schedule.table_name, + operation='SCHEDULED', + records_count=0, + status='SUCCESS' + ) + + migration_logger.info(f"Миграция по расписанию для {schedule.table_name} завершена") + + except Exception as e: + error_msg = f"Ошибка при миграции по расписанию для {schedule.table_name}: {e}" + migration_logger.error(error_msg) + + # Логируем ошибку + self.repo.log_operation( + table_name=schedule.table_name, + operation='SCHEDULED', + records_count=0, + status='ERROR', + error_message=str(e)[:500] + ) + + email_sender.send_error_notification( + error_message=error_msg, + table_name=schedule.table_name + ) + + def start_scheduler(self): + """Запустить планировщик""" + self.running = True + migration_logger.info("Планировщик миграций запущен") + + while self.running: + try: + # Проверяем, есть ли таблицы для миграции + asyncio.run(self.run_due_migrations()) + + # Ждем 60 секунд до следующей проверки + time_module.sleep(60) + + except KeyboardInterrupt: + self.stop_scheduler() + break + except Exception as e: + migration_logger.error(f"Ошибка в планировщике: {e}") + time_module.sleep(60) + + def stop_scheduler(self): + """Остановить планировщик""" + self.running = False + migration_logger.info("Планировщик миграций остановлен") + + +# Глобальный экземпляр +scheduler = MigrationScheduler() \ No newline at end of file diff --git a/app/services/schema_manager.py b/app/services/schema_manager.py new file mode 100644 index 0000000..32eafb5 --- /dev/null +++ 
b/app/services/schema_manager.py @@ -0,0 +1,150 @@ +from typing import List, Dict, Any, Optional +import pandas as pd +from sqlalchemy import inspect, text +from app.core.database import db_connector +from app.core.logging import migration_logger + + +class SchemaManager: + """Управление схемой таблиц""" + + def get_mssql_columns(self, table_name: str) -> List[Dict[str, Any]]: + """Получить структуру колонок из MSSQL""" + try: + query = f""" + SELECT + COLUMN_NAME, + DATA_TYPE, + IS_NULLABLE, + CHARACTER_MAXIMUM_LENGTH + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = '{table_name}' + ORDER BY ORDINAL_POSITION + """ + + df = pd.read_sql_query(query, db_connector.src_engine) + return df.to_dict('records') + except Exception as e: + migration_logger.error(f"Ошибка получения колонок из MSSQL для {table_name}: {e}") + return [] + + def get_postgres_columns(self, table_name: str) -> List[Dict[str, Any]]: + """Получить структуру колонок из PostgreSQL""" + try: + inspector = inspect(db_connector.dst_engine) + columns = inspector.get_columns(table_name.lower()) + return [{'name': c['name'], 'type': str(c['type']), 'nullable': c['nullable']} for c in columns] + except Exception as e: + migration_logger.error(f"Ошибка получения колонок из PG для {table_name}: {e}") + return [] + + def detect_new_columns(self, table_name: str) -> List[Dict[str, Any]]: + """Обнаружить новые колонки в MSSQL, которых нет в PostgreSQL""" + mssql_cols = self.get_mssql_columns(table_name) + pg_cols = self.get_postgres_columns(table_name) + + pg_col_names = {c['name'] for c in pg_cols} + + new_columns = [] + for col in mssql_cols: + if col['COLUMN_NAME'] not in pg_col_names: + new_columns.append({ + 'name': col['COLUMN_NAME'], + 'type': self._mssql_to_postgres_type(col['DATA_TYPE'], col['CHARACTER_MAXIMUM_LENGTH']), + 'nullable': True # Всегда создаем как NULL для новых колонок + }) + + if new_columns: + migration_logger.info(f"🔍 Обнаружено {len(new_columns)} новых колонок в {table_name}: {[c['name'] for c in new_columns]}") + + return new_columns + + def _mssql_to_postgres_type(self, mssql_type: str, max_length: Optional[int]) -> str: + """Конвертация типа MSSQL в PostgreSQL""" + mssql_type = mssql_type.lower() + + type_map = { + 'int': 'INTEGER', + 'bigint': 'BIGINT', + 'smallint': 'SMALLINT', + 'tinyint': 'SMALLINT', + 'bit': 'BOOLEAN', + 'float': 'DOUBLE PRECISION', + 'real': 'REAL', + 'decimal': 'NUMERIC', + 'numeric': 'NUMERIC', + 'datetime': 'TIMESTAMP', + 'datetime2': 'TIMESTAMP', + 'date': 'DATE', + 'time': 'TIME', + 'char': f'CHAR({max_length})' if max_length else 'CHAR(1)', + 'nchar': f'CHAR({max_length})' if max_length else 'CHAR(1)', + 'varchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', + 'nvarchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', + 'text': 'TEXT', + 'ntext': 'TEXT', + 'uniqueidentifier': 'UUID', + } + + return type_map.get(mssql_type, 'TEXT') + + def add_new_columns(self, table_name: str, new_columns: List[Dict[str, Any]]): + """Добавить новые колонки в PostgreSQL""" + if not new_columns: + return + + pg_table = table_name.lower() + + for col in new_columns: + try: + alter_sql = f'ALTER TABLE "{pg_table}" ADD COLUMN "{col["name"]}" {col["type"]} NULL' + + with db_connector.dst_connection() as conn: + conn.execute(text(alter_sql)) + conn.commit() + + migration_logger.info(f"Добавлена новая колонка {col['name']} ({col['type']}) в {pg_table}") + + except Exception as e: + migration_logger.error(f"Ошибка добавления колонки 
{col['name']} в {table_name}: {e}") + + def check_foreign_keys_exist(self, table_name: str) -> List[Dict[str, Any]]: + """Проверить, какие внешние ключи уже существуют в PostgreSQL""" + try: + query = f""" + SELECT + tc.constraint_name, + kcu.column_name, + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' + AND tc.table_name = '{table_name.lower()}' + """ + + df = pd.read_sql_query(query, db_connector.dst_engine) + + existing_fks = [] + for _, row in df.iterrows(): + existing_fks.append({ + 'name': row['constraint_name'], + 'column': row['column_name'], + 'references_table': row['foreign_table_name'], + 'references_column': row['foreign_column_name'] + }) + + return existing_fks + + except Exception as e: + migration_logger.error(f"Ошибка проверки FK в PostgreSQL для {table_name}: {e}") + return [] + + +schema_manager = SchemaManager() \ No newline at end of file diff --git a/app/taskiq/broker.py b/app/taskiq/broker.py new file mode 100644 index 0000000..21627c9 --- /dev/null +++ b/app/taskiq/broker.py @@ -0,0 +1,140 @@ +import asyncio +from datetime import datetime, time +import os +from typing import Any, Dict + +from taskiq import ScheduledTask, TaskiqScheduler +from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource + +import logging +logging.getLogger("taskiq.scheduler").setLevel(logging.DEBUG) +logging.getLogger("taskiq.broker").setLevel(logging.DEBUG) + +# ---------- Настройка результата ---------- +result_backend = RedisAsyncResultBackend( + redis_url="redis://127.0.0.1:6379/0", + result_ex_time=86400, # результаты хранятся 24 часа +) + +# ---------- Настройка брокера (очередь с подтверждениями) ---------- +broker = ListQueueBroker( + url="redis://127.0.0.1:6379/0", + queue_name="migration_queue", # имя очереди + #consumer_group_name="migration_group", # группа потребителей + #maxlen=1000, # максимальная длина очереди +).with_result_backend(result_backend) + +# ---------- Источник расписаний (динамический, на Redis) ---------- +schedule_source = ListRedisScheduleSource( + url="redis://127.0.0.1:6379/0", + prefix="migration_schedule", # префикс для ключей в Redis +) + + +# ---------- Планировщик с кастомным startup ---------- +class SchedulerWithStartup(TaskiqScheduler): + """Планировщик с синхронизацией расписаний при запуске.""" + + async def startup(self) -> None: + """Запуск планировщика с синхронизацией расписаний.""" + from app.scheduler import sync_schedules_to_redis + from app.core.logging import migration_logger + + migration_logger.info("Запуск планировщика taskiq...") + # Синхронизируем расписания из БД в Redis + await sync_schedules_to_redis() + migration_logger.info("Планировщик готов к работе") + await super().startup() + + +scheduler = SchedulerWithStartup( + broker=broker, + sources=[schedule_source], +) + +# ---------- Задача для миграции ---------- +@broker.task( + task_name="migrate_table_task", # Короткое имя для удобства + retry_on_error=True, + max_retries=3, + retry_delay=60 +) +async def migrate_table_task(table_name: str, full_reload: bool = False) -> Dict[str, Any]: + """ + Асинхронная 
задача миграции.
+    Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать).
+    """
+    from app.services.migrator import migrator
+    from app.core.logging import migration_logger
+
+    migration_logger.info(f"Запуск миграции для {table_name} (full_reload={full_reload})")
+
+    try:
+        # Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop
+        result = await asyncio.to_thread(
+            migrator.run_migration,
+            tables=[table_name],
+            full_reload=full_reload,
+            send_email=True
+        )
+        migration_logger.info(f"Миграция {table_name} завершена")
+        return {"success": True, "table": table_name, "result": result}
+    except Exception as e:
+        migration_logger.error(f"Ошибка миграции {table_name}: {e}")
+        raise  # для retry
+
+# Функция синхронизации расписаний из БД в Redis (общая для startup и API)
+async def refresh_schedules():
+    from app.services.scheduler import scheduler as db_scheduler
+    from app.core.logging import migration_logger
+    migration_logger.info("Обновление расписаний в Redis...")
+    # Получаем все активные расписания из БД
+    schedules = db_scheduler.get_all_schedules()
+    # Очищаем старые ключи расписаний в Redis
+    import redis.asyncio as redis
+    r = await redis.from_url("redis://127.0.0.1:6379/0")
+    keys = await r.keys("migration_schedule:*")
+    if keys:
+        await r.delete(*keys)
+    await r.close()
+
+    for s in schedules:
+        if not s.enabled:
+            continue
+        hour = s.schedule_time.hour
+        minute = s.schedule_time.minute
+        days = s.days_list
+        # Пустой список дней или все 7 дней -> ежедневный запуск ("*"),
+        # иначе дни недели перечисляются явно
+        if days and len(days) < 7:
+            cron = f"{minute} {hour} * * {','.join(map(str, days))}"
+        else:
+            cron = f"{minute} {hour} * * *"
+        task = ScheduledTask(
+            task_name="migrate_table_task",
+            labels={},
+            cron=cron,
+            cron_offset='Asia/Tokyo',  # UTC+9
+            args=[s.table_name],
+            kwargs={"full_reload": s.full_reload}
+        )
+        await schedule_source.add_schedule(task)
+    migration_logger.info("Расписания обновлены")
+
+@broker.task
+async def test_task(message: str = "Hello, world!"):
+    """Простая тестовая задача."""
+    import logging
+    logging.basicConfig(level=logging.INFO)
+    logger = logging.getLogger(__name__)
+    logger.info(f"Тестовая задача выполнена: {message}")
+    return {"status": "ok", "message": message}
+
+# Startup для планировщика (выполняется при запуске через CLI)
+# @broker.on_event('on_startup')
+# async def on_startup():
+#     await refresh_schedules()
\ No newline at end of file
diff --git a/app/utils/email_sender.py b/app/utils/email_sender.py
new file mode 100644
index 0000000..05fc13c
--- /dev/null
+++ b/app/utils/email_sender.py
@@ -0,0 +1,138 @@
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from email.mime.application import MIMEApplication
+from typing import List, Optional
+from datetime import datetime
+from app.core.config import settings
+from app.core.logging import migration_logger
+
+
+class EmailSender:
+    """Класс для отправки email уведомлений"""
+
+    def __init__(self):
+        self.smtp_server = settings.EMAIL_HOST
+        self.smtp_port = settings.EMAIL_PORT
+        self.username = settings.EMAIL_USER
+        self.password = settings.EMAIL_PASSWORD
+        self.from_addr = settings.EMAIL_FROM
+        self.to_addrs = settings.EMAIL_TO
+
+    def send_email(self, subject: str, body: str, attachments: Optional[List[str]] = None) -> bool:
+        """Отправка email с вложениями"""
+        if
not all([self.smtp_server, self.username, self.password, self.from_addr]): + migration_logger.warning("Настройки email не заполнены. Отправка email пропущена.") + return False + + try: + # Создаем сообщение + msg = MIMEMultipart() + msg['From'] = self.from_addr + msg['To'] = ', '.join(self.to_addrs) + msg['Subject'] = subject + msg['Date'] = datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') + + # Добавляем текст + msg.attach(MIMEText(body, 'plain', 'utf-8')) + + # Добавляем вложения + if attachments: + for file_path in attachments: + try: + with open(file_path, 'rb') as f: + part = MIMEApplication(f.read(), Name=file_path.split('/')[-1]) + part['Content-Disposition'] = f'attachment; filename="{file_path.split("/")[-1]}"' + msg.attach(part) + except Exception as e: + migration_logger.error(f"Ошибка при добавлении вложения {file_path}: {e}") + + # Отправляем + with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: + server.login(self.username, self.password) + server.send_message(msg) + + migration_logger.info(f"Email успешно отправлен на {', '.join(self.to_addrs)}") + return True + + except Exception as e: + migration_logger.error(f"Ошибка при отправке email: {e}") + return False + + def send_error_notification(self, error_message: str, traceback_str: str = None, table_name: str = None): + """Отправить уведомление об ошибке""" + subject = f"ОШИБКА МИГРАЦИИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" + + body = f""" +🚨 ОШИБКА В ПРОЦЕССЕ МИГРАЦИИ ДАННЫХ +{'='*60} + +Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +Сервис: Migration Service + +""" + if table_name: + body += f"Таблица: {table_name}\n" + + body += f""" +Ошибка: +{error_message} + +""" + if traceback_str: + body += f""" +📎 Детали: +{traceback_str} + +""" + + body += """ +🔧 Необходимо проверить логи и устранить проблему. +""" + + return self.send_email(subject, body) + + def send_success_notification(self, stats: dict, duration: float): + """Отправить уведомление об успешной миграции""" + subject = f"УСПЕШНАЯ МИГРАЦИЯ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" + + body = f""" +МИГРАЦИЯ ДАННЫХ УСПЕШНО ЗАВЕРШЕНА +{'='*60} + +Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +Длительность: {duration:.1f} сек + +СТАТИСТИКА: +{'='*40} +Всего строк в БД: {stats.get('total_rows', 0)} +Таблиц обработано: {stats.get('total_tables', 0)} +Последняя репликация: {stats.get('last_replication', 'Нет данных')} + +Миграция выполнена успешно! +""" + + return self.send_email(subject, body) + + def send_start_notification(self, tables: List[str], full_reload: bool): + """Отправить уведомление о начале миграции""" + subject = f"НАЧАЛО МИГРАЦИИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" + + body = f""" +НАЧАЛО МИГРАЦИИ ДАННЫХ +{'='*60} + +Время старта: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +Режим: {'ПОЛНАЯ ПЕРЕЗАГРУЗКА' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'} +Таблиц для обработки: {len(tables)} + +СПИСОК ТАБЛИЦ: +{chr(10).join([f' • {t}' for t in tables])} + +Миграция запущена... 
+""" + + return self.send_email(subject, body) + + +email_sender = EmailSender() \ No newline at end of file diff --git a/app/utils/index_helpers.py b/app/utils/index_helpers.py new file mode 100644 index 0000000..7abe4bc --- /dev/null +++ b/app/utils/index_helpers.py @@ -0,0 +1,153 @@ +from typing import Optional, List, Dict, Any +import pandas as pd +from app.core.database import db_connector +from app.core.logging import migration_logger + + +def get_primary_key(table_name: str) -> Optional[str]: + """Получить колонку первичного ключа""" + try: + query = f""" + SELECT TOP 1 + c.name as column_name + FROM sys.indexes i + INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id + INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id + WHERE i.object_id = OBJECT_ID('{table_name}') + AND i.is_primary_key = 1 + """ + + pk_df = pd.read_sql_query(query, db_connector.src_engine) + + if not pk_df.empty: + pk_column = pk_df.iloc[0]['column_name'] + migration_logger.info(f"PRIMARY KEY для {table_name}: {pk_column}") + return pk_column + + migration_logger.warning(f"В {table_name} нет PRIMARY KEY, ищем ID колонку...") + + columns_query = f""" + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = '{table_name}' + """ + + columns_df = pd.read_sql_query(columns_query, db_connector.src_engine) + + id_keywords = ['ID', 'Id', 'id', 'Code', 'KEY'] + for col in columns_df['COLUMN_NAME']: + for keyword in id_keywords: + if keyword in col: + migration_logger.info(f"ID колонка для {table_name}: {col}") + return col + + if not columns_df.empty: + first_col = columns_df.iloc[0]['COLUMN_NAME'] + migration_logger.warning(f"Используем первую колонку для {table_name}: {first_col}") + return first_col + + return None + + except Exception as e: + migration_logger.error(f"Ошибка поиска PK для {table_name}: {e}") + return None + + +def get_foreign_keys(table_name: str) -> List[Dict[str, str]]: + """Получить внешние ключи без проверки связанных таблиц""" + try: + query = f""" + SELECT + fk.name as fk_name, + pc.name as parent_column, + rc.name as referenced_column, + rt.name as referenced_table + FROM sys.foreign_keys fk + INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id + INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id + INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id + INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id + WHERE fk.parent_object_id = OBJECT_ID('{table_name}') + ORDER BY fk.name, fkc.constraint_column_id + """ + + fk_df = pd.read_sql_query(query, db_connector.src_engine) + + if fk_df.empty: + return [] + + foreign_keys = {} + for _, row in fk_df.iterrows(): + fk_name = row['fk_name'] + if fk_name not in foreign_keys: + foreign_keys[fk_name] = { + 'name': fk_name, + 'parent_column': row['parent_column'], + 'referenced_table': row['referenced_table'], + 'referenced_column': row['referenced_column'] + } + + result = list(foreign_keys.values()) + migration_logger.info(f"Найдено {len(result)} внешних ключей для {table_name}") + return result + + except Exception as e: + migration_logger.error(f"Ошибка получения внешних ключей для {table_name}: {e}") + return [] + + +def get_indexes(table_name: str) -> List[Dict[str, Any]]: + """Получить индексы таблицы""" + try: + query = f""" + SELECT + i.name as index_name, + i.is_unique, + c.name as column_name + FROM 
sys.indexes i + INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id + INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id + WHERE i.object_id = OBJECT_ID('{table_name}') + AND i.is_primary_key = 0 + ORDER BY i.name, ic.key_ordinal + """ + + idx_df = pd.read_sql_query(query, db_connector.src_engine) + + if idx_df.empty: + return [] + + indexes = {} + for _, row in idx_df.iterrows(): + idx_name = row['index_name'] + if idx_name not in indexes: + indexes[idx_name] = { + 'name': idx_name, + 'unique': bool(row['is_unique']), + 'columns': [] + } + indexes[idx_name]['columns'].append(row['column_name']) + + result = list(indexes.values()) + migration_logger.info(f"Найдено {len(result)} индексов для {table_name}") + return result + + except Exception as e: + migration_logger.error(f"Ошибка получения индексов для {table_name}: {e}") + return [] + + +def get_max_id_from_postgres(table_name: str, id_column: str) -> Optional[int]: + """Получить максимальный ID из PostgreSQL""" + try: + query = f'SELECT MAX("{id_column}") as max_id FROM "{table_name.lower()}"' + df = pd.read_sql_query(query, db_connector.dst_engine) + + if not df.empty and df.iloc[0]['max_id'] is not None: + return int(df.iloc[0]['max_id']) + + return 0 + + except Exception as e: + migration_logger.error(f"Ошибка получения max ID из PG для {table_name}: {e}") + return 0 \ No newline at end of file diff --git a/app/utils/type_mapper.py b/app/utils/type_mapper.py new file mode 100644 index 0000000..a2a1b08 --- /dev/null +++ b/app/utils/type_mapper.py @@ -0,0 +1,45 @@ +from typing import Optional + + +def mssql_to_postgres_type(mssql_type: str, max_length: Optional[int] = None) -> str: + """Преобразование типа MSSQL в PostgreSQL""" + mssql_type = mssql_type.lower() + + type_map = { + 'int': 'INTEGER', + 'bigint': 'BIGINT', + 'smallint': 'SMALLINT', + 'tinyint': 'SMALLINT', + 'bit': 'BOOLEAN', + 'float': 'DOUBLE PRECISION', + 'real': 'REAL', + 'decimal': 'NUMERIC', + 'numeric': 'NUMERIC', + 'money': 'NUMERIC(19,4)', + 'smallmoney': 'NUMERIC(10,4)', + 'datetime': 'TIMESTAMP', + 'datetime2': 'TIMESTAMP', + 'smalldatetime': 'TIMESTAMP', + 'date': 'DATE', + 'time': 'TIME', + 'char': f'CHAR({max_length})' if max_length else 'CHAR(1)', + 'nchar': f'CHAR({max_length})' if max_length else 'CHAR(1)', + 'varchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', + 'nvarchar': f'VARCHAR({max_length})' if max_length and max_length < 8000 else 'TEXT', + 'text': 'TEXT', + 'ntext': 'TEXT', + 'uniqueidentifier': 'UUID', + 'timestamp': 'BYTEA', + 'rowversion': 'BYTEA', + 'binary': 'BYTEA', + 'varbinary': 'BYTEA', + 'image': 'BYTEA', + 'xml': 'XML', + 'json': 'JSONB', + 'sql_variant': 'TEXT', + 'geometry': 'TEXT', + 'geography': 'TEXT', + 'hierarchyid': 'VARCHAR(255)' + } + + return type_map.get(mssql_type, 'TEXT') \ No newline at end of file diff --git a/req.txt b/req.txt new file mode 100644 index 0000000..7361d15 --- /dev/null +++ b/req.txt @@ -0,0 +1,47 @@ +aiofiles==25.1.0 +aiohappyeyeballs==2.6.1 +aiohttp==3.13.3 +aiosignal==1.4.0 +annotated-doc==0.0.4 +annotated-types==0.7.0 +anyio==4.12.1 +attrs==25.4.0 +click==8.3.1 +dnspython==2.8.0 +email-validator==2.3.0 +fastapi==0.135.1 +frozenlist==1.8.0 +greenlet==3.3.2 +h11==0.16.0 +httptools==0.7.1 +idna==3.11 +izulu==0.50.0 +multidict==6.7.1 +numpy==2.4.2 +packaging==26.0 +pandas==3.0.1 +propcache==0.4.1 +psycopg2-binary==2.9.11 +pycron==3.2.0 +pydantic==2.12.5 +pydantic-settings==2.13.1 
+pydantic_core==2.41.5 +pymssql==2.3.13 +python-dateutil==2.9.0.post0 +python-dotenv==1.2.2 +python-multipart==0.0.22 +PyYAML==6.0.3 +redis==7.2.1 +six==1.17.0 +SQLAlchemy==2.0.48 +starlette==0.52.1 +taskiq==0.12.1 +taskiq-dependencies==1.5.7 +taskiq-redis==1.2.2 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +uvicorn==0.41.0 +uvloop==0.22.1 +watchfiles==1.1.1 +websockets==16.0 +yarl==1.23.0 diff --git a/run.py b/run.py new file mode 100644 index 0000000..1ba8111 --- /dev/null +++ b/run.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Точка входа для запуска FastAPI приложения +""" + +import uvicorn +from app.core.config import settings + +if __name__ == "__main__": + uvicorn.run( + "app.main:app", + host="0.0.0.0", + port=8000, + reload=settings.DEBUG, + log_level="info" + ) \ No newline at end of file
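A minimal smoke-test sketch for the service (not part of the commit): it drives the routes defined in app/api/routes.py and assumes the app is running on localhost:8000 as configured in run.py. The requests library is an assumption here; it is not pinned in req.txt.

# Hypothetical client: start a migration and poll /migrate/status until it finishes.
import time

import requests  # assumption: not listed in req.txt, install separately

BASE = "http://localhost:8000/api/v1"

resp = requests.post(f"{BASE}/migrate/start", params={"full_reload": "false"})
resp.raise_for_status()
print(resp.json())  # {"message": "Миграция запущена", "full_reload": false}

while True:
    status = requests.get(f"{BASE}/migrate/status").json()
    if not status.get("is_running"):
        break
    print("current table:", status.get("current_table"), "elapsed:", status.get("elapsed_seconds"))
    time.sleep(5)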