import asyncio from typing import Any, Dict, Optional from taskiq import ScheduledTask, TaskiqScheduler from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend from app.models.replication import ReplicationSchedule from app.services.task_tracker import task_tracker # ---------- Настройка результата ---------- result_backend = RedisAsyncResultBackend( redis_url="redis://127.0.0.1:6379/0", result_ex_time=86400, # результаты хранятся 24 часа ) # ---------- Настройка брокера (очередь с подтверждениями) ---------- broker = ListQueueBroker( url="redis://127.0.0.1:6379/0", queue_name="migration_queue", # имя очереди ).with_result_backend(result_backend) # ---------- Источник расписаний (динамический, на Redis) ---------- schedule_source = ListRedisScheduleSource( url="redis://127.0.0.1:6379/0", prefix="migration_schedule", # префикс для ключей в Redis ) # ---------- Планировщик с кастомным startup ---------- class SchedulerWithStartup(TaskiqScheduler): """Планировщик с синхронизацией расписаний при запуске.""" async def startup(self) -> None: """Запуск планировщика с синхронизацией расписаний.""" from app.scheduler import sync_schedules_to_redis from app.core.logging import migration_logger migration_logger.info("Запуск планировщика taskiq...") # Синхронизируем расписания из БД в Redis await sync_schedules_to_redis() migration_logger.info("Планировщик готов к работе") await super().startup() scheduler = SchedulerWithStartup( broker=broker, sources=[schedule_source], ) # Функция синхронизации расписаний из БД в Redis (общая для startup и API) async def refresh_schedules(): from app.services.scheduler import scheduler as db_scheduler from app.core.logging import migration_logger migration_logger.info("Обновление расписаний в Redis...") # Получаем все активные расписания из БД schedules = db_scheduler.get_all_schedules() # Очищаем старые ключи в Redis (можно через прямой redis) import redis.asyncio as redis r = await redis.from_url("redis://127.0.0.1:6379/0") keys = await r.keys("migration_schedule:*") if keys: await r.delete(*keys) await r.close() # Задача для проверки расписания checker_task = ScheduledTask( task_name="check_schedules_task", labels={}, cron="* * * * *", # ← Каждую минуту cron_offset='Asia/Tokyo', args=[], kwargs={} ) await schedule_source.add_schedule(checker_task) # for s in schedules: # if s.enabled is not True: # continue # hour = s.schedule_time.hour # minute = s.schedule_time.minute # days = s.days_list # cron = f"{minute} {hour} * * {','.join(map(str, days))}" if len(days) < 6 else f"{minute} {hour} * * *" # task = ScheduledTask( # task_name="migrate_table_task", # labels={}, # cron=cron, # cron_offset='Asia/Tokyo', # UTC+9 # args=[], # kwargs={ # "table_name": s.table.table_name, # "schedule_id": s.id, # "metadata_id": s.metadata_id, # "life_table_name": s.table.life_table_name, # "uses_life": s.table.has_use_life(), # "full_reload": s.full_reload # } # ) # await schedule_source.add_schedule(task) migration_logger.info("Расписание запущено") # ---------- Задача для миграции ---------- @broker.task( task_name="migrate_table_task", retry_on_error=True, max_retries=3, retry_delay=60 ) async def migrate_table_task( table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str] = None, uses_life: bool = False, full_reload: bool = False, batch_id: Optional[str] = None, send_email: bool = False ) -> Dict[str, Any]: """ Асинхронная задача миграции. Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать). """ from app.services.migrator import migrator from app.core.logging import migration_logger migration_logger.info(f"Запуск миграции: {table_name} (life_table_name={life_table_name}, uses_life={uses_life}, full_reload={full_reload}, batch_id={batch_id})") try: # Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop result = await asyncio.to_thread( migrator.run_migration, table_name=table_name, schedule_id=schedule_id, metadata_id=metadata_id, life_table_name=life_table_name, uses_life=uses_life, full_reload=full_reload, send_email=False ) task_result = {"success": result, "table": table_name, "schedule_id": schedule_id} # Если есть batch_id — отмечаем выполнение if batch_id: completed = await task_tracker.mark_completed(batch_id, task_result) status = await task_tracker.get_batch_status(batch_id) migration_logger.info(f"Задача завершена: {completed}/{status['total']} в батче {batch_id}") # Если все задачи выполнены — запускаем отправку email if completed >= status["total"] and send_email: await send_batch_email_task.kiq(batch_id=batch_id) return task_result except Exception as e: migration_logger.error(f"Ошибка миграции {table_name}: {e}") # Даже при ошибке отмечаем задачу if batch_id: await task_tracker.mark_completed( batch_id, {"success": False, "table": table_name, "error": str(e)} ) raise # для retry # ---------- Задача для отправки email ---------- @broker.task(task_name="send_batch_email_task") async def send_batch_email_task(batch_id: str) -> Dict[str, Any]: """Отправить сводный email после завершения всех задач""" from app.core.logging import migration_logger from app.services.task_tracker import task_tracker from app.utils.email_sender import email_sender migration_logger.info(f"📧 Отправка сводного email для батча {batch_id}") status = await task_tracker.get_batch_status(batch_id) # Формируем отчёт successful = sum(1 for r in status["results"] if r.get("success")) failed = len(status["results"]) - successful email_data = { "batch_id": batch_id, "total": status["total"], "completed": status["completed"], "successful": successful, "failed": failed, "results": status["results"] } # Отправляем email await email_sender.send_migration_summary_email(email_data) migration_logger.info(f"Сводный email отправлен: {successful} успешно, {failed} ошибок") return {"email_sent": True, "batch_id": batch_id} # ---------- Задача для проверки расписания ---------- @broker.task(task_name="check_schedules_task") async def check_schedules_task(): """ Задача-планировщик: проверяет расписания и запускает батчи. Запускается по cron (каждую минуту). """ from app.services.scheduler import scheduler await scheduler.check_and_run_schedules()