390 lines
15 KiB
Python
390 lines
15 KiB
Python
from datetime import datetime
|
||
from typing import Optional
|
||
from fastapi import APIRouter, HTTPException, BackgroundTasks, Query
|
||
from app.models.replication import ReplicationMetadata
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
from app.services.scheduler import scheduler
|
||
from app.services.migrator import migrator
|
||
from app.services.replication_state import replication_state
|
||
from app.core.logging import migration_logger
|
||
from app.core.config import settings
|
||
from app.utils.email_sender import email_sender
|
||
from app.taskiq.broker import refresh_schedules
|
||
|
||
router = APIRouter(prefix="/api/v1")
|
||
|
||
@router.get("/replication/last")
|
||
async def get_last_replication():
|
||
"""Получить информацию о последней репликации (максимальное время по всем таблицам)"""
|
||
return replication_state.get_last_replication_info()
|
||
|
||
|
||
@router.get("/replication/tables")
|
||
async def get_tables_status():
|
||
"""Получить статус всех таблиц (из replication_metadata)"""
|
||
stats = replication_state.get_all_stats()
|
||
|
||
# Форматируем для API
|
||
result = []
|
||
for table in stats['tables']:
|
||
result.append({
|
||
"table": table['name'],
|
||
"last_id": table['last_id'],
|
||
"rows_count": table['rows'],
|
||
"last_sync": table['last_sync'].isoformat() if table['last_sync'] else None,
|
||
"active": table['active']
|
||
})
|
||
|
||
return {
|
||
"total_rows": stats['total_rows'],
|
||
"tables_count": stats['tables_count'],
|
||
"active_tables": stats['active_tables'],
|
||
"tables": result
|
||
}
|
||
|
||
|
||
@router.get("/replication/tables/{metadata_id}")
|
||
async def get_table_status(metadata_id: int):
|
||
"""Получить статус конкретной таблицы"""
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
|
||
metadata = replication_metadata_repo.get_table_metadata(metadata_id)
|
||
if not metadata:
|
||
raise HTTPException(status_code=404, detail=f"Метадата с ID={metadata_id} не найдена")
|
||
|
||
return {
|
||
"table": metadata.table_name,
|
||
"last_id": metadata.last_id,
|
||
"last_sync_time": metadata.last_sync_time.isoformat() if metadata.last_sync_time else None,
|
||
"total_rows": metadata.total_rows,
|
||
"is_active": metadata.is_active,
|
||
"created_at": metadata.created_at.isoformat() if metadata.created_at else None,
|
||
"updated_at": metadata.updated_at.isoformat() if metadata.updated_at else None,
|
||
"last_error": metadata.last_error
|
||
}
|
||
|
||
|
||
@router.post("/replication/tables")
|
||
async def add_metadata(
|
||
table_name: str = Query(None, description="Наименование таблицы"),
|
||
life_table_name: Optional[str] = Query(None, description="Наименование Life таблицы"),
|
||
description: Optional[str] = Query(None, description="Описание"),
|
||
enabled: bool = Query(True, description="Включено"),
|
||
):
|
||
"""Добавить метадату"""
|
||
try:
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
|
||
metadata = replication_metadata_repo.add_metadata(
|
||
table_name=table_name,
|
||
life_table_name=life_table_name,
|
||
description=description,
|
||
enabled=enabled,
|
||
)
|
||
|
||
if metadata:
|
||
return {
|
||
"message": f"Метадата для таблицы {metadata.table_name} создана",
|
||
"metadata": metadata
|
||
}
|
||
else:
|
||
raise HTTPException(status_code=500, detail="Ошибка создания метадаты")
|
||
|
||
except ValueError as e:
|
||
raise HTTPException(status_code=400, detail=str(e))
|
||
|
||
@router.get("/replication/logs")
|
||
async def get_replication_logs(
|
||
table_name: Optional[str] = None,
|
||
limit: int = Query(100, ge=1, le=1000),
|
||
status: Optional[str] = None
|
||
):
|
||
"""Получить логи репликации"""
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
from app.models.replication import ReplicationLog
|
||
|
||
session = replication_metadata_repo.get_session()
|
||
try:
|
||
query = session.query(ReplicationLog).order_by(ReplicationLog.created_at.desc())
|
||
|
||
if table_name:
|
||
query = query.filter(ReplicationLog.table_name == table_name)
|
||
|
||
if status:
|
||
query = query.filter(ReplicationLog.status == status.upper())
|
||
|
||
logs = query.limit(limit).all()
|
||
|
||
return [
|
||
{
|
||
"id": log.id,
|
||
"table_name": log.table_name,
|
||
"operation": log.operation,
|
||
"records_count": log.records_count,
|
||
"status": log.status,
|
||
"error_message": log.error_message,
|
||
"created_at": log.created_at.isoformat()
|
||
}
|
||
for log in logs
|
||
]
|
||
finally:
|
||
session.close()
|
||
|
||
# ==================== РАСПИСАНИЯ ====================
|
||
|
||
@router.get("/schedules")
|
||
async def get_schedules():
|
||
"""Получить все расписания миграций"""
|
||
schedules = replication_metadata_repo.get_all_schedules()
|
||
return [s.to_dict() for s in schedules]
|
||
|
||
|
||
@router.get("/schedules/next-runs")
|
||
async def get_next_runs(limit: int = 10):
|
||
"""Получить следующие запуски"""
|
||
from datetime import timedelta
|
||
|
||
now = datetime.now()
|
||
runs = []
|
||
|
||
# Получаем все расписания
|
||
schedules = replication_metadata_repo.get_all_schedules()
|
||
|
||
for minute_offset in range(60 * 24 * 7): # Проверяем на неделю вперед
|
||
check_time = now + timedelta(minutes=minute_offset)
|
||
check_time_obj = check_time.time()
|
||
check_weekday = check_time.weekday()
|
||
|
||
for schedule in schedules:
|
||
if not schedule.enabled:
|
||
continue
|
||
|
||
# Проверяем совпадение времени и дня
|
||
time_diff = abs(
|
||
(schedule.schedule_time.hour * 60 + schedule.schedule_time.minute) -
|
||
(check_time_obj.hour * 60 + check_time_obj.minute)
|
||
)
|
||
|
||
if time_diff <= 1 and check_weekday in schedule.days_list:
|
||
# Получаем статистику таблицы
|
||
metadata = schedule.table
|
||
|
||
runs.append({
|
||
'table': metadata.table_name,
|
||
'time': check_time.strftime('%Y-%m-%d %H:%M'),
|
||
'day': check_time.strftime('%A'),
|
||
'days_schedule': schedule.days_display,
|
||
'full_reload': schedule.full_reload,
|
||
'rows_count': metadata.total_rows if metadata else 0,
|
||
'last_sync': metadata.last_sync_time.isoformat() if metadata and metadata.last_sync_time else None
|
||
})
|
||
|
||
if len(runs) >= limit:
|
||
break
|
||
|
||
if len(runs) >= limit:
|
||
break
|
||
|
||
return runs[:limit]
|
||
|
||
|
||
# @router.post("/schedules/run-now")
|
||
# async def run_scheduled_now(background_tasks: BackgroundTasks):
|
||
# """Принудительно запустить все запланированные на текущее время миграции"""
|
||
# due = scheduler.get_due_tables()
|
||
# if not due:
|
||
# return {'message': 'Нет таблиц для миграции в текущее время и день'}
|
||
|
||
# for schedule in due:
|
||
# background_tasks.add_task(
|
||
# run_scheduled_migration,
|
||
# schedule.table.table_name,
|
||
# schedule.full_reload
|
||
# )
|
||
|
||
# return {
|
||
# 'message': f'Запущено {len(due)} миграций',
|
||
# 'tables': [
|
||
# {
|
||
# 'name': s.table.table_name,
|
||
# 'time': s.schedule_time.strftime("%H:%M"),
|
||
# 'days': s.days_display,
|
||
# 'full_reload': s.full_reload
|
||
# }
|
||
# for s in due
|
||
# ]
|
||
# }
|
||
|
||
|
||
@router.post("/schedules/{table_name}")
|
||
async def set_schedule(
|
||
metadata_id: int = Query('ID', description="ID метадаты таблицы"),
|
||
schedule_time: str = Query("00:00", description="Время в формате HH:MM"),
|
||
days: Optional[str] = Query(None, description="Дни недели через запятую: пн,вт,ср,чт,пт,сб,вс"),
|
||
full_reload: bool = Query(False, description="Полная перезагрузка"),
|
||
enabled: bool = Query(True, description="Включено"),
|
||
name: Optional[str] = Query(None, description="Название расписания"),
|
||
description: Optional[str] = Query(None, description="Описание")
|
||
):
|
||
"""Добавить новое расписание для таблицы"""
|
||
try:
|
||
# if table_name not in settings.TABLES_TO_COPY:
|
||
# raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена")
|
||
|
||
days_list = None
|
||
if days:
|
||
days_list = [d.strip() for d in days.split(',')]
|
||
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
|
||
schedule = replication_metadata_repo.add_schedule(
|
||
metadata_id=metadata_id,
|
||
schedule_time=schedule_time,
|
||
days=days_list,
|
||
full_reload=full_reload,
|
||
enabled=enabled,
|
||
name=name,
|
||
description=description
|
||
)
|
||
|
||
if schedule:
|
||
await refresh_schedules()
|
||
return {
|
||
"message": f"Расписание добавлено",
|
||
"schedule": schedule
|
||
}
|
||
else:
|
||
raise HTTPException(status_code=500, detail="Ошибка добавления расписания")
|
||
|
||
except ValueError as e:
|
||
raise HTTPException(status_code=400, detail=str(e))
|
||
|
||
@router.get("/schedules/{metadata_id}")
|
||
async def get_table_schedule(metadata_id: int):
|
||
"""Получить расписание для конкретной таблицы"""
|
||
schedules = replication_metadata_repo.get_schedule(metadata_id)
|
||
if not schedules:
|
||
raise HTTPException(status_code=404, detail="Расписание не найдено")
|
||
|
||
# Получаем статистику таблицы
|
||
metadata = replication_metadata_repo.get_table_metadata(metadata_id)
|
||
|
||
result = {}
|
||
result['schedules'] = []
|
||
|
||
for schedule in schedules:
|
||
result['schedules'].append(schedule)
|
||
|
||
if metadata:
|
||
result['table_stats'] = {
|
||
'table_name': metadata.table_name,
|
||
'rows_count': metadata.total_rows,
|
||
'last_sync': metadata.last_sync_time.isoformat() if metadata.last_sync_time else None,
|
||
'last_id': metadata.last_id
|
||
}
|
||
|
||
return result
|
||
|
||
@router.put("/schedules/{schedule_id}")
|
||
async def update_schedule(
|
||
schedule_id: int,
|
||
schedule_time: Optional[str] = Query(None, description="Время в формате HH:MM"),
|
||
days: Optional[str] = Query(None, description="Дни недели через запятую"),
|
||
full_reload: Optional[bool] = Query(None, description="Полная перезагрузка"),
|
||
enabled: Optional[bool] = Query(None, description="Включено"),
|
||
name: Optional[str] = Query(None, description="Название"),
|
||
description: Optional[str] = Query(None, description="Описание")
|
||
):
|
||
"""Обновить существующее расписание по ID"""
|
||
from app.repository.replication_metadata_repo import replication_metadata_repo
|
||
|
||
update_kwargs = {}
|
||
if schedule_time:
|
||
update_kwargs['schedule_time'] = schedule_time
|
||
if days:
|
||
update_kwargs['days'] = [d.strip() for d in days.split(',')]
|
||
if full_reload is not None:
|
||
update_kwargs['full_reload'] = full_reload
|
||
if enabled is not None:
|
||
update_kwargs['enabled'] = enabled
|
||
if name:
|
||
update_kwargs['name'] = name
|
||
if description:
|
||
update_kwargs['description'] = description
|
||
|
||
success = replication_metadata_repo.update_schedule(schedule_id, **update_kwargs)
|
||
|
||
if success:
|
||
await refresh_schedules()
|
||
return {"message": f"Расписание {schedule_id} обновлено"}
|
||
else:
|
||
raise HTTPException(status_code=404, detail=f"Расписание {schedule_id} не найдено")
|
||
|
||
@router.post("/schedules/{schedule_id}/disable")
|
||
async def disable_schedule(schedule_id: int):
|
||
"""Отключить расписание"""
|
||
success = replication_metadata_repo.disable_schedule(schedule_id)
|
||
if success:
|
||
await refresh_schedules()
|
||
return {'message': f'Расписание отключено'}
|
||
else:
|
||
raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено")
|
||
|
||
|
||
@router.post("/schedules/{schedule_id}/enable")
|
||
async def enable_schedule(schedule_id: int):
|
||
"""Включить расписание"""
|
||
success = replication_metadata_repo.enable_schedule(schedule_id)
|
||
if success:
|
||
await refresh_schedules()
|
||
return {'message': f'Расписание включено'}
|
||
else:
|
||
raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено")
|
||
|
||
# ==================== Фоновые задачи ====================
|
||
|
||
# def run_migration_task(full_reload: bool):
|
||
# """Фоновая задача для миграции всех таблиц"""
|
||
# try:
|
||
# migrator.run_migration(full_reload=full_reload)
|
||
# except Exception as e:
|
||
# migration_logger.error(f"Ошибка в фоновой задаче: {e}")
|
||
|
||
|
||
# def run_scheduled_migration(table_name: str, full_reload: bool):
|
||
# """Фоновая задача для запланированной миграции одной таблицы"""
|
||
# try:
|
||
# migration_logger.info(f"Запуск запланированной миграции для {table_name}")
|
||
|
||
# migrator.run_migration(
|
||
# tables=[table_name],
|
||
# full_reload=full_reload,
|
||
# send_email=True
|
||
# )
|
||
|
||
# # Обновляем время последнего запуска в расписании
|
||
# replication_metadata_repo.update_schedule_last_run(table_name)
|
||
|
||
# # Логируем успешный запуск
|
||
# replication_metadata_repo.log_operation(
|
||
# table_name=table_name,
|
||
# operation='SCHEDULED',
|
||
# records_count=0,
|
||
# status='SUCCESS'
|
||
# )
|
||
|
||
# migration_logger.info(f"Запланированная миграция для {table_name} завершена")
|
||
|
||
# except Exception as e:
|
||
# error_msg = f"Ошибка в запланированной миграции для {table_name}: {e}"
|
||
# migration_logger.error(error_msg)
|
||
# migration_logger.exception(e)
|
||
|
||
# # Логируем ошибку
|
||
# replication_metadata_repo.log_operation(
|
||
# table_name=table_name,
|
||
# operation='SCHEDULED',
|
||
# records_count=0,
|
||
# status='ERROR',
|
||
# error_message=str(e)[:500]
|
||
# ) |