125 lines
5.3 KiB
Python
125 lines
5.3 KiB
Python
# app/services/batch_runner.py
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Dict, Optional
|
||
import uuid
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.replication import ReplicationSchedule
|
||
from app.core.logging import migration_logger
|
||
from app.services.task_tracker import task_tracker
|
||
from app.taskiq.broker import migrate_table_task
|
||
|
||
|
||
async def get_tables_to_run(session: AsyncSession, check_time: Optional[datetime] = None) -> List[Dict]:
|
||
"""
|
||
Получить список таблиц, которые должны запуститься сейчас.
|
||
|
||
Возвращает список словарей с параметрами для задачи миграции.
|
||
"""
|
||
if check_time is None:
|
||
check_time = datetime.now()
|
||
|
||
current_hour = check_time.hour
|
||
current_minute = check_time.minute
|
||
current_weekday = check_time.weekday() # 0=Monday, 6=Sunday
|
||
|
||
migration_logger.info(
|
||
f"Поиск расписаний для запуска: "
|
||
f"время={check_time.strftime('%H:%M')}, день недели={current_weekday}"
|
||
)
|
||
|
||
# Загружаем все активные расписания с связанными метаданными
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
result = await session.execute(
|
||
select(ReplicationSchedule)
|
||
.options(selectinload(ReplicationSchedule.table))
|
||
.where(ReplicationSchedule.enabled == True)
|
||
)
|
||
schedules = result.scalars().all()
|
||
|
||
tables_to_run = []
|
||
|
||
for schedule in schedules:
|
||
# Проверяем день недели
|
||
if current_weekday not in schedule.days_list:
|
||
migration_logger.debug(f"Пропуск {schedule.id}: не сегодня (days={schedule.days_list})")
|
||
continue
|
||
|
||
# Проверяем время
|
||
schedule_hour = schedule.schedule_time.hour
|
||
schedule_minute = schedule.schedule_time.minute
|
||
|
||
time_match = (
|
||
current_hour == schedule_hour and
|
||
current_minute == schedule_minute
|
||
)
|
||
|
||
if not time_match:
|
||
migration_logger.debug(
|
||
f"Пропуск {schedule.id}: время не совпадает "
|
||
f"({schedule_hour}:{schedule_minute} vs {current_hour}:{current_minute})"
|
||
)
|
||
continue
|
||
|
||
# Порог для last_run (5 минут назад)
|
||
recent_threshold = check_time - timedelta(minutes=5)
|
||
|
||
if schedule.last_run and schedule.last_run >= recent_threshold:
|
||
migration_logger.debug(f"⏭️ Пропуск {schedule.id}: последний запуск {schedule.last_run}")
|
||
continue
|
||
|
||
# Таблица должна запуститься сейчас
|
||
metadata = schedule.table
|
||
tables_to_run.append({
|
||
"table_name": str(metadata.table_name) if metadata.table_name else "",
|
||
"schedule_id": int(schedule.id) if schedule.id else 0,
|
||
"metadata_id": int(schedule.metadata_id) if schedule.metadata_id else 0,
|
||
"life_table_name": str(metadata.life_table_name) if metadata.life_table_name else None,
|
||
"uses_life": bool(metadata.life_table_name is not None),
|
||
"full_reload": bool(schedule.full_reload) if schedule.full_reload is not None else False,
|
||
})
|
||
|
||
migration_logger.info(f"Добавлено в батч: {metadata.table_name} (schedule_id={schedule.id})")
|
||
|
||
migration_logger.info(f"Всего таблиц для запуска: {len(tables_to_run)}")
|
||
return tables_to_run
|
||
|
||
async def run_migration_batch(tables: List[Dict], batch_id: Optional[str] = None) -> Optional[str]:
|
||
"""
|
||
Запустить миграцию нескольких таблиц в одном батче.
|
||
|
||
Возвращает batch_id для отслеживания.
|
||
"""
|
||
if not tables:
|
||
migration_logger.warning("Нет таблиц для запуска")
|
||
return None
|
||
|
||
if batch_id is None:
|
||
batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
|
||
|
||
migration_logger.info(f"Запуск батча {batch_id} ({len(tables)} таблиц)")
|
||
|
||
# Инициализируем трекер
|
||
await task_tracker.init_batch(batch_id, len(tables))
|
||
|
||
# Запускаем все задачи
|
||
for i, table_config in enumerate(tables):
|
||
try:
|
||
await migrate_table_task.kiq(
|
||
**table_config,
|
||
batch_id=batch_id,
|
||
send_email=True # Последняя задача отправит email
|
||
)
|
||
migration_logger.debug(f"Задача {i+1}/{len(tables)} отправлена: {table_config['table_name']}")
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка отправки задачи {table_config['table_name']}: {e}")
|
||
# Отмечаем как выполненную с ошибкой
|
||
await task_tracker.mark_completed(
|
||
batch_id,
|
||
{"success": False, "table": table_config["table_name"], "error": str(e)}
|
||
)
|
||
|
||
migration_logger.info(f"Батч {batch_id} запущен")
|
||
return batch_id |