91 lines
3.8 KiB
Python
91 lines
3.8 KiB
Python
import asyncio
|
||
import redis.asyncio as redis
|
||
from taskiq import ScheduledTask
|
||
|
||
from app.taskiq.broker import broker, scheduler, schedule_source
|
||
from app.core.logging import migration_logger
|
||
|
||
REDIS_URL = "redis://127.0.0.1:6379"
|
||
PREFIX = "migration_schedule"
|
||
|
||
async def sync_schedules_to_redis():
|
||
"""Полная перезапись расписаний в Redis на основе данных из БД."""
|
||
migration_logger.info("Синхронизация расписаний с Redis...")
|
||
|
||
# 1. Получаем все активные расписания из БД через database.py
|
||
from app.core.database import db_connector
|
||
from app.models.replication import ReplicationSchedule
|
||
|
||
# Инициализируем engine и session (обращение к свойству создает sessionmaker)
|
||
_ = db_connector.dst_engine
|
||
SessionLocal = db_connector.dst_session
|
||
session = SessionLocal()
|
||
|
||
try:
|
||
schedules = session.query(ReplicationSchedule).filter(
|
||
ReplicationSchedule.enabled == True
|
||
).all()
|
||
|
||
# 2. Очищаем все старые ключи расписаний в Redis
|
||
r = await redis.from_url(REDIS_URL)
|
||
keys = await r.keys(f"{PREFIX}:*")
|
||
if keys:
|
||
await r.delete(*keys)
|
||
await r.close()
|
||
|
||
# Задача для проверки расписания
|
||
checker_task = ScheduledTask(
|
||
task_name="check_schedules_task",
|
||
labels={},
|
||
cron="* * * * *", # ← Каждую минуту
|
||
cron_offset='Asia/Tokyo',
|
||
args=[],
|
||
kwargs={}
|
||
)
|
||
await schedule_source.add_schedule(checker_task)
|
||
|
||
# 3. Добавляем каждое расписание через add_schedule
|
||
# added = 0
|
||
# for schedule in schedules:
|
||
# hour = schedule.schedule_time.hour
|
||
# minute = schedule.schedule_time.minute
|
||
# days_list = schedule.days_list # список чисел 0..6
|
||
|
||
# # Формируем cron-выражение
|
||
# if days_list:
|
||
# cron_expr = f"{minute} {hour} * * {','.join(map(str, days_list))}"
|
||
# else:
|
||
# cron_expr = f"{minute} {hour} * * *"
|
||
|
||
# # Используем timezone string для cron_offset (UTC+9 = Asia/Tokyo)
|
||
# task = ScheduledTask(
|
||
# task_name="migrate_table_task",
|
||
# labels={},
|
||
# cron=cron_expr,
|
||
# cron_offset='Asia/Tokyo', # Timezone name для UTC+9
|
||
# args=[],
|
||
# kwargs={
|
||
# "table_name": schedule.table.table_name,
|
||
# "schedule_id": schedule.id,
|
||
# "metadata_id": schedule.metadata_id,
|
||
# "life_table_name": schedule.table.life_table_name,
|
||
# "uses_life": schedule.table.has_use_life(),
|
||
# "full_reload": schedule.full_reload
|
||
# }
|
||
# )
|
||
|
||
# # Добавляем в Redis через источник
|
||
# await schedule_source.add_schedule(task)
|
||
# added += 1
|
||
# migration_logger.info(f"Добавлено: {schedule.table.table_name} в {hour:02d}:{minute:02d} (cron: {cron_expr})")
|
||
|
||
# migration_logger.info(f"Синхронизировано {added} активных расписаний")
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
async def on_scheduler_startup():
|
||
"""Хук для синхронизации расписаний при запуске планировщика."""
|
||
migration_logger.info("Запуск планировщика taskiq...")
|
||
await sync_schedules_to_redis()
|
||
migration_logger.info("Планировщик готов к работе") |