# app/services/batch_runner.py from datetime import datetime, timedelta from typing import List, Dict, Optional import uuid from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.replication import ReplicationSchedule from app.core.logging import migration_logger from app.services.task_tracker import task_tracker from app.taskiq.broker import migrate_table_task async def get_tables_to_run(session: AsyncSession, check_time: Optional[datetime] = None) -> List[Dict]: """ Получить список таблиц, которые должны запуститься сейчас. Возвращает список словарей с параметрами для задачи миграции. """ if check_time is None: check_time = datetime.now() current_hour = check_time.hour current_minute = check_time.minute current_weekday = check_time.weekday() # 0=Monday, 6=Sunday migration_logger.info( f"Поиск расписаний для запуска: " f"время={check_time.strftime('%H:%M')}, день недели={current_weekday}" ) # Загружаем все активные расписания с связанными метаданными from sqlalchemy.orm import selectinload result = await session.execute( select(ReplicationSchedule) .options(selectinload(ReplicationSchedule.table)) .where(ReplicationSchedule.enabled == True) ) schedules = result.scalars().all() tables_to_run = [] for schedule in schedules: # Проверяем день недели if current_weekday not in schedule.days_list: migration_logger.debug(f"Пропуск {schedule.id}: не сегодня (days={schedule.days_list})") continue # Проверяем время schedule_hour = schedule.schedule_time.hour schedule_minute = schedule.schedule_time.minute time_match = ( current_hour == schedule_hour and current_minute == schedule_minute ) if not time_match: migration_logger.debug( f"Пропуск {schedule.id}: время не совпадает " f"({schedule_hour}:{schedule_minute} vs {current_hour}:{current_minute})" ) continue # Порог для last_run (5 минут назад) recent_threshold = check_time - timedelta(minutes=5) if schedule.last_run and schedule.last_run >= recent_threshold: migration_logger.debug(f"⏭️ Пропуск {schedule.id}: последний запуск {schedule.last_run}") continue # Таблица должна запуститься сейчас metadata = schedule.table tables_to_run.append({ "table_name": str(metadata.table_name) if metadata.table_name else "", "schedule_id": int(schedule.id) if schedule.id else 0, "metadata_id": int(schedule.metadata_id) if schedule.metadata_id else 0, "life_table_name": str(metadata.life_table_name) if metadata.life_table_name else None, "uses_life": bool(metadata.life_table_name is not None), "full_reload": bool(schedule.full_reload) if schedule.full_reload is not None else False, }) migration_logger.info(f"Добавлено в батч: {metadata.table_name} (schedule_id={schedule.id})") migration_logger.info(f"Всего таблиц для запуска: {len(tables_to_run)}") return tables_to_run async def run_migration_batch(tables: List[Dict], batch_id: Optional[str] = None) -> Optional[str]: """ Запустить миграцию нескольких таблиц в одном батче. Возвращает batch_id для отслеживания. """ if not tables: migration_logger.warning("Нет таблиц для запуска") return None if batch_id is None: batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}" migration_logger.info(f"Запуск батча {batch_id} ({len(tables)} таблиц)") # Инициализируем трекер await task_tracker.init_batch(batch_id, len(tables)) # Запускаем все задачи for i, table_config in enumerate(tables): try: await migrate_table_task.kiq( **table_config, batch_id=batch_id, send_email=True # Последняя задача отправит email ) migration_logger.debug(f"Задача {i+1}/{len(tables)} отправлена: {table_config['table_name']}") except Exception as e: migration_logger.error(f"Ошибка отправки задачи {table_config['table_name']}: {e}") # Отмечаем как выполненную с ошибкой await task_tracker.mark_completed( batch_id, {"success": False, "table": table_config["table_name"], "error": str(e)} ) migration_logger.info(f"Батч {batch_id} запущен") return batch_id