Files
replicator/app/services/scheduler.py
2026-03-13 17:11:39 +09:00

110 lines
4.5 KiB
Python

# app/services/scheduler.py
import asyncio
from datetime import datetime
from typing import List, Optional
import time as time_module
from app.core.logging import migration_logger
from app.core.config import settings
from app.models.replication import ReplicationSchedule
from app.services.batch_runner import get_tables_to_run, run_migration_batch
from app.services.migrator import migrator
from app.repository.replication_metadata_repo import replication_metadata_repo
from app.utils.email_sender import email_sender
from app.core.database import db_connector
class MigrationScheduler:
"""Планировщик миграций на базе PostgreSQL"""
def __init__(self):
self.running = False
self.repo = replication_metadata_repo
# Инициализируем расписания по умолчанию
self._init_default_schedules()
def _init_default_schedules(self):
"""Инициализация расписаний по умолчанию"""
metadatas = self.repo.get_all_metadata()
self.repo.init_default_schedules(metadatas)
migration_logger.info("Расписания по умолчанию инициализированы")
def set_schedule(self, table_name: str, schedule_time: str = "00:00",
days: Optional[List[str]] = None, full_reload: bool = False,
enabled: bool = True):
"""Установить расписание для таблицы"""
if table_name not in settings.TABLES_TO_COPY:
raise ValueError(f"Таблица {table_name} не найдена в списке для миграции")
# Валидация формата времени
try:
datetime.strptime(schedule_time, "%H:%M")
except ValueError:
raise ValueError(f"Неверный формат времени: {schedule_time}. Используйте HH:MM")
schedule = self.repo.set_schedule(
table_name=table_name,
schedule_time=schedule_time,
days=days,
full_reload=full_reload,
enabled=enabled
)
if schedule:
days_str = ', '.join(schedule.days_display) if days else 'все дни'
migration_logger.info(
f"Установлено расписание для {table_name}: "
f"{schedule_time} [{days_str}] (full_reload={full_reload})"
)
def get_schedule(self, table_name: str):
"""Получить расписание для таблицы"""
return self.repo.get_schedule(table_name)
def get_all_schedules(self):
"""Получить все расписания"""
return self.repo.get_all_schedules()
def disable_schedule(self, table_name: str):
"""Отключить расписание"""
self.repo.disable_schedule(table_name)
def enable_schedule(self, table_name: str):
"""Включить расписание"""
self.repo.enable_schedule(table_name)
def delete_schedule(self, schedule_id: int):
"""Удалить расписание"""
self.repo.delete_schedule(schedule_id)
def get_due_tables(self, current_time: Optional[datetime] = None) -> List[ReplicationSchedule]:
"""Получить таблицы для запуска сейчас"""
due = self.repo.get_due_schedules(current_time)
return due
async def check_and_run_schedules(self):
"""
Проверить расписания и запустить задачи.
Вызывается по cron (например, каждую минуту).
"""
migration_logger.info("Проверка расписаний миграции...")
try:
async with db_connector.async_dst_session() as session:
tables = await get_tables_to_run(session)
if tables:
batch_id = await run_migration_batch(tables)
migration_logger.info(f"Батч запущен: {batch_id}")
else:
migration_logger.info("Нет задач для запуска в это время")
except Exception as e:
migration_logger.error(f"Ошибка проверки расписаний: {e}")
raise
# Глобальный экземпляр
scheduler = MigrationScheduler()