Files
replicator/app/taskiq/broker.py
2026-03-13 17:11:39 +09:00

208 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from typing import Any, Dict, Optional
from taskiq import ScheduledTask, TaskiqScheduler
from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend
from app.models.replication import ReplicationSchedule
from app.services.task_tracker import task_tracker
# ---------- Настройка результата ----------
result_backend = RedisAsyncResultBackend(
redis_url="redis://127.0.0.1:6379/0",
result_ex_time=86400, # результаты хранятся 24 часа
)
# ---------- Настройка брокера (очередь с подтверждениями) ----------
broker = ListQueueBroker(
url="redis://127.0.0.1:6379/0",
queue_name="migration_queue", # имя очереди
).with_result_backend(result_backend)
# ---------- Источник расписаний (динамический, на Redis) ----------
schedule_source = ListRedisScheduleSource(
url="redis://127.0.0.1:6379/0",
prefix="migration_schedule", # префикс для ключей в Redis
)
# ---------- Планировщик с кастомным startup ----------
class SchedulerWithStartup(TaskiqScheduler):
"""Планировщик с синхронизацией расписаний при запуске."""
async def startup(self) -> None:
"""Запуск планировщика с синхронизацией расписаний."""
from app.scheduler import sync_schedules_to_redis
from app.core.logging import migration_logger
migration_logger.info("Запуск планировщика taskiq...")
# Синхронизируем расписания из БД в Redis
await sync_schedules_to_redis()
migration_logger.info("Планировщик готов к работе")
await super().startup()
scheduler = SchedulerWithStartup(
broker=broker,
sources=[schedule_source],
)
# Функция синхронизации расписаний из БД в Redis (общая для startup и API)
async def refresh_schedules():
from app.services.scheduler import scheduler as db_scheduler
from app.core.logging import migration_logger
migration_logger.info("Обновление расписаний в Redis...")
# Получаем все активные расписания из БД
schedules = db_scheduler.get_all_schedules()
# Очищаем старые ключи в Redis (можно через прямой redis)
import redis.asyncio as redis
r = await redis.from_url("redis://127.0.0.1:6379/0")
keys = await r.keys("migration_schedule:*")
if keys:
await r.delete(*keys)
await r.close()
# Задача для проверки расписания
checker_task = ScheduledTask(
task_name="check_schedules_task",
labels={},
cron="* * * * *", # ← Каждую минуту
cron_offset='Asia/Tokyo',
args=[],
kwargs={}
)
await schedule_source.add_schedule(checker_task)
# for s in schedules:
# if s.enabled is not True:
# continue
# hour = s.schedule_time.hour
# minute = s.schedule_time.minute
# days = s.days_list
# cron = f"{minute} {hour} * * {','.join(map(str, days))}" if len(days) < 6 else f"{minute} {hour} * * *"
# task = ScheduledTask(
# task_name="migrate_table_task",
# labels={},
# cron=cron,
# cron_offset='Asia/Tokyo', # UTC+9
# args=[],
# kwargs={
# "table_name": s.table.table_name,
# "schedule_id": s.id,
# "metadata_id": s.metadata_id,
# "life_table_name": s.table.life_table_name,
# "uses_life": s.table.has_use_life(),
# "full_reload": s.full_reload
# }
# )
# await schedule_source.add_schedule(task)
migration_logger.info("Расписание запущено")
# ---------- Задача для миграции ----------
@broker.task(
task_name="migrate_table_task",
retry_on_error=True,
max_retries=3,
retry_delay=60
)
async def migrate_table_task(
table_name: str,
schedule_id: int,
metadata_id: int,
life_table_name: Optional[str] = None,
uses_life: bool = False,
full_reload: bool = False,
batch_id: Optional[str] = None,
send_email: bool = False
) -> Dict[str, Any]:
"""
Асинхронная задача миграции.
Здесь вызывается синхронный код миграции (через asyncio.to_thread, чтобы не блокировать).
"""
from app.services.migrator import migrator
from app.core.logging import migration_logger
migration_logger.info(f"Запуск миграции: {table_name} (life_table_name={life_table_name}, uses_life={uses_life}, full_reload={full_reload}, batch_id={batch_id})")
try:
# Запускаем синхронную функцию в отдельном потоке, чтобы не блокировать event loop
result = await asyncio.to_thread(
migrator.run_migration,
table_name=table_name,
schedule_id=schedule_id,
metadata_id=metadata_id,
life_table_name=life_table_name,
uses_life=uses_life,
full_reload=full_reload,
send_email=False
)
task_result = {"success": result, "table": table_name, "schedule_id": schedule_id}
# Если есть batch_id — отмечаем выполнение
if batch_id:
completed = await task_tracker.mark_completed(batch_id, task_result)
status = await task_tracker.get_batch_status(batch_id)
migration_logger.info(f"Задача завершена: {completed}/{status['total']} в батче {batch_id}")
# Если все задачи выполнены — запускаем отправку email
if completed >= status["total"] and send_email:
await send_batch_email_task.kiq(batch_id=batch_id)
return task_result
except Exception as e:
migration_logger.error(f"Ошибка миграции {table_name}: {e}")
# Даже при ошибке отмечаем задачу
if batch_id:
await task_tracker.mark_completed(
batch_id,
{"success": False, "table": table_name, "error": str(e)}
)
raise # для retry
# ---------- Задача для отправки email ----------
@broker.task(task_name="send_batch_email_task")
async def send_batch_email_task(batch_id: str) -> Dict[str, Any]:
"""Отправить сводный email после завершения всех задач"""
from app.core.logging import migration_logger
from app.services.task_tracker import task_tracker
from app.utils.email_sender import email_sender
migration_logger.info(f"📧 Отправка сводного email для батча {batch_id}")
status = await task_tracker.get_batch_status(batch_id)
# Формируем отчёт
successful = sum(1 for r in status["results"] if r.get("success"))
failed = len(status["results"]) - successful
email_data = {
"batch_id": batch_id,
"total": status["total"],
"completed": status["completed"],
"successful": successful,
"failed": failed,
"results": status["results"]
}
# Отправляем email
await email_sender.send_migration_summary_email(email_data)
migration_logger.info(f"Сводный email отправлен: {successful} успешно, {failed} ошибок")
return {"email_sent": True, "batch_id": batch_id}
# ---------- Задача для проверки расписания ----------
@broker.task(task_name="check_schedules_task")
async def check_schedules_task():
"""
Задача-планировщик: проверяет расписания и запускает батчи.
Запускается по cron (каждую минуту).
"""
from app.services.scheduler import scheduler
await scheduler.check_and_run_schedules()