From de2dd82fa1a91f7b322f7018b5d25b3d9d2fa32f Mon Sep 17 00:00:00 2001 From: brusnitsyn Date: Fri, 13 Mar 2026 17:11:39 +0900 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=B5=D0=B1=D0=BE=D0=BB=D1=8C=D1=88?= =?UTF-8?q?=D0=B8=D0=B5=20=D0=B8=D0=B7=D0=BC=D0=B5=D0=BD=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/routes.py | 269 +++++++++----------- app/core/config.py | 5 + app/core/database.py | 62 ++++- app/main.py | 4 +- app/models/replication.py | 60 +++-- app/repository/replication_metadata_repo.py | 195 ++++++++------ app/scheduler.py | 68 +++-- app/services/batch_runner.py | 125 +++++++++ app/services/migrator.py | 72 +++--- app/services/replication_state.py | 24 +- app/services/scheduler.py | 108 ++------ app/services/task_tracker.py | 56 ++++ app/taskiq/broker.py | 219 ++++++++++------ app/taskiq/tasks.py | 6 + app/utils/email_sender.py | 219 +++++++++++++++- req.txt | 1 + run_scheduler.py | 68 +++++ run_worker.py | 70 +++++ 18 files changed, 1140 insertions(+), 491 deletions(-) create mode 100644 app/services/batch_runner.py create mode 100644 app/services/task_tracker.py create mode 100644 app/taskiq/tasks.py create mode 100644 run_scheduler.py create mode 100644 run_worker.py diff --git a/app/api/routes.py b/app/api/routes.py index 986c368..1e6d589 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -13,27 +13,6 @@ from app.taskiq.broker import refresh_schedules router = APIRouter(prefix="/api/v1") - -@router.post("/migrate/start") -async def start_migration(background_tasks: BackgroundTasks, full_reload: bool = False): - """Запуск миграции""" - if migrator.is_running: - raise HTTPException(status_code=400, detail="Миграция уже выполняется") - - background_tasks.add_task(run_migration_task, full_reload) - return {"message": "Миграция запущена", "full_reload": full_reload} - - -@router.post("/migrate/stop") -async def stop_migration(): - migrator.stop_migration() - return {"message": "Миграция останавливается"} - - -@router.get("/migrate/status") -async def get_status(): - return migrator.get_status() - @router.get("/replication/last") async def get_last_replication(): """Получить информацию о последней репликации (максимальное время по всем таблицам)""" @@ -64,14 +43,14 @@ async def get_tables_status(): } -@router.get("/replication/tables/{table_name}") -async def get_table_status(table_name: str): +@router.get("/replication/tables/{metadata_id}") +async def get_table_status(metadata_id: int): """Получить статус конкретной таблицы""" from app.repository.replication_metadata_repo import replication_metadata_repo - metadata = replication_metadata_repo.get_table_metadata(table_name) + metadata = replication_metadata_repo.get_table_metadata(metadata_id) if not metadata: - raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") + raise HTTPException(status_code=404, detail=f"Метадата с ID={metadata_id} не найдена") return { "table": metadata.table_name, @@ -85,29 +64,34 @@ async def get_table_status(table_name: str): } -@router.post("/replication/tables/{table_name}/reset") -async def reset_table(table_name: str): - """Сбросить состояние таблицы (обнулить last_id и last_sync_time)""" - from app.repository.replication_metadata_repo import replication_metadata_repo - - session = replication_metadata_repo.get_session() +@router.post("/replication/tables") +async def add_metadata( + table_name: str = Query(None, description="Наименование таблицы"), + life_table_name: Optional[str] = Query(None, description="Наименование Life таблицы"), + description: Optional[str] = Query(None, description="Описание"), + enabled: bool = Query(True, description="Включено"), +): + """Добавить метадату""" try: - metadata = session.query(ReplicationMetadata).filter_by(table_name=table_name).first() + from app.repository.replication_metadata_repo import replication_metadata_repo + + metadata = replication_metadata_repo.add_metadata( + table_name=table_name, + life_table_name=life_table_name, + description=description, + enabled=enabled, + ) + if metadata: - metadata.last_id = 0 - metadata.last_sync_time = datetime(1900, 1, 1) - metadata.total_rows = 0 - metadata.last_error = None - metadata.updated_at = datetime.now() - session.commit() - - migration_logger.info(f"Сброшено состояние таблицы {table_name}") - return {"message": f"Состояние таблицы {table_name} сброшено"} + return { + "message": f"Метадата для таблицы {metadata.table_name} создана", + "metadata": metadata + } else: - raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") - finally: - session.close() - + raise HTTPException(status_code=500, detail="Ошибка создания метадаты") + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) @router.get("/replication/logs") async def get_replication_logs( @@ -146,26 +130,6 @@ async def get_replication_logs( finally: session.close() -@router.post("/test-email") -async def test_email(): - """Тест отправки email""" - success = email_sender.send_email( - subject="Тестовое письмо", - body=f"Это тестовое письмо от Migration Service\n\nВремя: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - ) - if success: - return {"message": "Тестовое письмо отправлено"} - else: - raise HTTPException(status_code=500, detail="Ошибка отправки письма") - - -def run_migration_task(full_reload: bool): - try: - migrator.run_migration(full_reload=full_reload) - except Exception as e: - migration_logger.error("Ошибка в фоновой задаче", e) - - # ==================== РАСПИСАНИЯ ==================== @router.get("/schedules") @@ -203,10 +167,10 @@ async def get_next_runs(limit: int = 10): if time_diff <= 1 and check_weekday in schedule.days_list: # Получаем статистику таблицы - metadata = replication_metadata_repo.get_table_metadata(schedule.table_name) + metadata = schedule.table runs.append({ - 'table': schedule.table_name, + 'table': metadata.table_name, 'time': check_time.strftime('%Y-%m-%d %H:%M'), 'day': check_time.strftime('%A'), 'days_schedule': schedule.days_display, @@ -224,37 +188,37 @@ async def get_next_runs(limit: int = 10): return runs[:limit] -@router.post("/schedules/run-now") -async def run_scheduled_now(background_tasks: BackgroundTasks): - """Принудительно запустить все запланированные на текущее время миграции""" - due = scheduler.get_due_tables() - if not due: - return {'message': 'Нет таблиц для миграции в текущее время и день'} +# @router.post("/schedules/run-now") +# async def run_scheduled_now(background_tasks: BackgroundTasks): +# """Принудительно запустить все запланированные на текущее время миграции""" +# due = scheduler.get_due_tables() +# if not due: +# return {'message': 'Нет таблиц для миграции в текущее время и день'} - for schedule in due: - background_tasks.add_task( - run_scheduled_migration, - schedule.table_name, - schedule.full_reload - ) +# for schedule in due: +# background_tasks.add_task( +# run_scheduled_migration, +# schedule.table.table_name, +# schedule.full_reload +# ) - return { - 'message': f'Запущено {len(due)} миграций', - 'tables': [ - { - 'name': s.table_name, - 'time': s.schedule_time.strftime("%H:%M"), - 'days': s.days_display, - 'full_reload': s.full_reload - } - for s in due - ] - } +# return { +# 'message': f'Запущено {len(due)} миграций', +# 'tables': [ +# { +# 'name': s.table.table_name, +# 'time': s.schedule_time.strftime("%H:%M"), +# 'days': s.days_display, +# 'full_reload': s.full_reload +# } +# for s in due +# ] +# } @router.post("/schedules/{table_name}") async def set_schedule( - table_name: str, + metadata_id: int = Query('ID', description="ID метадаты таблицы"), schedule_time: str = Query("00:00", description="Время в формате HH:MM"), days: Optional[str] = Query(None, description="Дни недели через запятую: пн,вт,ср,чт,пт,сб,вс"), full_reload: bool = Query(False, description="Полная перезагрузка"), @@ -264,8 +228,8 @@ async def set_schedule( ): """Добавить новое расписание для таблицы""" try: - if table_name not in settings.TABLES_TO_COPY: - raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") + # if table_name not in settings.TABLES_TO_COPY: + # raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена") days_list = None if days: @@ -274,7 +238,7 @@ async def set_schedule( from app.repository.replication_metadata_repo import replication_metadata_repo schedule = replication_metadata_repo.add_schedule( - table_name=table_name, + metadata_id=metadata_id, schedule_time=schedule_time, days=days_list, full_reload=full_reload, @@ -286,7 +250,7 @@ async def set_schedule( if schedule: await refresh_schedules() return { - "message": f"Расписание добавлено для {table_name} в {schedule_time}", + "message": f"Расписание добавлено", "schedule": schedule } else: @@ -295,19 +259,25 @@ async def set_schedule( except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) -@router.get("/schedules/{table_name}") -async def get_table_schedule(table_name: str): +@router.get("/schedules/{metadata_id}") +async def get_table_schedule(metadata_id: int): """Получить расписание для конкретной таблицы""" - schedule = replication_metadata_repo.get_schedule(table_name) - if not schedule: + schedules = replication_metadata_repo.get_schedule(metadata_id) + if not schedules: raise HTTPException(status_code=404, detail="Расписание не найдено") # Получаем статистику таблицы - metadata = replication_metadata_repo.get_table_metadata(table_name) + metadata = replication_metadata_repo.get_table_metadata(metadata_id) - result = schedule.to_dict() + result = {} + result['schedules'] = [] + + for schedule in schedules: + result['schedules'].append(schedule) + if metadata: result['table_stats'] = { + 'table_name': metadata.table_name, 'rows_count': metadata.total_rows, 'last_sync': metadata.last_sync_time.isoformat() if metadata.last_sync_time else None, 'last_id': metadata.last_id @@ -350,72 +320,71 @@ async def update_schedule( else: raise HTTPException(status_code=404, detail=f"Расписание {schedule_id} не найдено") -@router.post("/schedules/{table_name}/disable") -async def disable_schedule(table_name: str): +@router.post("/schedules/{schedule_id}/disable") +async def disable_schedule(schedule_id: int): """Отключить расписание""" - success = replication_metadata_repo.disable_schedule(table_name) + success = replication_metadata_repo.disable_schedule(schedule_id) if success: await refresh_schedules() - return {'message': f'Расписание для {table_name} отключено'} + return {'message': f'Расписание отключено'} else: - raise HTTPException(status_code=404, detail=f"Расписание для {table_name} не найдено") + raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено") -@router.post("/schedules/{table_name}/enable") -async def enable_schedule(table_name: str): +@router.post("/schedules/{schedule_id}/enable") +async def enable_schedule(schedule_id: int): """Включить расписание""" - success = replication_metadata_repo.enable_schedule(table_name) + success = replication_metadata_repo.enable_schedule(schedule_id) if success: await refresh_schedules() - return {'message': f'Расписание для {table_name} включено'} + return {'message': f'Расписание включено'} else: - raise HTTPException(status_code=404, detail=f"Расписание для {table_name} не найдено") + raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено") # ==================== Фоновые задачи ==================== -def run_migration_task(full_reload: bool): - """Фоновая задача для миграции всех таблиц""" - try: - migrator.run_migration(full_reload=full_reload) - except Exception as e: - migration_logger.error(f"Ошибка в фоновой задаче: {e}") - migration_logger.exception(e) +# def run_migration_task(full_reload: bool): +# """Фоновая задача для миграции всех таблиц""" +# try: +# migrator.run_migration(full_reload=full_reload) +# except Exception as e: +# migration_logger.error(f"Ошибка в фоновой задаче: {e}") -def run_scheduled_migration(table_name: str, full_reload: bool): - """Фоновая задача для запланированной миграции одной таблицы""" - try: - migration_logger.info(f"Запуск запланированной миграции для {table_name}") +# def run_scheduled_migration(table_name: str, full_reload: bool): +# """Фоновая задача для запланированной миграции одной таблицы""" +# try: +# migration_logger.info(f"Запуск запланированной миграции для {table_name}") - migrator.run_migration( - tables=[table_name], - full_reload=full_reload, - send_email=True - ) +# migrator.run_migration( +# tables=[table_name], +# full_reload=full_reload, +# send_email=True +# ) - # Обновляем время последнего запуска в расписании - replication_metadata_repo.update_schedule_last_run(table_name) +# # Обновляем время последнего запуска в расписании +# replication_metadata_repo.update_schedule_last_run(table_name) - # Логируем успешный запуск - replication_metadata_repo.log_operation( - table_name=table_name, - operation='SCHEDULED', - records_count=0, - status='SUCCESS' - ) +# # Логируем успешный запуск +# replication_metadata_repo.log_operation( +# table_name=table_name, +# operation='SCHEDULED', +# records_count=0, +# status='SUCCESS' +# ) - migration_logger.info(f"Запланированная миграция для {table_name} завершена") +# migration_logger.info(f"Запланированная миграция для {table_name} завершена") - except Exception as e: - error_msg = f"Ошибка в запланированной миграции для {table_name}: {e}" - migration_logger.error(error_msg) - migration_logger.exception(e) +# except Exception as e: +# error_msg = f"Ошибка в запланированной миграции для {table_name}: {e}" +# migration_logger.error(error_msg) +# migration_logger.exception(e) - # Логируем ошибку - replication_metadata_repo.log_operation( - table_name=table_name, - operation='SCHEDULED', - records_count=0, - status='ERROR', - error_message=str(e)[:500] - ) \ No newline at end of file +# # Логируем ошибку +# replication_metadata_repo.log_operation( +# table_name=table_name, +# operation='SCHEDULED', +# records_count=0, +# status='ERROR', +# error_message=str(e)[:500] +# ) \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py index c7d2121..ccede1b 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -76,6 +76,11 @@ class Settings(BaseSettings): """Формирование строки подключения к PostgreSQL""" return rf'postgresql://{self.POSTGRES_USERNAME}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DATABASE}' + @property + def POSTGRES_ASYNC_CONNECTION_STRING(self) -> str: + """Формирование строки подключения к PostgreSQL""" + return rf'postgresql+asyncpg://{self.POSTGRES_USERNAME}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DATABASE}' + class Config: env_file = '.env' case_sensitive = False diff --git a/app/core/database.py b/app/core/database.py index 70d5cb5..81deafd 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -1,8 +1,9 @@ from sqlalchemy import create_engine from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker -from contextlib import contextmanager -from typing import Optional +from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine, async_sessionmaker +from contextlib import asynccontextmanager, contextmanager +from typing import AsyncGenerator, Optional from app.core.config import settings @@ -12,7 +13,9 @@ class DatabaseConnector: def __init__(self): self._src_engine: Optional[Engine] = None self._dst_engine: Optional[Engine] = None + self._async_dst_engine: Optional[AsyncEngine] = None self.dst_session = None + self._async_dst_session_maker = None self.schedule_session = None @property @@ -20,7 +23,7 @@ class DatabaseConnector: """Подключение к MSSQL""" if not self._src_engine: self._src_engine = create_engine( - settings.MSSQL_CONNECTION_STRING, + settings.MSSQL_CONNECTION_STRING + "?charset=cp1251", pool_pre_ping=True, echo=settings.DEBUG ) @@ -33,10 +36,57 @@ class DatabaseConnector: self._dst_engine = create_engine( settings.POSTGRES_CONNECTION_STRING, pool_pre_ping=True, - echo=settings.DEBUG + echo=settings.DEBUG, + connect_args={ + "options": "-c client_encoding=utf8" + } ) self.dst_session = sessionmaker(bind=self._dst_engine) return self._dst_engine + + @property + def async_dst_engine(self) -> AsyncEngine: + """Асинхронное подключение к PostgreSQL (основная БД)""" + if not self._async_dst_engine: + self._async_dst_engine = create_async_engine( + settings.POSTGRES_ASYNC_CONNECTION_STRING, + pool_pre_ping=True, + echo=settings.DEBUG, + connect_args={ + "server_settings": { + "client_encoding": "UTF8" + } + } + ) + self._async_dst_session_maker = async_sessionmaker( + bind=self._async_dst_engine, + class_=AsyncSession, + expire_on_commit=False, + autocommit=False, + autoflush=False, + ) + return self._async_dst_engine + + @asynccontextmanager + async def async_dst_session(self) -> AsyncGenerator[AsyncSession, None]: + """ + Асинхронный контекстный менеджер для сессии PostgreSQL. + + Использование: + async with db_connector.async_dst_session() as session: + result = await session.execute(select(...)) + """ + self.async_dst_engine + + async with self._async_dst_session_maker() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + finally: + await session.close() @contextmanager def src_connection(self): @@ -56,12 +106,14 @@ class DatabaseConnector: finally: conn.close() - def dispose_engines(self): + async def dispose_engines(self): """Закрытие всех соединений""" if self._src_engine: self._src_engine.dispose() if self._dst_engine: self._dst_engine.dispose() + if self._async_dst_engine: + await self._async_dst_engine.dispose() # Глобальный экземпляр подключений diff --git a/app/main.py b/app/main.py index 542e7a8..1c6d520 100644 --- a/app/main.py +++ b/app/main.py @@ -10,10 +10,8 @@ from app.services.replication_state import replication_state @asynccontextmanager async def lifespan(app: FastAPI): - migration_logger.info("Запуск приложения") yield - migration_logger.info("Завершение работы") - db_connector.dispose_engines() + await db_connector.dispose_engines() app = FastAPI(title="Сервис репликации", lifespan=lifespan) diff --git a/app/models/replication.py b/app/models/replication.py index 2d3c22c..77acbe2 100644 --- a/app/models/replication.py +++ b/app/models/replication.py @@ -1,9 +1,9 @@ # app/models/replication.py from typing import List, Optional -from sqlalchemy import JSON, Column, ForeignKey, String, DateTime, BigInteger, Integer, Boolean, Time +from sqlalchemy import JSON, Column, ForeignKey, Null, String, DateTime, BigInteger, Integer, Boolean, Time from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship +from sqlalchemy.orm import relationship, Mapped, mapped_column from datetime import datetime Base = declarative_base() @@ -13,17 +13,21 @@ class ReplicationMetadata(Base): """Модель для хранения метаданных репликации""" __tablename__ = 'replication_metadata' + #__table_args__ = {"schema": "replicator"} - table_name = Column(String(100), primary_key=True, nullable=False) - last_sync_time = Column(DateTime, nullable=True) - last_id = Column(BigInteger, nullable=True) - total_rows = Column(BigInteger, default=0) - last_error = Column(String(500), nullable=True) - is_active = Column(Boolean, default=True) - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + id: Mapped[int] = mapped_column(Integer, primary_key=True) + table_name: Mapped[str] = mapped_column(String(100), nullable=False) + life_table_name: Mapped[str] = mapped_column(String(100), nullable=True) + description: Mapped[str] = mapped_column(String(255), nullable=True) + last_sync_time: Mapped[datetime] = mapped_column(DateTime, nullable=True) + last_id: Mapped[int] = mapped_column(BigInteger, nullable=True) + total_rows: Mapped[int] = mapped_column(BigInteger, default=0) + last_error: Mapped[str] = mapped_column(String(500), nullable=True) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now) - schedule = relationship("ReplicationSchedule", back_populates="table", cascade="all, delete-orphan") + schedule: Mapped["ReplicationSchedule"] = relationship("ReplicationSchedule", back_populates="table", cascade="all, delete-orphan") def __repr__(self): return f"" @@ -61,26 +65,29 @@ class ReplicationMetadata(Base): self.updated_at = datetime.now() session.commit() + def has_use_life(self): + """Разрешение использовать Life таблицу для миграции""" + return self.life_table_name is not None + class ReplicationSchedule(Base): """Модель для расписания миграции таблицы""" - __tablename__ = 'replication_schedules' - id = Column(Integer, primary_key=True, autoincrement=True) - table_name = Column(String(100), ForeignKey('replication_metadata.table_name', ondelete='CASCADE'), nullable=False) - schedule_time = Column(Time, nullable=False, default=datetime.strptime("00:00", "%H:%M").time()) - days = Column(JSON, nullable=False, default=list) # Храним список дней как JSON - full_reload = Column(Boolean, default=False) - enabled = Column(Boolean, default=True) - last_run = Column(DateTime, nullable=True) - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + metadata_id: Mapped[int] = mapped_column(Integer, ForeignKey('replication_metadata.id', ondelete='CASCADE'), nullable=False) + schedule_time: Mapped[datetime] = mapped_column(Time, nullable=False, default=datetime.strptime("00:00", "%H:%M").time()) + days: Mapped[JSON] = mapped_column(JSON, nullable=False, default=list) # Храним список дней как JSON + full_reload: Mapped[bool] = mapped_column(Boolean, default=False) + enabled: Mapped[bool] = mapped_column(Boolean, default=True) + last_run: Mapped[datetime] = mapped_column(DateTime, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now) - name = Column(String(100), nullable=True) - description = Column(String(500), nullable=True) + name: Mapped[str] = mapped_column(String(100), nullable=True) + description: Mapped[str] = mapped_column(String(500), nullable=True) # Связь с метаданными - table = relationship("ReplicationMetadata", back_populates="schedule") + table: Mapped[ReplicationMetadata] = relationship("ReplicationMetadata", back_populates="schedule") # Маппинг дней для обратной совместимости DAYS_MAP = { @@ -137,7 +144,7 @@ class ReplicationSchedule(Base): # Используем __dict__, но исключаем служебные поля SQLAlchemy data = { 'id': self.id, - 'table_name': self.table_name, + 'metadata_id': self.metadata_id, 'schedule_time': self.schedule_time.strftime("%H:%M") if self.schedule_time else "00:00", 'days': self.days, # Сохраняем как есть, а days_display вычислим отдельно 'full_reload': self.full_reload, @@ -146,7 +153,8 @@ class ReplicationSchedule(Base): 'name': self.name, 'description': self.description, 'created_at': self.created_at.isoformat() if self.created_at else None, - 'updated_at': self.updated_at.isoformat() if self.updated_at else None + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + 'table': self.table } # Вычисляем days_display на основе days diff --git a/app/repository/replication_metadata_repo.py b/app/repository/replication_metadata_repo.py index ed18fa8..c4c4636 100644 --- a/app/repository/replication_metadata_repo.py +++ b/app/repository/replication_metadata_repo.py @@ -2,7 +2,9 @@ from typing import Optional, List from datetime import datetime -from sqlalchemy.orm import Session +from fastapi import HTTPException +from sqlalchemy import select +from sqlalchemy.orm import Session, selectinload from app.core.database import db_connector from app.models.replication import ReplicationMetadata, ReplicationLog, ReplicationSchedule from app.core.logging import migration_logger @@ -21,25 +23,25 @@ class ReplicationMetadataRepo: Base.metadata.create_all(self.engine) migration_logger.info("Таблица replication_metadata создана/проверена") - def get_table_metadata(self, table_name: str) -> Optional[ReplicationMetadata]: + def get_table_metadata(self, metadata_id: int) -> Optional[ReplicationMetadata]: """Получает метаданные для таблицы""" session = self.SessionLocal() try: metadata = session.query(ReplicationMetadata).filter_by( - table_name=table_name + id=metadata_id ).first() # Если нет, создаем - if not metadata: - metadata = ReplicationMetadata.create_if_not_exists(session, table_name) + # if not metadata: + # metadata = ReplicationMetadata.create_if_not_exists(session, table_name) return metadata finally: session.close() - def get_last_sync_time(self, table_name: str) -> Optional[datetime]: + def get_last_sync_time(self, metadata_id: int) -> Optional[datetime]: """Получает время последней синхронизации таблицы""" - metadata = self.get_table_metadata(table_name) + metadata = self.get_table_metadata(metadata_id) return metadata.last_sync_time if metadata else None def get_last_id(self, table_name: str) -> Optional[int]: @@ -47,31 +49,29 @@ class ReplicationMetadataRepo: metadata = self.get_table_metadata(table_name) return metadata.last_id if metadata else None - def update_sync_time(self, table_name: str) -> bool: + def update_sync_time(self, schedule_id: int) -> bool: """Обновляет время синхронизации таблицы""" session = self.SessionLocal() try: - metadata = session.query(ReplicationMetadata).filter_by( - table_name=table_name + schedule = session.query(ReplicationSchedule).filter_by( + id=schedule_id ).first() - if not metadata: - metadata = ReplicationMetadata( - table_name=table_name, - last_sync_time=datetime.now(), - last_id=0 - ) - session.add(metadata) + if not schedule: + return False else: - metadata.last_sync_time = datetime.now() - metadata.updated_at = datetime.now() + datenow = datetime.now() + schedule.last_run = datenow + metadata = schedule.table + metadata.last_sync_time = datenow + metadata.updated_at = datenow session.commit() - migration_logger.debug(f"Updated sync time for {table_name} to {metadata.last_sync_time}") + migration_logger.debug(f"Updated sync time for {metadata.table_name} to {metadata.last_sync_time}") return True except Exception as e: session.rollback() - migration_logger.error(f"Error updating sync time for {table_name}: {e}") + migration_logger.error(f"Ошибка при обновлении времени синхронизации распиания с ID: {schedule_id}: {e}") return False finally: session.close() @@ -80,20 +80,16 @@ class ReplicationMetadataRepo: """Обновляет последний обработанный ID""" session = self.SessionLocal() try: - metadata = session.query(ReplicationMetadata).filter_by( + metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name - ).first() + ).all() - if not metadata: - metadata = ReplicationMetadata( - table_name=table_name, - last_sync_time=datetime.now(), - last_id=last_id - ) - session.add(metadata) + if not metadatas: + return False else: - metadata.last_id = last_id - metadata.updated_at = datetime.now() + for metadata in metadatas: + metadata.last_id = last_id + metadata.updated_at = datetime.now() session.commit() migration_logger.debug(f"Updated last_id for {table_name} to {last_id}") @@ -186,37 +182,74 @@ class ReplicationMetadataRepo: # ========== Методы для расписаний ========== - def init_default_schedules(self, table_names: List[str]): + def init_default_schedules(self, metadatas: List[ReplicationMetadata]): """Инициализирует расписания по умолчанию для списка таблиц""" session = self.get_session() try: - for table_name in table_names: + init_count = 0 + for metadata in metadatas: # Проверяем, есть ли уже расписание - schedule = session.query(ReplicationSchedule).filter_by( - table_name=table_name - ).first() + query = session.query(ReplicationSchedule).filter_by( + metadata_id=metadata.id + ) + + schedule_exist = session.query(query.exists()) - if not schedule: + if not schedule_exist: + job_name = f"{metadata.table_name} - расписание по умолчанию" # Создаем расписание по умолчанию (каждый день в 00:00) schedule = ReplicationSchedule( - table_name=table_name, + metadata_id=metadata.id, + name=job_name, schedule_time=datetime.strptime("00:00", "%H:%M").time(), days=[], # Пустой список = все дни full_reload=False, - enabled=True + enabled=False ) session.add(schedule) - migration_logger.debug(f"Создано расписание по умолчанию для {table_name}") + migration_logger.debug(f"Создано расписание по умолчанию для {metadata.table_name}") + init_count += 1 session.commit() - migration_logger.info(f"Инициализированы расписания по умолчанию для {len(table_names)} таблиц") + migration_logger.info(f"Инициализированы расписания по умолчанию для {init_count} таблиц") except Exception as e: session.rollback() migration_logger.error(f"Ошибка инициализации расписаний: {e}") finally: session.close() + + def add_metadata(self, table_name: str, life_table_name: Optional[str] = None, + description: Optional[str] = None, enabled: Optional[bool] = False) -> Optional[ReplicationMetadata]: + """Добавить метадату""" + session = self.get_session() + try: + metadata_exists = session.query(ReplicationMetadata).filter_by( + table_name=table_name + ).first() + + if metadata_exists: + raise HTTPException(status_code=400, detail=f"Метадата с таблицей {metadata_exists.table_name} существует") + + metadata = ReplicationMetadata( + table_name=table_name, + life_table_name=life_table_name, + description=description, + is_active=enabled + ) + + session.add(metadata) + session.commit() + + migration_logger.info(f"Добавлена новая метадата для {metadata.table_name}") + return metadata + except Exception as e: + session.rollback() + migration_logger.error(f"Ошибка добавления расписания: {e}") + return None + finally: + session.close() - def add_schedule(self, table_name: str, schedule_time: str, days: Optional[List[str]] = None, + def add_schedule(self, metadata_id: int, schedule_time: str, days: Optional[List[str]] = None, full_reload: bool = False, enabled: bool = True, name: Optional[str] = None, description: Optional[str] = None) -> Optional[ReplicationSchedule]: """Добавить НОВОЕ расписание для таблицы""" @@ -224,33 +257,33 @@ class ReplicationMetadataRepo: try: # Проверяем, существует ли метаданные metadata = session.query(ReplicationMetadata).filter_by( - table_name=table_name + id=metadata_id ).first() if not metadata: - metadata = self._create_metadata(session, table_name) + raise HTTPException(status_code=404, detail=f"Метадата с ID {metadata_id} не найдена") # Парсим время time_obj = datetime.strptime(schedule_time, "%H:%M").time() # Создаем новое расписание schedule = ReplicationSchedule( - table_name=table_name, + metadata_id=metadata_id, schedule_time=time_obj, days=days if days else [], full_reload=full_reload, enabled=enabled, - name=name or f"{schedule_time} - {'полная' if full_reload else 'инкремент'}", + name=name or f"{metadata.table_name} - {'полная' if full_reload else 'инкремент'}", description=description ) session.add(schedule) session.commit() - migration_logger.info(f"Добавлено новое расписание для {table_name} в {schedule_time}") + migration_logger.info(f"Добавлено новое расписание для {metadata.table_name} в {schedule_time}") return schedule.to_dict() except Exception as e: session.rollback() - migration_logger.error(f"Ошибка добавления расписания для {table_name}: {e}") + migration_logger.error(f"Ошибка добавления расписания: {e}") return None finally: session.close() @@ -266,7 +299,7 @@ class ReplicationMetadataRepo: if schedule: session.delete(schedule) session.commit() - migration_logger.info(f"Удалено расписание ID={schedule_id} для {schedule.table_name}") + migration_logger.info(f"Удалено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: @@ -303,12 +336,12 @@ class ReplicationMetadataRepo: finally: session.close() - def get_schedule(self, table_name: str) -> List[ReplicationSchedule]: + def get_schedule(self, metadata_id: int) -> List[ReplicationSchedule]: """Получает расписание для таблицы""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( - table_name=table_name + metadata_id=metadata_id ).all() return schedule finally: @@ -318,7 +351,24 @@ class ReplicationMetadataRepo: """Получает все расписания""" session = self.get_session() try: - return session.query(ReplicationSchedule).all() + schedules = session.execute( + select(ReplicationSchedule) + .options(selectinload(ReplicationSchedule.table)) + ).scalars().all() + + # Явно пометить сессию как не expiring (чтобы объекты не expire после commit) + for schedule in schedules: + session.expunge(schedule) # Удалить из сессии, но сохранить данные + + return schedules + finally: + session.close() + + def get_all_metadata(self) -> List[ReplicationMetadata]: + """Получает все метадаты""" + session = self.get_session() + try: + return session.query(ReplicationMetadata).all() finally: session.close() @@ -351,12 +401,12 @@ class ReplicationMetadataRepo: finally: session.close() - def update_schedule_last_run(self, table_name: str) -> bool: + def update_schedule_last_run(self, schedule_id: int) -> bool: """Обновляет время последнего запуска расписания""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( - table_name=table_name + id=schedule_id ).first() if schedule: @@ -367,51 +417,51 @@ class ReplicationMetadataRepo: return False except Exception as e: session.rollback() - migration_logger.error(f"Ошибка обновления last_run для {table_name}: {e}") + migration_logger.error(f"Ошибка обновления last_run: {e}") return False finally: session.close() - def disable_schedule(self, table_name: str) -> bool: + def disable_schedule(self, schedule_id: int) -> bool: """Отключает расписание""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( - table_name=table_name + id=schedule_id ).first() if schedule: schedule.enabled = False schedule.updated_at = datetime.now() session.commit() - migration_logger.info(f"Отключено расписание для {table_name}") + migration_logger.info(f"Отключено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: session.rollback() - migration_logger.error(f"Ошибка отключения расписания для {table_name}: {e}") + migration_logger.error(f"Ошибка отключения расписания: {e}") return False finally: session.close() - def enable_schedule(self, table_name: str) -> bool: + def enable_schedule(self, schedule_id: int) -> bool: """Включает расписание""" session = self.get_session() try: schedule = session.query(ReplicationSchedule).filter_by( - table_name=table_name + id=schedule_id ).first() if schedule: schedule.enabled = True schedule.updated_at = datetime.now() session.commit() - migration_logger.info(f"Включено расписание для {table_name}") + migration_logger.info(f"Включено расписание ID={schedule_id} для {schedule.table.table_name}") return True return False except Exception as e: session.rollback() - migration_logger.error(f"Ошибка включения расписания для {table_name}: {e}") + migration_logger.error(f"Ошибка включения расписания: {e}") return False finally: session.close() @@ -428,15 +478,16 @@ class ReplicationMetadataRepo: session = self.get_session() try: - metadata = session.query(ReplicationMetadata).filter_by( + metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name - ).first() - - if metadata: - metadata.total_rows = stats['total_rows'] - metadata.last_id = stats['max_id'] - metadata.updated_at = datetime.now() - session.commit() + ).all() + + if metadatas: + for metadata in metadatas: + metadata.total_rows = stats['total_rows'] + metadata.last_id = stats['max_id'] + metadata.updated_at = datetime.now() + session.commit() migration_logger.info(f"Синхронизирована статистика {table_name}: {stats['total_rows']} строк, max_id={stats['max_id']}") else: diff --git a/app/scheduler.py b/app/scheduler.py index 47dc5d5..6a3090a 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -2,7 +2,7 @@ import asyncio import redis.asyncio as redis from taskiq import ScheduledTask -from app.taskiq.broker import broker, scheduler, schedule_source, migrate_table_task +from app.taskiq.broker import broker, scheduler, schedule_source from app.core.logging import migration_logger REDIS_URL = "redis://127.0.0.1:6379" @@ -33,35 +33,53 @@ async def sync_schedules_to_redis(): await r.delete(*keys) await r.close() + # Задача для проверки расписания + checker_task = ScheduledTask( + task_name="check_schedules_task", + labels={}, + cron="* * * * *", # ← Каждую минуту + cron_offset='Asia/Tokyo', + args=[], + kwargs={} + ) + await schedule_source.add_schedule(checker_task) + # 3. Добавляем каждое расписание через add_schedule - added = 0 - for schedule in schedules: - hour = schedule.schedule_time.hour - minute = schedule.schedule_time.minute - days_list = schedule.days_list # список чисел 0..6 + # added = 0 + # for schedule in schedules: + # hour = schedule.schedule_time.hour + # minute = schedule.schedule_time.minute + # days_list = schedule.days_list # список чисел 0..6 - # Формируем cron-выражение - if days_list: - cron_expr = f"{minute} {hour} * * {','.join(map(str, days_list))}" - else: - cron_expr = f"{minute} {hour} * * *" + # # Формируем cron-выражение + # if days_list: + # cron_expr = f"{minute} {hour} * * {','.join(map(str, days_list))}" + # else: + # cron_expr = f"{minute} {hour} * * *" - # Используем timezone string для cron_offset (UTC+9 = Asia/Tokyo) - task = ScheduledTask( - task_name="migrate_table_task", - labels={}, - cron=cron_expr, - cron_offset='Asia/Tokyo', # Timezone name для UTC+9 - args=[schedule.table_name], - kwargs={"full_reload": schedule.full_reload} - ) + # # Используем timezone string для cron_offset (UTC+9 = Asia/Tokyo) + # task = ScheduledTask( + # task_name="migrate_table_task", + # labels={}, + # cron=cron_expr, + # cron_offset='Asia/Tokyo', # Timezone name для UTC+9 + # args=[], + # kwargs={ + # "table_name": schedule.table.table_name, + # "schedule_id": schedule.id, + # "metadata_id": schedule.metadata_id, + # "life_table_name": schedule.table.life_table_name, + # "uses_life": schedule.table.has_use_life(), + # "full_reload": schedule.full_reload + # } + # ) - # Добавляем в Redis через источник - await schedule_source.add_schedule(task) - added += 1 - migration_logger.info(f"Добавлено: {schedule.table_name} в {hour:02d}:{minute:02d} (cron: {cron_expr})") + # # Добавляем в Redis через источник + # await schedule_source.add_schedule(task) + # added += 1 + # migration_logger.info(f"Добавлено: {schedule.table.table_name} в {hour:02d}:{minute:02d} (cron: {cron_expr})") - migration_logger.info(f"Синхронизировано {added} активных расписаний") + # migration_logger.info(f"Синхронизировано {added} активных расписаний") finally: session.close() diff --git a/app/services/batch_runner.py b/app/services/batch_runner.py new file mode 100644 index 0000000..d7fb004 --- /dev/null +++ b/app/services/batch_runner.py @@ -0,0 +1,125 @@ +# app/services/batch_runner.py +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import uuid +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.replication import ReplicationSchedule +from app.core.logging import migration_logger +from app.services.task_tracker import task_tracker +from app.taskiq.broker import migrate_table_task + + +async def get_tables_to_run(session: AsyncSession, check_time: Optional[datetime] = None) -> List[Dict]: + """ + Получить список таблиц, которые должны запуститься сейчас. + + Возвращает список словарей с параметрами для задачи миграции. + """ + if check_time is None: + check_time = datetime.now() + + current_hour = check_time.hour + current_minute = check_time.minute + current_weekday = check_time.weekday() # 0=Monday, 6=Sunday + + migration_logger.info( + f"Поиск расписаний для запуска: " + f"время={check_time.strftime('%H:%M')}, день недели={current_weekday}" + ) + + # Загружаем все активные расписания с связанными метаданными + from sqlalchemy.orm import selectinload + + result = await session.execute( + select(ReplicationSchedule) + .options(selectinload(ReplicationSchedule.table)) + .where(ReplicationSchedule.enabled == True) + ) + schedules = result.scalars().all() + + tables_to_run = [] + + for schedule in schedules: + # Проверяем день недели + if current_weekday not in schedule.days_list: + migration_logger.debug(f"Пропуск {schedule.id}: не сегодня (days={schedule.days_list})") + continue + + # Проверяем время + schedule_hour = schedule.schedule_time.hour + schedule_minute = schedule.schedule_time.minute + + time_match = ( + current_hour == schedule_hour and + current_minute == schedule_minute + ) + + if not time_match: + migration_logger.debug( + f"Пропуск {schedule.id}: время не совпадает " + f"({schedule_hour}:{schedule_minute} vs {current_hour}:{current_minute})" + ) + continue + + # Порог для last_run (5 минут назад) + recent_threshold = check_time - timedelta(minutes=5) + + if schedule.last_run and schedule.last_run >= recent_threshold: + migration_logger.debug(f"⏭️ Пропуск {schedule.id}: последний запуск {schedule.last_run}") + continue + + # Таблица должна запуститься сейчас + metadata = schedule.table + tables_to_run.append({ + "table_name": str(metadata.table_name) if metadata.table_name else "", + "schedule_id": int(schedule.id) if schedule.id else 0, + "metadata_id": int(schedule.metadata_id) if schedule.metadata_id else 0, + "life_table_name": str(metadata.life_table_name) if metadata.life_table_name else None, + "uses_life": bool(metadata.life_table_name is not None), + "full_reload": bool(schedule.full_reload) if schedule.full_reload is not None else False, + }) + + migration_logger.info(f"Добавлено в батч: {metadata.table_name} (schedule_id={schedule.id})") + + migration_logger.info(f"Всего таблиц для запуска: {len(tables_to_run)}") + return tables_to_run + +async def run_migration_batch(tables: List[Dict], batch_id: Optional[str] = None) -> Optional[str]: + """ + Запустить миграцию нескольких таблиц в одном батче. + + Возвращает batch_id для отслеживания. + """ + if not tables: + migration_logger.warning("Нет таблиц для запуска") + return None + + if batch_id is None: + batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}" + + migration_logger.info(f"Запуск батча {batch_id} ({len(tables)} таблиц)") + + # Инициализируем трекер + await task_tracker.init_batch(batch_id, len(tables)) + + # Запускаем все задачи + for i, table_config in enumerate(tables): + try: + await migrate_table_task.kiq( + **table_config, + batch_id=batch_id, + send_email=True # Последняя задача отправит email + ) + migration_logger.debug(f"Задача {i+1}/{len(tables)} отправлена: {table_config['table_name']}") + except Exception as e: + migration_logger.error(f"Ошибка отправки задачи {table_config['table_name']}: {e}") + # Отмечаем как выполненную с ошибкой + await task_tracker.mark_completed( + batch_id, + {"success": False, "table": table_config["table_name"], "error": str(e)} + ) + + migration_logger.info(f"Батч {batch_id} запущен") + return batch_id \ No newline at end of file diff --git a/app/services/migrator.py b/app/services/migrator.py index 77f4364..f27c18d 100644 --- a/app/services/migrator.py +++ b/app/services/migrator.py @@ -3,6 +3,7 @@ from typing import Optional, List, Dict, Any import traceback from datetime import datetime import pandas as pd +from app.models.replication import ReplicationSchedule from app.services.replication_state import replication_state from app.services.data_reader import data_reader from app.services.data_writer import data_writer @@ -88,13 +89,12 @@ class DatabaseMigrator: parsed = self._parse_table_name(table_name) return f"{parsed['basename']}ID" - def migrate_table_by_time(self, table_name: str, last_sync_time: datetime) -> Dict[str, int]: + def migrate_table_by_time(self, table_name: str, life_table_name: str, last_sync_time: datetime) -> Dict[str, int]: """Миграция таблицы через Life-механизм по времени""" - life_table = self._get_life_table_name(table_name) base_id_field = self._get_base_id_field(table_name) life_id_field = self._get_life_id_field(table_name) - migration_logger.info(f"Миграция {table_name} через {life_table} с {last_sync_time}") + migration_logger.info(f"Миграция {table_name} через {life_table_name} с {last_sync_time}") stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0} @@ -105,12 +105,12 @@ class DatabaseMigrator: SELECT {base_id_field}, MAX({life_id_field}) as MaxLifeID - FROM {life_table} + FROM {life_table_name} WHERE x_DateTime > CAST(? AS datetime) GROUP BY {base_id_field} ) SELECT dl.* - FROM {life_table} dl + FROM {life_table_name} dl INNER JOIN LatestLife ll ON dl.{life_id_field} = ll.MaxLifeID """ @@ -211,28 +211,24 @@ class DatabaseMigrator: return result - def migrate_table(self, table_name: str, full_reload: bool = False) -> bool: + def migrate_table(self, table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str], uses_life: bool = False, full_reload: bool = False) -> bool: """Миграция одной таблицы (поддерживает и ID, и Life)""" migration_logger.table_start(table_name) self.current_table = table_name - table_start_time = datetime.now() try: # Получаем ID колонку для статистики id_column = get_primary_key(table_name) - - # Определяем, использует ли таблица Life-механизм - uses_life = table_name in self.life_tables - if uses_life and not full_reload: + if uses_life and not full_reload and life_table_name: # МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ - last_sync = self.state.get_table_last_sync(table_name) + last_sync = self.state.get_table_last_sync(metadata_id) if last_sync: - stats = self.migrate_table_by_time(table_name, last_sync) + stats = self.migrate_table_by_time(table_name, life_table_name, last_sync) # Обновляем время синхронизации - self.state.update_table_sync_time(table_name) + self.state.update_table_sync_time(schedule_id) # Обновляем статистику if id_column: @@ -340,7 +336,7 @@ class DatabaseMigrator: }) return False - def _incremental_by_id(self, table_name: str) -> bool: + def _incremental_by_id(self, table_name: str, metadata) -> bool: """Инкрементальная загрузка по ID (для таблиц без Life)""" migration_logger.info(f"Инкрементальная загрузка {table_name} по ID") @@ -419,11 +415,11 @@ class DatabaseMigrator: def create_all_foreign_keys(self): """Создать все внешние ключи после завершения миграции""" if not self.all_foreign_keys: - migration_logger.info("ℹ️ Нет внешних ключей для создания") + migration_logger.info("Нет внешних ключей для создания") return migration_logger.info("="*60) - migration_logger.info("🔗 СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ") + migration_logger.info("СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ") migration_logger.info("="*60) for table_name, foreign_keys in self.all_foreign_keys.items(): @@ -448,16 +444,17 @@ class DatabaseMigrator: 'time': datetime.now() }) - def run_migration(self, tables: Optional[List[str]] = None, full_reload: bool = False, send_email: bool = True): - """Запуск миграции для всех таблиц""" + def run_migration( + self, table_name: str, schedule_id: int, metadata_id: int, + life_table_name: Optional[str] = None, uses_life: bool = False, + full_reload: bool = False, send_email: bool = True + ): + """Запуск миграции таблицы""" self.is_running = True self.start_time = datetime.now() self.all_foreign_keys = {} self.errors = [] - - if tables is None: - tables = settings.TABLES_TO_COPY - + last_replication = self.state.get_last_replication_time() migration_logger.info("="*70) @@ -465,19 +462,16 @@ class DatabaseMigrator: migration_logger.info(f"Время старта: {self.start_time}") if last_replication: migration_logger.info(f"Последняя миграция: {last_replication}") - migration_logger.info(f"Таблиц для обработки: {len(tables)}") + migration_logger.info(f"Таблица для обработки: {table_name}") migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}") - migration_logger.info(f"Таблицы с Life-механизмом: {self.life_tables}") migration_logger.info("="*70) - results = {} - for i, table_name in enumerate(tables, 1): - if not self.is_running: - migration_logger.warning("Миграция остановлена пользователем") - break + if not self.is_running: + migration_logger.warning("Миграция остановлена пользователем") + return - migration_logger.info(f"\n[{i}/{len(tables)}] Обработка таблицы {table_name}") - results[table_name] = self.migrate_table(table_name, full_reload) + migration_logger.info(f"Обработка таблицы {table_name}") + results = self.migrate_table(table_name, schedule_id, metadata_id, life_table_name, uses_life, full_reload) # Создаем внешние ключи после всех таблиц self.create_all_foreign_keys() @@ -494,21 +488,20 @@ class DatabaseMigrator: self.is_running = False return results - def _log_final_stats(self, results: dict, stats: dict, total_time: float): + def _log_final_stats(self, has_migrated: bool, stats: dict, total_time: float): """Логирует финальную статистику""" migration_logger.info("="*70) migration_logger.info("ИТОГОВАЯ СТАТИСТИКА") migration_logger.info("="*70) - migration_logger.info(f"Успешно: {sum(1 for r in results.values() if r)}/{len(results)}") migration_logger.info(f"Ошибок: {len(self.errors)}") migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}") migration_logger.info(f"Общее время: {total_time:.1f}с") migration_logger.info("="*70) - def _send_notification(self, results: dict, stats: dict, total_time: float): + def _send_notification(self, has_migrated: bool, stats: dict, total_time: float): """Отправляет уведомление о результате""" if self.errors: - error_body = self._build_error_email_body(results, stats, total_time) + error_body = self._build_error_email_body(has_migrated, stats, total_time) email_sender.send_email( subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}", body=error_body @@ -516,10 +509,10 @@ class DatabaseMigrator: # else: # email_sender.send_success_notification(stats, total_time) - def _build_error_email_body(self, results: dict, stats: dict, total_time: float) -> str: + def _build_error_email_body(self, has_migrated: bool, stats: dict, total_time: float) -> str: """Строит тело письма с ошибками""" body = f""" -🚨 МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ +МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ {'='*60} Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} @@ -527,7 +520,6 @@ class DatabaseMigrator: СТАТИСТИКА: {'='*40} -Успешно: {sum(1 for r in results.values() if r)}/{len(results)} Ошибок: {len(self.errors)} Всего строк: {stats.get('total_rows', 0)} @@ -584,7 +576,7 @@ class DatabaseMigrator: # Поэтому используем отдельный метод для установки replication_state._set_table_total_rows(table_name, dst_stats['total_rows']) - migration_logger.info(f" Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}") + migration_logger.info(f"Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}") # Логируем операцию self.state.log_operation( diff --git a/app/services/replication_state.py b/app/services/replication_state.py index df68c9c..844829d 100644 --- a/app/services/replication_state.py +++ b/app/services/replication_state.py @@ -2,6 +2,7 @@ import json from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional +from app.models.replication import ReplicationSchedule from app.repository.replication_metadata_repo import replication_metadata_repo from app.core.logging import migration_logger from app.core.config import settings @@ -31,9 +32,9 @@ class ReplicationState: 'tables_count': stats['tables_count'] } - def get_table_last_sync(self, table_name: str) -> Optional[datetime]: + def get_table_last_sync(self, metadata_id: int) -> Optional[datetime]: """Получает время последней синхронизации таблицы""" - return self.replication_repo.get_last_sync_time(table_name) + return self.replication_repo.get_last_sync_time(metadata_id) def get_last_id(self, table_name: str) -> Optional[int]: """Получает последний обработанный ID для таблицы""" @@ -43,9 +44,9 @@ class ReplicationState: """Обновляет последний обработанный ID""" self.replication_repo.update_last_id(table_name, last_id) - def update_table_sync_time(self, table_name: str): + def update_table_sync_time(self, schedule_id: int): """Обновляет время синхронизации таблицы""" - self.replication_repo.update_sync_time(table_name) + self.replication_repo.update_sync_time(schedule_id) def update_table_stats(self, table_name: str, added_rows: int): """Обновляет статистику таблицы""" @@ -69,15 +70,16 @@ class ReplicationState: session = self.replication_repo.get_session() try: - metadata = session.query(ReplicationMetadata).filter_by( + metadatas = session.query(ReplicationMetadata).filter_by( table_name=table_name - ).first() + ).all() - if metadata: - metadata.total_rows = total_rows - metadata.updated_at = datetime.now() - session.commit() - migration_logger.debug(f" Установлено total_rows={total_rows} для {table_name}") + if metadatas: + for metadata in metadatas: + metadata.total_rows = total_rows + metadata.updated_at = datetime.now() + session.commit() + migration_logger.debug(f"Установлено total_rows={total_rows} для {table_name}") else: migration_logger.warning(f"Метаданные для {table_name} не найдены") except Exception as e: diff --git a/app/services/scheduler.py b/app/services/scheduler.py index b5c1dc1..e43cafb 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -7,9 +7,12 @@ import time as time_module from app.core.logging import migration_logger from app.core.config import settings +from app.models.replication import ReplicationSchedule +from app.services.batch_runner import get_tables_to_run, run_migration_batch from app.services.migrator import migrator from app.repository.replication_metadata_repo import replication_metadata_repo from app.utils.email_sender import email_sender +from app.core.database import db_connector class MigrationScheduler: @@ -24,7 +27,8 @@ class MigrationScheduler: def _init_default_schedules(self): """Инициализация расписаний по умолчанию""" - self.repo.init_default_schedules(settings.TABLES_TO_COPY) + metadatas = self.repo.get_all_metadata() + self.repo.init_default_schedules(metadatas) migration_logger.info("Расписания по умолчанию инициализированы") def set_schedule(self, table_name: str, schedule_time: str = "00:00", @@ -71,95 +75,35 @@ class MigrationScheduler: """Включить расписание""" self.repo.enable_schedule(table_name) - def delete_schedule(self, table_name: str): + def delete_schedule(self, schedule_id: int): """Удалить расписание""" - self.repo.delete_schedule(table_name) + self.repo.delete_schedule(schedule_id) - def get_due_tables(self, current_time: Optional[datetime] = None) -> List: + def get_due_tables(self, current_time: Optional[datetime] = None) -> List[ReplicationSchedule]: """Получить таблицы для запуска сейчас""" due = self.repo.get_due_schedules(current_time) return due - async def run_due_migrations(self): - """Запустить миграции по расписанию""" - due_schedules = self.get_due_tables() + async def check_and_run_schedules(self): + """ + Проверить расписания и запустить задачи. + Вызывается по cron (например, каждую минуту). + """ + migration_logger.info("Проверка расписаний миграции...") - if not due_schedules: - return + try: + async with db_connector.async_dst_session() as session: + tables = await get_tables_to_run(session) + + if tables: + batch_id = await run_migration_batch(tables) + migration_logger.info(f"Батч запущен: {batch_id}") + else: + migration_logger.info("Нет задач для запуска в это время") - migration_logger.info(f"Найдено {len(due_schedules)} таблиц для миграции по расписанию") - - for schedule in due_schedules: - try: - days_str = ', '.join(schedule.days_display) - migration_logger.info( - f"Запуск миграции по расписанию для {schedule.table_name} " - f"в {schedule.schedule_time.strftime('%H:%M')} [{days_str}]" - ) - - # Запускаем миграцию - result = await asyncio.to_thread( - migrator.run_migration, - tables=[schedule.table_name], - full_reload=schedule.full_reload, - send_email=True - ) - - # Обновляем время последнего запуска - self.repo.update_schedule_last_run(schedule.table_name) - - # Логируем успешный запуск - self.repo.log_operation( - table_name=schedule.table_name, - operation='SCHEDULED', - records_count=0, - status='SUCCESS' - ) - - migration_logger.info(f"Миграция по расписанию для {schedule.table_name} завершена") - - except Exception as e: - error_msg = f"Ошибка при миграции по расписанию для {schedule.table_name}: {e}" - migration_logger.error(error_msg) - - # Логируем ошибку - self.repo.log_operation( - table_name=schedule.table_name, - operation='SCHEDULED', - records_count=0, - status='ERROR', - error_message=str(e)[:500] - ) - - email_sender.send_error_notification( - error_message=error_msg, - table_name=schedule.table_name - ) - - def start_scheduler(self): - """Запустить планировщик""" - self.running = True - migration_logger.info("Планировщик миграций запущен") - - while self.running: - try: - # Проверяем, есть ли таблицы для миграции - asyncio.run(self.run_due_migrations()) - - # Ждем 60 секунд до следующей проверки - time_module.sleep(60) - - except KeyboardInterrupt: - self.stop_scheduler() - break - except Exception as e: - migration_logger.error(f"Ошибка в планировщике: {e}") - time_module.sleep(60) - - def stop_scheduler(self): - """Остановить планировщик""" - self.running = False - migration_logger.info("Планировщик миграций остановлен") + except Exception as e: + migration_logger.error(f"Ошибка проверки расписаний: {e}") + raise # Глобальный экземпляр diff --git a/app/services/task_tracker.py b/app/services/task_tracker.py new file mode 100644 index 0000000..2b8a7b9 --- /dev/null +++ b/app/services/task_tracker.py @@ -0,0 +1,56 @@ +# app/services/task_tracker.py +import redis.asyncio as redis +from typing import Optional +import json + +class TaskTracker: + def __init__(self, redis_url: str = "redis://127.0.0.1:6379/0"): + self.redis_url = redis_url + + async def _get_redis(self): + return await redis.from_url(self.redis_url) + + async def init_batch(self, batch_id: str, total_tasks: int): + """Инициализировать пакет задач""" + r = await self._get_redis() + await r.set(f"replicator_batch:{batch_id}:total", total_tasks) + await r.set(f"replicator_batch:{batch_id}:completed", 0) + await r.set(f"replicator_batch:{batch_id}:results", json.dumps([])) + await r.expire(f"replicator_batch:{batch_id}:total", 86400) + await r.expire(f"replicator_batch:{batch_id}:completed", 86400) + await r.expire(f"replicator_batch:{batch_id}:results", 86400) + await r.close() + + async def mark_completed(self, batch_id: str, result: dict) -> int: + """Отметить задачу как завершённую и вернуть количество выполненных""" + r = await self._get_redis() + completed = await r.incr(f"replicator_batch:{batch_id}:completed") + + # Сохраняем результат задачи + results = json.loads(await r.get(f"replicator_batch:{batch_id}:results") or "[]") + results.append(result) + await r.set(f"replicator_batch:{batch_id}:results", json.dumps(results)) + + await r.close() + return completed + + async def get_batch_status(self, batch_id: str) -> dict: + """Получить статус пакета""" + r = await self._get_redis() + total = await r.get(f"replicator_batch:{batch_id}:total") + completed = await r.get(f"replicator_batch:{batch_id}:completed") + results = await r.get(f"replicator_batch:{batch_id}:results") + await r.close() + + return { + "total": int(total) if total else 0, + "completed": int(completed) if completed else 0, + "results": json.loads(results) if results else [] + } + + async def is_batch_complete(self, batch_id: str) -> bool: + """Проверить, завершены ли все задачи""" + status = await self.get_batch_status(batch_id) + return status["completed"] >= status["total"] + +task_tracker = TaskTracker() \ No newline at end of file diff --git a/app/taskiq/broker.py b/app/taskiq/broker.py index 21627c9..5ce0148 100644 --- a/app/taskiq/broker.py +++ b/app/taskiq/broker.py @@ -1,14 +1,11 @@ import asyncio -from datetime import datetime, time -import os -from typing import Any, Dict +from typing import Any, Dict, Optional from taskiq import ScheduledTask, TaskiqScheduler -from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource +from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend -import logging -logging.getLogger("taskiq.scheduler").setLevel(logging.DEBUG) -logging.getLogger("taskiq.broker").setLevel(logging.DEBUG) +from app.models.replication import ReplicationSchedule +from app.services.task_tracker import task_tracker # ---------- Настройка результата ---------- result_backend = RedisAsyncResultBackend( @@ -20,8 +17,6 @@ result_backend = RedisAsyncResultBackend( broker = ListQueueBroker( url="redis://127.0.0.1:6379/0", queue_name="migration_queue", # имя очереди - #consumer_group_name="migration_group", # группа потребителей - #maxlen=1000, # максимальная длина очереди ).with_result_backend(result_backend) # ---------- Источник расписаний (динамический, на Redis) ---------- @@ -52,36 +47,6 @@ scheduler = SchedulerWithStartup( sources=[schedule_source], ) -# ---------- Задача для миграции ---------- -@broker.task( - task_name="migrate_table_task", # Короткое имя для удобства - retry_on_error=True, - max_retries=3, - retry_delay=60 -) -async def migrate_table_task(table_name: str, full_reload: bool = False) -> Dict[str, Any]: - """ - Асинхронная задача миграции. - Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать). - """ - from app.services.migrator import migrator - from app.core.logging import migration_logger - - migration_logger.info(f"Запуск миграции для {table_name} (full_reload={full_reload})") - - try: - # Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop - result = await asyncio.to_thread( - migrator.run_migration, - tables=[table_name], - full_reload=full_reload, - send_email=True - ) - complex.info(f"Миграция {table_name} завершена") - return {"success": True, "table": table_name, "result": result} - except Exception as e: - migration_logger.error(f"Ошибка миграции {table_name}: {e}") - raise # для retry # Функция синхронизации расписаний из БД в Redis (общая для startup и API) async def refresh_schedules(): @@ -98,43 +63,145 @@ async def refresh_schedules(): await r.delete(*keys) await r.close() - for s in schedules: - if not s.enabled: - continue - hour = s.schedule_time.hour - minute = s.schedule_time.minute - days = s.days_list - cron = f"{minute} {hour} * * {','.join(map(str, days))}" if len(days) < 6 else f"{minute} {hour} * * *" - # await migrate_table_task.schedule_by_cron( - # schedule_source, - # cron, - # s.table_name, - # s.full_reload - # ) - task = ScheduledTask( - task_name="migrate_table_task", - labels={}, - cron=cron, - cron_offset='Asia/Tokyo', # UTC+9 - args=[s.table_name], - kwargs={"full_reload": s.full_reload} + # Задача для проверки расписания + checker_task = ScheduledTask( + task_name="check_schedules_task", + labels={}, + cron="* * * * *", # ← Каждую минуту + cron_offset='Asia/Tokyo', + args=[], + kwargs={} + ) + await schedule_source.add_schedule(checker_task) + + # for s in schedules: + # if s.enabled is not True: + # continue + # hour = s.schedule_time.hour + # minute = s.schedule_time.minute + # days = s.days_list + # cron = f"{minute} {hour} * * {','.join(map(str, days))}" if len(days) < 6 else f"{minute} {hour} * * *" + + # task = ScheduledTask( + # task_name="migrate_table_task", + # labels={}, + # cron=cron, + # cron_offset='Asia/Tokyo', # UTC+9 + # args=[], + # kwargs={ + # "table_name": s.table.table_name, + # "schedule_id": s.id, + # "metadata_id": s.metadata_id, + # "life_table_name": s.table.life_table_name, + # "uses_life": s.table.has_use_life(), + # "full_reload": s.full_reload + # } + # ) + # await schedule_source.add_schedule(task) + + migration_logger.info("Расписание запущено") + +# ---------- Задача для миграции ---------- +@broker.task( + task_name="migrate_table_task", + retry_on_error=True, + max_retries=3, + retry_delay=60 +) +async def migrate_table_task( + table_name: str, + schedule_id: int, + metadata_id: int, + life_table_name: Optional[str] = None, + uses_life: bool = False, + full_reload: bool = False, + batch_id: Optional[str] = None, + send_email: bool = False + ) -> Dict[str, Any]: + """ + Асинхронная задача миграции. + Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать). + """ + from app.services.migrator import migrator + from app.core.logging import migration_logger + + migration_logger.info(f"Запуск миграции: {table_name} (life_table_name={life_table_name}, uses_life={uses_life}, full_reload={full_reload}, batch_id={batch_id})") + + try: + # Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop + result = await asyncio.to_thread( + migrator.run_migration, + table_name=table_name, + schedule_id=schedule_id, + metadata_id=metadata_id, + life_table_name=life_table_name, + uses_life=uses_life, + full_reload=full_reload, + send_email=False ) - await schedule_source.add_schedule(task) - # schedules_itms = await schedule_source.get_schedules() - # for itm in schedules_itms: - # print(itm) - migration_logger.info("Расписания обновлены") -@broker.task -async def test_task(message: str = "Hello, world!"): - """Простая тестовая задача.""" - import logging - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger(__name__) - logger.info(f"Тестовая задача выполнена: {message}") - return {"status": "ok", "message": message} + task_result = {"success": result, "table": table_name, "schedule_id": schedule_id} -# Startup для планировщика (выполняется при запуске через CLI) -# @broker.on_event('on_startup') -# async def on_startup(): - # await refresh_schedules() \ No newline at end of file + # Если есть batch_id — отмечаем выполнение + if batch_id: + completed = await task_tracker.mark_completed(batch_id, task_result) + status = await task_tracker.get_batch_status(batch_id) + migration_logger.info(f"Задача завершена: {completed}/{status['total']} в батче {batch_id}") + + # Если все задачи выполнены — запускаем отправку email + if completed >= status["total"] and send_email: + await send_batch_email_task.kiq(batch_id=batch_id) + + return task_result + except Exception as e: + migration_logger.error(f"Ошибка миграции {table_name}: {e}") + + # Даже при ошибке отмечаем задачу + if batch_id: + await task_tracker.mark_completed( + batch_id, + {"success": False, "table": table_name, "error": str(e)} + ) + raise # для retry + +# ---------- Задача для отправки email ---------- +@broker.task(task_name="send_batch_email_task") +async def send_batch_email_task(batch_id: str) -> Dict[str, Any]: + """Отправить сводный email после завершения всех задач""" + from app.core.logging import migration_logger + from app.services.task_tracker import task_tracker + from app.utils.email_sender import email_sender + + migration_logger.info(f"📧 Отправка сводного email для батча {batch_id}") + + status = await task_tracker.get_batch_status(batch_id) + + # Формируем отчёт + successful = sum(1 for r in status["results"] if r.get("success")) + failed = len(status["results"]) - successful + + email_data = { + "batch_id": batch_id, + "total": status["total"], + "completed": status["completed"], + "successful": successful, + "failed": failed, + "results": status["results"] + } + + # Отправляем email + await email_sender.send_migration_summary_email(email_data) + + migration_logger.info(f"Сводный email отправлен: {successful} успешно, {failed} ошибок") + + return {"email_sent": True, "batch_id": batch_id} + +# ---------- Задача для проверки расписания ---------- +@broker.task(task_name="check_schedules_task") +async def check_schedules_task(): + """ + Задача-планировщик: проверяет расписания и запускает батчи. + Запускается по cron (каждую минуту). + """ + from app.services.scheduler import scheduler + await scheduler.check_and_run_schedules() diff --git a/app/taskiq/tasks.py b/app/taskiq/tasks.py new file mode 100644 index 0000000..0fb2152 --- /dev/null +++ b/app/taskiq/tasks.py @@ -0,0 +1,6 @@ +# app/tasks.py +import asyncio +from typing import Any, Dict, Optional +from app.services.task_tracker import task_tracker +from app.taskiq.broker import broker + diff --git a/app/utils/email_sender.py b/app/utils/email_sender.py index 05fc13c..a254a30 100644 --- a/app/utils/email_sender.py +++ b/app/utils/email_sender.py @@ -1,8 +1,9 @@ +import asyncio import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication -from typing import List, Optional +from typing import Dict, List, Optional from datetime import datetime from app.core.config import settings from app.core.logging import migration_logger @@ -134,5 +135,221 @@ class EmailSender: return self.send_email(subject, body) + async def send_migration_summary_email(self, email_data: Dict) -> bool: + """ + Отправить сводный email о результатах миграции. + + Args: + email_data: Словарь с данными: + - batch_id: ID батча + - total: Всего таблиц + - completed: Завершено + - successful: Успешно + - failed: Ошибок + - results: Список результатов по таблицам + - timestamp: Время отчёта + + Returns: + bool: True если email отправлен успешно + """ + try: + # Формируем HTML-отчёт + html_content = self._create_email_html(email_data) + + # Создаём письмо + msg = MIMEMultipart('alternative') + msg['Subject'] = f"Отчёт о миграции: {email_data.get('successful', 0)} успешно, {email_data.get('failed', 0)} ошибок" + msg['From'] = self.from_addr + msg['To'] = ', '.join(self.to_addrs) + + # Текстовая версия + text_content = self._create_email_text(email_data) + msg.attach(MIMEText(text_content, 'plain', 'utf-8')) + + # HTML версия + msg.attach(MIMEText(html_content, 'html', 'utf-8')) + + # Отправляем + await asyncio.to_thread( + self._send_email_sync, + smtp_server=self.smtp_server, + smtp_port=self.smtp_port, + login=self.username, + password=self.password, + from_addr=self.from_addr, + to_addrs=self.to_addrs, + message=msg.as_string() + ) + + migration_logger.info(f"Email отправлен: {len(self.to_addrs)} получателей") + return True + + except Exception as e: + migration_logger.error(f"Ошибка отправки email: {e}") + return False + + def _create_email_html(self, data: Dict) -> str: + """Создать HTML-версию письма""" + + batch_id = data.get('batch_id', 'N/A') + total = data.get('total', 0) + successful = data.get('successful', 0) + failed = data.get('failed', 0) + timestamp = data.get('timestamp', 'N/A') + results = data.get('results', []) + + # Цвет статуса + if failed == 0: + status_color = "#28a745" + status_text = "✅ Все успешно" + elif failed < total: + status_color = "#ffc107" + status_text = "⚠️ Частичный успех" + else: + status_color = "#dc3545" + status_text = "❌ Все плохо" + + # Таблица результатов + rows = "" + for r in results: + table_name = r.get('table', 'Unknown') + success = r.get('success', False) + error = r.get('error', '') + + if success: + row_color = "#d4edda" + status_icon = "✅" + else: + row_color = "#f8d7da" + status_icon = "❌" + + rows += f""" + + {table_name} + {"Успешно" if success else "Ошибка"} + {error[:50] if error else "-"} + + """ + + html = f""" + + + + + + + +
+
+

Репликация данных

+
+ +
+

Batch ID: {batch_id}

+

Время: {timestamp}

+

{status_text}

+
    +
  • Всего таблиц: {total}
  • +
  • Успешно: {successful}
  • +
  • Ошибок: {failed}
  • +
+
+ +

Детали по таблицам

+ + + + + + + + + + {rows} + +
ТаблицаРезультатОшибка
+ + +
+ + + """ + return html + + def _create_email_text(self, data: Dict) -> str: + """Создать текстовую версию письма""" + + batch_id = data.get('batch_id', 'N/A') + total = data.get('total', 0) + successful = data.get('successful', 0) + failed = data.get('failed', 0) + timestamp = data.get('timestamp', 'N/A') + results = data.get('results', []) + + text = f""" + ОТЧЁТ О МИГРАЦИИ ДАННЫХ + ======================= + + Batch ID: {batch_id} + Время: {timestamp} + + СТАТИСТИКА: + - Всего таблиц: {total} + - Успешно: {successful} + - Ошибок: {failed} + + ДЕТАЛИ ПО ТАБЛИЦАМ: + """ + + for r in results: + table_name = r.get('table', 'Unknown') + success = r.get('success', False) + error = r.get('error', '') + + status = "✅ Успешно" if success else f"❌ Ошибка: {error}" + text += f"\n • {table_name}: {status}" + + text += """ + + ======================= + Автоматическое сообщение от системы репликации + Не отвечайте на это письмо + """ + return text + + def _send_email_sync( + self, + smtp_server: str, + smtp_port: int, + login: str, + password: str, + from_addr: str, + to_addrs: List[str], + message: str + ): + """Синхронная отправка email (запускается в потоке)""" + + with smtplib.SMTP_SSL(smtp_server, smtp_port) as server: + server.login(login, password) + server.sendmail(from_addr, to_addrs, message) + # server = smtplib.SMTP_SSL(smtp_server, smtp_port) + # try: + # server.starttls() + # server.login(login, password) + # server.sendmail(from_addr, to_addrs, message) + # finally: + # server.quit() email_sender = EmailSender() \ No newline at end of file diff --git a/req.txt b/req.txt index 7361d15..0d0c49d 100644 --- a/req.txt +++ b/req.txt @@ -45,3 +45,4 @@ uvloop==0.22.1 watchfiles==1.1.1 websockets==16.0 yarl==1.23.0 +asyncpg \ No newline at end of file diff --git a/run_scheduler.py b/run_scheduler.py new file mode 100644 index 0000000..99e1ee8 --- /dev/null +++ b/run_scheduler.py @@ -0,0 +1,68 @@ +import subprocess +import signal + +class ScheduleManager: + def __init__(self, shutdown_timeout=5): + self.process = None + self.running = False + self.shutdown_timeout = shutdown_timeout + + def start_schedule(self): + """Запуск расписания""" + cmd = [ + "taskiq", "scheduler", + "app.taskiq.broker:scheduler" + ] + + print("Запуск расписания...") + self.process = subprocess.Popen(cmd) + self.running = True + + # Установить обработчики сигналов + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + try: + return_code = self.process.wait() + print(f"Расписание завершилось с кодом: {return_code}") + except KeyboardInterrupt: + print("Получен SIGINT, останавливаем...") + self.stop_schedule() + except Exception as e: + print(f"Ошибка при ожидании расписания: {e}") + self.stop_schedule() + + def _signal_handler(self, sig, frame): + """Обработчик сигнала остановки""" + print(f"\nПолучен сигнал {sig}, останавливаю расприсание...") + self.stop_schedule() + + def stop_schedule(self): + """Корректная остановка расписания""" + if self.process and self.running: + print("Отправляем SIGTERM расприсанию...") + self.process.terminate() + + try: + # Ждать завершения в течение self.shutdown_timeout секунд + return_code = self.process.wait(timeout=self.shutdown_timeout) + print(f"Расприсание завершилось с кодом: {return_code}") + except subprocess.TimeoutExpired: + print(f"Расприсание не завершилось за {self.shutdown_timeout} секунд, отправляем SIGKILL...") + self.process.kill() + try: + self.process.wait(timeout=2) # 2 секунды на SIGKILL + print("Процесс принудительно завершен") + except subprocess.TimeoutExpired: + print("Не удалось завершить процесс принудительно") + + self.running = False + print("Расприсание остановлено") + +def main(): + # Использовать короткий таймаут + manager = ScheduleManager(shutdown_timeout=3) + manager.start_schedule() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/run_worker.py b/run_worker.py new file mode 100644 index 0000000..94b09d3 --- /dev/null +++ b/run_worker.py @@ -0,0 +1,70 @@ +import subprocess +import signal + +class WorkerManager: + def __init__(self, shutdown_timeout=5): + self.process = None + self.running = False + self.shutdown_timeout = shutdown_timeout + + def start_worker(self): + """Запуск воркера""" + cmd = [ + "taskiq", "worker", + "app.taskiq.broker:broker", + "--workers", "1", + "--max-prefetch", "1", + ] + + print("Запуск воркера...") + self.process = subprocess.Popen(cmd) + self.running = True + + # Установить обработчики сигналов + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + try: + return_code = self.process.wait() + print(f"Воркер завершился с кодом: {return_code}") + except KeyboardInterrupt: + print("Получен SIGINT, останавливаем...") + self.stop_worker() + except Exception as e: + print(f"Ошибка при ожидании воркера: {e}") + self.stop_worker() + + def _signal_handler(self, sig, frame): + """Обработчик сигнала остановки""" + print(f"\nПолучен сигнал {sig}, останавливаю воркер...") + self.stop_worker() + + def stop_worker(self): + """Корректная остановка воркера""" + if self.process and self.running: + print("Отправляем SIGTERM воркеру...") + self.process.terminate() + + try: + # Ждать завершения в течение self.shutdown_timeout секунд + return_code = self.process.wait(timeout=self.shutdown_timeout) + print(f"Воркер завершился с кодом: {return_code}") + except subprocess.TimeoutExpired: + print(f"Воркер не завершился за {self.shutdown_timeout} секунд, отправляем SIGKILL...") + self.process.kill() + try: + self.process.wait(timeout=2) # 2 секунды на SIGKILL + print("Процесс принудительно завершен") + except subprocess.TimeoutExpired: + print("Не удалось завершить процесс принудительно") + + self.running = False + print("Воркер остановлен") + +def main(): + # Использовать короткий таймаут + manager = WorkerManager(shutdown_timeout=3) + manager.start_worker() + +if __name__ == "__main__": + main() \ No newline at end of file