Небольшие изменения

This commit is contained in:
brusnitsyn
2026-03-13 17:11:39 +09:00
parent c201d36ae6
commit de2dd82fa1
18 changed files with 1140 additions and 491 deletions

View File

@@ -0,0 +1,125 @@
# app/services/batch_runner.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.replication import ReplicationSchedule
from app.core.logging import migration_logger
from app.services.task_tracker import task_tracker
from app.taskiq.broker import migrate_table_task
async def get_tables_to_run(session: AsyncSession, check_time: Optional[datetime] = None) -> List[Dict]:
"""
Получить список таблиц, которые должны запуститься сейчас.
Возвращает список словарей с параметрами для задачи миграции.
"""
if check_time is None:
check_time = datetime.now()
current_hour = check_time.hour
current_minute = check_time.minute
current_weekday = check_time.weekday() # 0=Monday, 6=Sunday
migration_logger.info(
f"Поиск расписаний для запуска: "
f"время={check_time.strftime('%H:%M')}, день недели={current_weekday}"
)
# Загружаем все активные расписания с связанными метаданными
from sqlalchemy.orm import selectinload
result = await session.execute(
select(ReplicationSchedule)
.options(selectinload(ReplicationSchedule.table))
.where(ReplicationSchedule.enabled == True)
)
schedules = result.scalars().all()
tables_to_run = []
for schedule in schedules:
# Проверяем день недели
if current_weekday not in schedule.days_list:
migration_logger.debug(f"Пропуск {schedule.id}: не сегодня (days={schedule.days_list})")
continue
# Проверяем время
schedule_hour = schedule.schedule_time.hour
schedule_minute = schedule.schedule_time.minute
time_match = (
current_hour == schedule_hour and
current_minute == schedule_minute
)
if not time_match:
migration_logger.debug(
f"Пропуск {schedule.id}: время не совпадает "
f"({schedule_hour}:{schedule_minute} vs {current_hour}:{current_minute})"
)
continue
# Порог для last_run (5 минут назад)
recent_threshold = check_time - timedelta(minutes=5)
if schedule.last_run and schedule.last_run >= recent_threshold:
migration_logger.debug(f"⏭️ Пропуск {schedule.id}: последний запуск {schedule.last_run}")
continue
# Таблица должна запуститься сейчас
metadata = schedule.table
tables_to_run.append({
"table_name": str(metadata.table_name) if metadata.table_name else "",
"schedule_id": int(schedule.id) if schedule.id else 0,
"metadata_id": int(schedule.metadata_id) if schedule.metadata_id else 0,
"life_table_name": str(metadata.life_table_name) if metadata.life_table_name else None,
"uses_life": bool(metadata.life_table_name is not None),
"full_reload": bool(schedule.full_reload) if schedule.full_reload is not None else False,
})
migration_logger.info(f"Добавлено в батч: {metadata.table_name} (schedule_id={schedule.id})")
migration_logger.info(f"Всего таблиц для запуска: {len(tables_to_run)}")
return tables_to_run
async def run_migration_batch(tables: List[Dict], batch_id: Optional[str] = None) -> Optional[str]:
"""
Запустить миграцию нескольких таблиц в одном батче.
Возвращает batch_id для отслеживания.
"""
if not tables:
migration_logger.warning("Нет таблиц для запуска")
return None
if batch_id is None:
batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
migration_logger.info(f"Запуск батча {batch_id} ({len(tables)} таблиц)")
# Инициализируем трекер
await task_tracker.init_batch(batch_id, len(tables))
# Запускаем все задачи
for i, table_config in enumerate(tables):
try:
await migrate_table_task.kiq(
**table_config,
batch_id=batch_id,
send_email=True # Последняя задача отправит email
)
migration_logger.debug(f"Задача {i+1}/{len(tables)} отправлена: {table_config['table_name']}")
except Exception as e:
migration_logger.error(f"Ошибка отправки задачи {table_config['table_name']}: {e}")
# Отмечаем как выполненную с ошибкой
await task_tracker.mark_completed(
batch_id,
{"success": False, "table": table_config["table_name"], "error": str(e)}
)
migration_logger.info(f"Батч {batch_id} запущен")
return batch_id

View File

@@ -3,6 +3,7 @@ from typing import Optional, List, Dict, Any
import traceback
from datetime import datetime
import pandas as pd
from app.models.replication import ReplicationSchedule
from app.services.replication_state import replication_state
from app.services.data_reader import data_reader
from app.services.data_writer import data_writer
@@ -88,13 +89,12 @@ class DatabaseMigrator:
parsed = self._parse_table_name(table_name)
return f"{parsed['basename']}ID"
def migrate_table_by_time(self, table_name: str, last_sync_time: datetime) -> Dict[str, int]:
def migrate_table_by_time(self, table_name: str, life_table_name: str, last_sync_time: datetime) -> Dict[str, int]:
"""Миграция таблицы через Life-механизм по времени"""
life_table = self._get_life_table_name(table_name)
base_id_field = self._get_base_id_field(table_name)
life_id_field = self._get_life_id_field(table_name)
migration_logger.info(f"Миграция {table_name} через {life_table} с {last_sync_time}")
migration_logger.info(f"Миграция {table_name} через {life_table_name} с {last_sync_time}")
stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0}
@@ -105,12 +105,12 @@ class DatabaseMigrator:
SELECT
{base_id_field},
MAX({life_id_field}) as MaxLifeID
FROM {life_table}
FROM {life_table_name}
WHERE x_DateTime > CAST(? AS datetime)
GROUP BY {base_id_field}
)
SELECT dl.*
FROM {life_table} dl
FROM {life_table_name} dl
INNER JOIN LatestLife ll
ON dl.{life_id_field} = ll.MaxLifeID
"""
@@ -211,28 +211,24 @@ class DatabaseMigrator:
return result
def migrate_table(self, table_name: str, full_reload: bool = False) -> bool:
def migrate_table(self, table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str], uses_life: bool = False, full_reload: bool = False) -> bool:
"""Миграция одной таблицы (поддерживает и ID, и Life)"""
migration_logger.table_start(table_name)
self.current_table = table_name
table_start_time = datetime.now()
try:
# Получаем ID колонку для статистики
id_column = get_primary_key(table_name)
# Определяем, использует ли таблица Life-механизм
uses_life = table_name in self.life_tables
if uses_life and not full_reload:
if uses_life and not full_reload and life_table_name:
# МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ
last_sync = self.state.get_table_last_sync(table_name)
last_sync = self.state.get_table_last_sync(metadata_id)
if last_sync:
stats = self.migrate_table_by_time(table_name, last_sync)
stats = self.migrate_table_by_time(table_name, life_table_name, last_sync)
# Обновляем время синхронизации
self.state.update_table_sync_time(table_name)
self.state.update_table_sync_time(schedule_id)
# Обновляем статистику
if id_column:
@@ -340,7 +336,7 @@ class DatabaseMigrator:
})
return False
def _incremental_by_id(self, table_name: str) -> bool:
def _incremental_by_id(self, table_name: str, metadata) -> bool:
"""Инкрементальная загрузка по ID (для таблиц без Life)"""
migration_logger.info(f"Инкрементальная загрузка {table_name} по ID")
@@ -419,11 +415,11 @@ class DatabaseMigrator:
def create_all_foreign_keys(self):
"""Создать все внешние ключи после завершения миграции"""
if not self.all_foreign_keys:
migration_logger.info(" Нет внешних ключей для создания")
migration_logger.info("Нет внешних ключей для создания")
return
migration_logger.info("="*60)
migration_logger.info("🔗 СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ")
migration_logger.info("СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ")
migration_logger.info("="*60)
for table_name, foreign_keys in self.all_foreign_keys.items():
@@ -448,16 +444,17 @@ class DatabaseMigrator:
'time': datetime.now()
})
def run_migration(self, tables: Optional[List[str]] = None, full_reload: bool = False, send_email: bool = True):
"""Запуск миграции для всех таблиц"""
def run_migration(
self, table_name: str, schedule_id: int, metadata_id: int,
life_table_name: Optional[str] = None, uses_life: bool = False,
full_reload: bool = False, send_email: bool = True
):
"""Запуск миграции таблицы"""
self.is_running = True
self.start_time = datetime.now()
self.all_foreign_keys = {}
self.errors = []
if tables is None:
tables = settings.TABLES_TO_COPY
last_replication = self.state.get_last_replication_time()
migration_logger.info("="*70)
@@ -465,19 +462,16 @@ class DatabaseMigrator:
migration_logger.info(f"Время старта: {self.start_time}")
if last_replication:
migration_logger.info(f"Последняя миграция: {last_replication}")
migration_logger.info(f"Таблиц для обработки: {len(tables)}")
migration_logger.info(f"Таблица для обработки: {table_name}")
migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}")
migration_logger.info(f"Таблицы с Life-механизмом: {self.life_tables}")
migration_logger.info("="*70)
results = {}
for i, table_name in enumerate(tables, 1):
if not self.is_running:
migration_logger.warning("Миграция остановлена пользователем")
break
if not self.is_running:
migration_logger.warning("Миграция остановлена пользователем")
return
migration_logger.info(f"\n[{i}/{len(tables)}] Обработка таблицы {table_name}")
results[table_name] = self.migrate_table(table_name, full_reload)
migration_logger.info(f"Обработка таблицы {table_name}")
results = self.migrate_table(table_name, schedule_id, metadata_id, life_table_name, uses_life, full_reload)
# Создаем внешние ключи после всех таблиц
self.create_all_foreign_keys()
@@ -494,21 +488,20 @@ class DatabaseMigrator:
self.is_running = False
return results
def _log_final_stats(self, results: dict, stats: dict, total_time: float):
def _log_final_stats(self, has_migrated: bool, stats: dict, total_time: float):
"""Логирует финальную статистику"""
migration_logger.info("="*70)
migration_logger.info("ИТОГОВАЯ СТАТИСТИКА")
migration_logger.info("="*70)
migration_logger.info(f"Успешно: {sum(1 for r in results.values() if r)}/{len(results)}")
migration_logger.info(f"Ошибок: {len(self.errors)}")
migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}")
migration_logger.info(f"Общее время: {total_time:.1f}с")
migration_logger.info("="*70)
def _send_notification(self, results: dict, stats: dict, total_time: float):
def _send_notification(self, has_migrated: bool, stats: dict, total_time: float):
"""Отправляет уведомление о результате"""
if self.errors:
error_body = self._build_error_email_body(results, stats, total_time)
error_body = self._build_error_email_body(has_migrated, stats, total_time)
email_sender.send_email(
subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
body=error_body
@@ -516,10 +509,10 @@ class DatabaseMigrator:
# else:
# email_sender.send_success_notification(stats, total_time)
def _build_error_email_body(self, results: dict, stats: dict, total_time: float) -> str:
def _build_error_email_body(self, has_migrated: bool, stats: dict, total_time: float) -> str:
"""Строит тело письма с ошибками"""
body = f"""
🚨 МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ
МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ
{'='*60}
Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
@@ -527,7 +520,6 @@ class DatabaseMigrator:
СТАТИСТИКА:
{'='*40}
Успешно: {sum(1 for r in results.values() if r)}/{len(results)}
Ошибок: {len(self.errors)}
Всего строк: {stats.get('total_rows', 0)}
@@ -584,7 +576,7 @@ class DatabaseMigrator:
# Поэтому используем отдельный метод для установки
replication_state._set_table_total_rows(table_name, dst_stats['total_rows'])
migration_logger.info(f" Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}")
migration_logger.info(f"Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}")
# Логируем операцию
self.state.log_operation(

View File

@@ -2,6 +2,7 @@ import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from app.models.replication import ReplicationSchedule
from app.repository.replication_metadata_repo import replication_metadata_repo
from app.core.logging import migration_logger
from app.core.config import settings
@@ -31,9 +32,9 @@ class ReplicationState:
'tables_count': stats['tables_count']
}
def get_table_last_sync(self, table_name: str) -> Optional[datetime]:
def get_table_last_sync(self, metadata_id: int) -> Optional[datetime]:
"""Получает время последней синхронизации таблицы"""
return self.replication_repo.get_last_sync_time(table_name)
return self.replication_repo.get_last_sync_time(metadata_id)
def get_last_id(self, table_name: str) -> Optional[int]:
"""Получает последний обработанный ID для таблицы"""
@@ -43,9 +44,9 @@ class ReplicationState:
"""Обновляет последний обработанный ID"""
self.replication_repo.update_last_id(table_name, last_id)
def update_table_sync_time(self, table_name: str):
def update_table_sync_time(self, schedule_id: int):
"""Обновляет время синхронизации таблицы"""
self.replication_repo.update_sync_time(table_name)
self.replication_repo.update_sync_time(schedule_id)
def update_table_stats(self, table_name: str, added_rows: int):
"""Обновляет статистику таблицы"""
@@ -69,15 +70,16 @@ class ReplicationState:
session = self.replication_repo.get_session()
try:
metadata = session.query(ReplicationMetadata).filter_by(
metadatas = session.query(ReplicationMetadata).filter_by(
table_name=table_name
).first()
).all()
if metadata:
metadata.total_rows = total_rows
metadata.updated_at = datetime.now()
session.commit()
migration_logger.debug(f" Установлено total_rows={total_rows} для {table_name}")
if metadatas:
for metadata in metadatas:
metadata.total_rows = total_rows
metadata.updated_at = datetime.now()
session.commit()
migration_logger.debug(f"Установлено total_rows={total_rows} для {table_name}")
else:
migration_logger.warning(f"Метаданные для {table_name} не найдены")
except Exception as e:

View File

@@ -7,9 +7,12 @@ import time as time_module
from app.core.logging import migration_logger
from app.core.config import settings
from app.models.replication import ReplicationSchedule
from app.services.batch_runner import get_tables_to_run, run_migration_batch
from app.services.migrator import migrator
from app.repository.replication_metadata_repo import replication_metadata_repo
from app.utils.email_sender import email_sender
from app.core.database import db_connector
class MigrationScheduler:
@@ -24,7 +27,8 @@ class MigrationScheduler:
def _init_default_schedules(self):
"""Инициализация расписаний по умолчанию"""
self.repo.init_default_schedules(settings.TABLES_TO_COPY)
metadatas = self.repo.get_all_metadata()
self.repo.init_default_schedules(metadatas)
migration_logger.info("Расписания по умолчанию инициализированы")
def set_schedule(self, table_name: str, schedule_time: str = "00:00",
@@ -71,95 +75,35 @@ class MigrationScheduler:
"""Включить расписание"""
self.repo.enable_schedule(table_name)
def delete_schedule(self, table_name: str):
def delete_schedule(self, schedule_id: int):
"""Удалить расписание"""
self.repo.delete_schedule(table_name)
self.repo.delete_schedule(schedule_id)
def get_due_tables(self, current_time: Optional[datetime] = None) -> List:
def get_due_tables(self, current_time: Optional[datetime] = None) -> List[ReplicationSchedule]:
"""Получить таблицы для запуска сейчас"""
due = self.repo.get_due_schedules(current_time)
return due
async def run_due_migrations(self):
"""Запустить миграции по расписанию"""
due_schedules = self.get_due_tables()
async def check_and_run_schedules(self):
"""
Проверить расписания и запустить задачи.
Вызывается по cron (например, каждую минуту).
"""
migration_logger.info("Проверка расписаний миграции...")
if not due_schedules:
return
try:
async with db_connector.async_dst_session() as session:
tables = await get_tables_to_run(session)
if tables:
batch_id = await run_migration_batch(tables)
migration_logger.info(f"Батч запущен: {batch_id}")
else:
migration_logger.info("Нет задач для запуска в это время")
migration_logger.info(f"Найдено {len(due_schedules)} таблиц для миграции по расписанию")
for schedule in due_schedules:
try:
days_str = ', '.join(schedule.days_display)
migration_logger.info(
f"Запуск миграции по расписанию для {schedule.table_name} "
f"в {schedule.schedule_time.strftime('%H:%M')} [{days_str}]"
)
# Запускаем миграцию
result = await asyncio.to_thread(
migrator.run_migration,
tables=[schedule.table_name],
full_reload=schedule.full_reload,
send_email=True
)
# Обновляем время последнего запуска
self.repo.update_schedule_last_run(schedule.table_name)
# Логируем успешный запуск
self.repo.log_operation(
table_name=schedule.table_name,
operation='SCHEDULED',
records_count=0,
status='SUCCESS'
)
migration_logger.info(f"Миграция по расписанию для {schedule.table_name} завершена")
except Exception as e:
error_msg = f"Ошибка при миграции по расписанию для {schedule.table_name}: {e}"
migration_logger.error(error_msg)
# Логируем ошибку
self.repo.log_operation(
table_name=schedule.table_name,
operation='SCHEDULED',
records_count=0,
status='ERROR',
error_message=str(e)[:500]
)
email_sender.send_error_notification(
error_message=error_msg,
table_name=schedule.table_name
)
def start_scheduler(self):
"""Запустить планировщик"""
self.running = True
migration_logger.info("Планировщик миграций запущен")
while self.running:
try:
# Проверяем, есть ли таблицы для миграции
asyncio.run(self.run_due_migrations())
# Ждем 60 секунд до следующей проверки
time_module.sleep(60)
except KeyboardInterrupt:
self.stop_scheduler()
break
except Exception as e:
migration_logger.error(f"Ошибка в планировщике: {e}")
time_module.sleep(60)
def stop_scheduler(self):
"""Остановить планировщик"""
self.running = False
migration_logger.info("Планировщик миграций остановлен")
except Exception as e:
migration_logger.error(f"Ошибка проверки расписаний: {e}")
raise
# Глобальный экземпляр

View File

@@ -0,0 +1,56 @@
# app/services/task_tracker.py
import redis.asyncio as redis
from typing import Optional
import json
class TaskTracker:
def __init__(self, redis_url: str = "redis://127.0.0.1:6379/0"):
self.redis_url = redis_url
async def _get_redis(self):
return await redis.from_url(self.redis_url)
async def init_batch(self, batch_id: str, total_tasks: int):
"""Инициализировать пакет задач"""
r = await self._get_redis()
await r.set(f"replicator_batch:{batch_id}:total", total_tasks)
await r.set(f"replicator_batch:{batch_id}:completed", 0)
await r.set(f"replicator_batch:{batch_id}:results", json.dumps([]))
await r.expire(f"replicator_batch:{batch_id}:total", 86400)
await r.expire(f"replicator_batch:{batch_id}:completed", 86400)
await r.expire(f"replicator_batch:{batch_id}:results", 86400)
await r.close()
async def mark_completed(self, batch_id: str, result: dict) -> int:
"""Отметить задачу как завершённую и вернуть количество выполненных"""
r = await self._get_redis()
completed = await r.incr(f"replicator_batch:{batch_id}:completed")
# Сохраняем результат задачи
results = json.loads(await r.get(f"replicator_batch:{batch_id}:results") or "[]")
results.append(result)
await r.set(f"replicator_batch:{batch_id}:results", json.dumps(results))
await r.close()
return completed
async def get_batch_status(self, batch_id: str) -> dict:
"""Получить статус пакета"""
r = await self._get_redis()
total = await r.get(f"replicator_batch:{batch_id}:total")
completed = await r.get(f"replicator_batch:{batch_id}:completed")
results = await r.get(f"replicator_batch:{batch_id}:results")
await r.close()
return {
"total": int(total) if total else 0,
"completed": int(completed) if completed else 0,
"results": json.loads(results) if results else []
}
async def is_batch_complete(self, batch_id: str) -> bool:
"""Проверить, завершены ли все задачи"""
status = await self.get_batch_status(batch_id)
return status["completed"] >= status["total"]
task_tracker = TaskTracker()