208 lines
8.3 KiB
Python
208 lines
8.3 KiB
Python
import asyncio
|
||
from typing import Any, Dict, Optional
|
||
|
||
from taskiq import ScheduledTask, TaskiqScheduler
|
||
from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend
|
||
|
||
from app.models.replication import ReplicationSchedule
|
||
from app.services.task_tracker import task_tracker
|
||
|
||
# ---------- Настройка результата ----------
|
||
result_backend = RedisAsyncResultBackend(
|
||
redis_url="redis://127.0.0.1:6379/0",
|
||
result_ex_time=86400, # результаты хранятся 24 часа
|
||
)
|
||
|
||
# ---------- Настройка брокера (очередь с подтверждениями) ----------
|
||
broker = ListQueueBroker(
|
||
url="redis://127.0.0.1:6379/0",
|
||
queue_name="migration_queue", # имя очереди
|
||
).with_result_backend(result_backend)
|
||
|
||
# ---------- Источник расписаний (динамический, на Redis) ----------
|
||
schedule_source = ListRedisScheduleSource(
|
||
url="redis://127.0.0.1:6379/0",
|
||
prefix="migration_schedule", # префикс для ключей в Redis
|
||
)
|
||
|
||
|
||
# ---------- Планировщик с кастомным startup ----------
|
||
class SchedulerWithStartup(TaskiqScheduler):
|
||
"""Планировщик с синхронизацией расписаний при запуске."""
|
||
|
||
async def startup(self) -> None:
|
||
"""Запуск планировщика с синхронизацией расписаний."""
|
||
from app.scheduler import sync_schedules_to_redis
|
||
from app.core.logging import migration_logger
|
||
|
||
migration_logger.info("Запуск планировщика taskiq...")
|
||
# Синхронизируем расписания из БД в Redis
|
||
await sync_schedules_to_redis()
|
||
migration_logger.info("Планировщик готов к работе")
|
||
await super().startup()
|
||
|
||
|
||
scheduler = SchedulerWithStartup(
|
||
broker=broker,
|
||
sources=[schedule_source],
|
||
)
|
||
|
||
|
||
# Функция синхронизации расписаний из БД в Redis (общая для startup и API)
|
||
async def refresh_schedules():
|
||
from app.services.scheduler import scheduler as db_scheduler
|
||
from app.core.logging import migration_logger
|
||
migration_logger.info("Обновление расписаний в Redis...")
|
||
# Получаем все активные расписания из БД
|
||
schedules = db_scheduler.get_all_schedules()
|
||
# Очищаем старые ключи в Redis (можно через прямой redis)
|
||
import redis.asyncio as redis
|
||
r = await redis.from_url("redis://127.0.0.1:6379/0")
|
||
keys = await r.keys("migration_schedule:*")
|
||
if keys:
|
||
await r.delete(*keys)
|
||
await r.close()
|
||
|
||
# Задача для проверки расписания
|
||
checker_task = ScheduledTask(
|
||
task_name="check_schedules_task",
|
||
labels={},
|
||
cron="* * * * *", # ← Каждую минуту
|
||
cron_offset='Asia/Tokyo',
|
||
args=[],
|
||
kwargs={}
|
||
)
|
||
await schedule_source.add_schedule(checker_task)
|
||
|
||
# for s in schedules:
|
||
# if s.enabled is not True:
|
||
# continue
|
||
# hour = s.schedule_time.hour
|
||
# minute = s.schedule_time.minute
|
||
# days = s.days_list
|
||
# cron = f"{minute} {hour} * * {','.join(map(str, days))}" if len(days) < 6 else f"{minute} {hour} * * *"
|
||
|
||
# task = ScheduledTask(
|
||
# task_name="migrate_table_task",
|
||
# labels={},
|
||
# cron=cron,
|
||
# cron_offset='Asia/Tokyo', # UTC+9
|
||
# args=[],
|
||
# kwargs={
|
||
# "table_name": s.table.table_name,
|
||
# "schedule_id": s.id,
|
||
# "metadata_id": s.metadata_id,
|
||
# "life_table_name": s.table.life_table_name,
|
||
# "uses_life": s.table.has_use_life(),
|
||
# "full_reload": s.full_reload
|
||
# }
|
||
# )
|
||
# await schedule_source.add_schedule(task)
|
||
|
||
migration_logger.info("Расписание запущено")
|
||
|
||
# ---------- Задача для миграции ----------
|
||
@broker.task(
|
||
task_name="migrate_table_task",
|
||
retry_on_error=True,
|
||
max_retries=3,
|
||
retry_delay=60
|
||
)
|
||
async def migrate_table_task(
|
||
table_name: str,
|
||
schedule_id: int,
|
||
metadata_id: int,
|
||
life_table_name: Optional[str] = None,
|
||
uses_life: bool = False,
|
||
full_reload: bool = False,
|
||
batch_id: Optional[str] = None,
|
||
send_email: bool = False
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Асинхронная задача миграции.
|
||
Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать).
|
||
"""
|
||
from app.services.migrator import migrator
|
||
from app.core.logging import migration_logger
|
||
|
||
migration_logger.info(f"Запуск миграции: {table_name} (life_table_name={life_table_name}, uses_life={uses_life}, full_reload={full_reload}, batch_id={batch_id})")
|
||
|
||
try:
|
||
# Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop
|
||
result = await asyncio.to_thread(
|
||
migrator.run_migration,
|
||
table_name=table_name,
|
||
schedule_id=schedule_id,
|
||
metadata_id=metadata_id,
|
||
life_table_name=life_table_name,
|
||
uses_life=uses_life,
|
||
full_reload=full_reload,
|
||
send_email=False
|
||
)
|
||
|
||
task_result = {"success": result, "table": table_name, "schedule_id": schedule_id}
|
||
|
||
# Если есть batch_id — отмечаем выполнение
|
||
if batch_id:
|
||
completed = await task_tracker.mark_completed(batch_id, task_result)
|
||
status = await task_tracker.get_batch_status(batch_id)
|
||
migration_logger.info(f"Задача завершена: {completed}/{status['total']} в батче {batch_id}")
|
||
|
||
# Если все задачи выполнены — запускаем отправку email
|
||
if completed >= status["total"] and send_email:
|
||
await send_batch_email_task.kiq(batch_id=batch_id)
|
||
|
||
return task_result
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка миграции {table_name}: {e}")
|
||
|
||
# Даже при ошибке отмечаем задачу
|
||
if batch_id:
|
||
await task_tracker.mark_completed(
|
||
batch_id,
|
||
{"success": False, "table": table_name, "error": str(e)}
|
||
)
|
||
raise # для retry
|
||
|
||
# ---------- Задача для отправки email ----------
|
||
@broker.task(task_name="send_batch_email_task")
|
||
async def send_batch_email_task(batch_id: str) -> Dict[str, Any]:
|
||
"""Отправить сводный email после завершения всех задач"""
|
||
from app.core.logging import migration_logger
|
||
from app.services.task_tracker import task_tracker
|
||
from app.utils.email_sender import email_sender
|
||
|
||
migration_logger.info(f"📧 Отправка сводного email для батча {batch_id}")
|
||
|
||
status = await task_tracker.get_batch_status(batch_id)
|
||
|
||
# Формируем отчёт
|
||
successful = sum(1 for r in status["results"] if r.get("success"))
|
||
failed = len(status["results"]) - successful
|
||
|
||
email_data = {
|
||
"batch_id": batch_id,
|
||
"total": status["total"],
|
||
"completed": status["completed"],
|
||
"successful": successful,
|
||
"failed": failed,
|
||
"results": status["results"]
|
||
}
|
||
|
||
# Отправляем email
|
||
await email_sender.send_migration_summary_email(email_data)
|
||
|
||
migration_logger.info(f"Сводный email отправлен: {successful} успешно, {failed} ошибок")
|
||
|
||
return {"email_sent": True, "batch_id": batch_id}
|
||
|
||
# ---------- Задача для проверки расписания ----------
|
||
@broker.task(task_name="check_schedules_task")
|
||
async def check_schedules_task():
|
||
"""
|
||
Задача-планировщик: проверяет расписания и запускает батчи.
|
||
Запускается по cron (каждую минуту).
|
||
"""
|
||
from app.services.scheduler import scheduler
|
||
await scheduler.check_and_run_schedules()
|