Files
replicator/app/api/routes.py
2026-03-13 17:11:39 +09:00

390 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks, Query
from app.models.replication import ReplicationMetadata
from app.repository.replication_metadata_repo import replication_metadata_repo
from app.services.scheduler import scheduler
from app.services.migrator import migrator
from app.services.replication_state import replication_state
from app.core.logging import migration_logger
from app.core.config import settings
from app.utils.email_sender import email_sender
from app.taskiq.broker import refresh_schedules
router = APIRouter(prefix="/api/v1")
@router.get("/replication/last")
async def get_last_replication():
"""Получить информацию о последней репликации (максимальное время по всем таблицам)"""
return replication_state.get_last_replication_info()
@router.get("/replication/tables")
async def get_tables_status():
"""Получить статус всех таблиц (из replication_metadata)"""
stats = replication_state.get_all_stats()
# Форматируем для API
result = []
for table in stats['tables']:
result.append({
"table": table['name'],
"last_id": table['last_id'],
"rows_count": table['rows'],
"last_sync": table['last_sync'].isoformat() if table['last_sync'] else None,
"active": table['active']
})
return {
"total_rows": stats['total_rows'],
"tables_count": stats['tables_count'],
"active_tables": stats['active_tables'],
"tables": result
}
@router.get("/replication/tables/{metadata_id}")
async def get_table_status(metadata_id: int):
"""Получить статус конкретной таблицы"""
from app.repository.replication_metadata_repo import replication_metadata_repo
metadata = replication_metadata_repo.get_table_metadata(metadata_id)
if not metadata:
raise HTTPException(status_code=404, detail=f"Метадата с ID={metadata_id} не найдена")
return {
"table": metadata.table_name,
"last_id": metadata.last_id,
"last_sync_time": metadata.last_sync_time.isoformat() if metadata.last_sync_time else None,
"total_rows": metadata.total_rows,
"is_active": metadata.is_active,
"created_at": metadata.created_at.isoformat() if metadata.created_at else None,
"updated_at": metadata.updated_at.isoformat() if metadata.updated_at else None,
"last_error": metadata.last_error
}
@router.post("/replication/tables")
async def add_metadata(
table_name: str = Query(None, description="Наименование таблицы"),
life_table_name: Optional[str] = Query(None, description="Наименование Life таблицы"),
description: Optional[str] = Query(None, description="Описание"),
enabled: bool = Query(True, description="Включено"),
):
"""Добавить метадату"""
try:
from app.repository.replication_metadata_repo import replication_metadata_repo
metadata = replication_metadata_repo.add_metadata(
table_name=table_name,
life_table_name=life_table_name,
description=description,
enabled=enabled,
)
if metadata:
return {
"message": f"Метадата для таблицы {metadata.table_name} создана",
"metadata": metadata
}
else:
raise HTTPException(status_code=500, detail="Ошибка создания метадаты")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/replication/logs")
async def get_replication_logs(
table_name: Optional[str] = None,
limit: int = Query(100, ge=1, le=1000),
status: Optional[str] = None
):
"""Получить логи репликации"""
from app.repository.replication_metadata_repo import replication_metadata_repo
from app.models.replication import ReplicationLog
session = replication_metadata_repo.get_session()
try:
query = session.query(ReplicationLog).order_by(ReplicationLog.created_at.desc())
if table_name:
query = query.filter(ReplicationLog.table_name == table_name)
if status:
query = query.filter(ReplicationLog.status == status.upper())
logs = query.limit(limit).all()
return [
{
"id": log.id,
"table_name": log.table_name,
"operation": log.operation,
"records_count": log.records_count,
"status": log.status,
"error_message": log.error_message,
"created_at": log.created_at.isoformat()
}
for log in logs
]
finally:
session.close()
# ==================== РАСПИСАНИЯ ====================
@router.get("/schedules")
async def get_schedules():
"""Получить все расписания миграций"""
schedules = replication_metadata_repo.get_all_schedules()
return [s.to_dict() for s in schedules]
@router.get("/schedules/next-runs")
async def get_next_runs(limit: int = 10):
"""Получить следующие запуски"""
from datetime import timedelta
now = datetime.now()
runs = []
# Получаем все расписания
schedules = replication_metadata_repo.get_all_schedules()
for minute_offset in range(60 * 24 * 7): # Проверяем на неделю вперед
check_time = now + timedelta(minutes=minute_offset)
check_time_obj = check_time.time()
check_weekday = check_time.weekday()
for schedule in schedules:
if not schedule.enabled:
continue
# Проверяем совпадение времени и дня
time_diff = abs(
(schedule.schedule_time.hour * 60 + schedule.schedule_time.minute) -
(check_time_obj.hour * 60 + check_time_obj.minute)
)
if time_diff <= 1 and check_weekday in schedule.days_list:
# Получаем статистику таблицы
metadata = schedule.table
runs.append({
'table': metadata.table_name,
'time': check_time.strftime('%Y-%m-%d %H:%M'),
'day': check_time.strftime('%A'),
'days_schedule': schedule.days_display,
'full_reload': schedule.full_reload,
'rows_count': metadata.total_rows if metadata else 0,
'last_sync': metadata.last_sync_time.isoformat() if metadata and metadata.last_sync_time else None
})
if len(runs) >= limit:
break
if len(runs) >= limit:
break
return runs[:limit]
# @router.post("/schedules/run-now")
# async def run_scheduled_now(background_tasks: BackgroundTasks):
# """Принудительно запустить все запланированные на текущее время миграции"""
# due = scheduler.get_due_tables()
# if not due:
# return {'message': 'Нет таблиц для миграции в текущее время и день'}
# for schedule in due:
# background_tasks.add_task(
# run_scheduled_migration,
# schedule.table.table_name,
# schedule.full_reload
# )
# return {
# 'message': f'Запущено {len(due)} миграций',
# 'tables': [
# {
# 'name': s.table.table_name,
# 'time': s.schedule_time.strftime("%H:%M"),
# 'days': s.days_display,
# 'full_reload': s.full_reload
# }
# for s in due
# ]
# }
@router.post("/schedules/{table_name}")
async def set_schedule(
metadata_id: int = Query('ID', description="ID метадаты таблицы"),
schedule_time: str = Query("00:00", description="Время в формате HH:MM"),
days: Optional[str] = Query(None, description="Дни недели через запятую: пн,вт,ср,чт,пт,сб,вс"),
full_reload: bool = Query(False, description="Полная перезагрузка"),
enabled: bool = Query(True, description="Включено"),
name: Optional[str] = Query(None, description="Название расписания"),
description: Optional[str] = Query(None, description="Описание")
):
"""Добавить новое расписание для таблицы"""
try:
# if table_name not in settings.TABLES_TO_COPY:
# raise HTTPException(status_code=404, detail=f"Таблица {table_name} не найдена")
days_list = None
if days:
days_list = [d.strip() for d in days.split(',')]
from app.repository.replication_metadata_repo import replication_metadata_repo
schedule = replication_metadata_repo.add_schedule(
metadata_id=metadata_id,
schedule_time=schedule_time,
days=days_list,
full_reload=full_reload,
enabled=enabled,
name=name,
description=description
)
if schedule:
await refresh_schedules()
return {
"message": f"Расписание добавлено",
"schedule": schedule
}
else:
raise HTTPException(status_code=500, detail="Ошибка добавления расписания")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/schedules/{metadata_id}")
async def get_table_schedule(metadata_id: int):
"""Получить расписание для конкретной таблицы"""
schedules = replication_metadata_repo.get_schedule(metadata_id)
if not schedules:
raise HTTPException(status_code=404, detail="Расписание не найдено")
# Получаем статистику таблицы
metadata = replication_metadata_repo.get_table_metadata(metadata_id)
result = {}
result['schedules'] = []
for schedule in schedules:
result['schedules'].append(schedule)
if metadata:
result['table_stats'] = {
'table_name': metadata.table_name,
'rows_count': metadata.total_rows,
'last_sync': metadata.last_sync_time.isoformat() if metadata.last_sync_time else None,
'last_id': metadata.last_id
}
return result
@router.put("/schedules/{schedule_id}")
async def update_schedule(
schedule_id: int,
schedule_time: Optional[str] = Query(None, description="Время в формате HH:MM"),
days: Optional[str] = Query(None, description="Дни недели через запятую"),
full_reload: Optional[bool] = Query(None, description="Полная перезагрузка"),
enabled: Optional[bool] = Query(None, description="Включено"),
name: Optional[str] = Query(None, description="Название"),
description: Optional[str] = Query(None, description="Описание")
):
"""Обновить существующее расписание по ID"""
from app.repository.replication_metadata_repo import replication_metadata_repo
update_kwargs = {}
if schedule_time:
update_kwargs['schedule_time'] = schedule_time
if days:
update_kwargs['days'] = [d.strip() for d in days.split(',')]
if full_reload is not None:
update_kwargs['full_reload'] = full_reload
if enabled is not None:
update_kwargs['enabled'] = enabled
if name:
update_kwargs['name'] = name
if description:
update_kwargs['description'] = description
success = replication_metadata_repo.update_schedule(schedule_id, **update_kwargs)
if success:
await refresh_schedules()
return {"message": f"Расписание {schedule_id} обновлено"}
else:
raise HTTPException(status_code=404, detail=f"Расписание {schedule_id} не найдено")
@router.post("/schedules/{schedule_id}/disable")
async def disable_schedule(schedule_id: int):
"""Отключить расписание"""
success = replication_metadata_repo.disable_schedule(schedule_id)
if success:
await refresh_schedules()
return {'message': f'Расписание отключено'}
else:
raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено")
@router.post("/schedules/{schedule_id}/enable")
async def enable_schedule(schedule_id: int):
"""Включить расписание"""
success = replication_metadata_repo.enable_schedule(schedule_id)
if success:
await refresh_schedules()
return {'message': f'Расписание включено'}
else:
raise HTTPException(status_code=404, detail=f"Расписание с ID={schedule_id} не найдено")
# ==================== Фоновые задачи ====================
# def run_migration_task(full_reload: bool):
# """Фоновая задача для миграции всех таблиц"""
# try:
# migrator.run_migration(full_reload=full_reload)
# except Exception as e:
# migration_logger.error(f"Ошибка в фоновой задаче: {e}")
# def run_scheduled_migration(table_name: str, full_reload: bool):
# """Фоновая задача для запланированной миграции одной таблицы"""
# try:
# migration_logger.info(f"Запуск запланированной миграции для {table_name}")
# migrator.run_migration(
# tables=[table_name],
# full_reload=full_reload,
# send_email=True
# )
# # Обновляем время последнего запуска в расписании
# replication_metadata_repo.update_schedule_last_run(table_name)
# # Логируем успешный запуск
# replication_metadata_repo.log_operation(
# table_name=table_name,
# operation='SCHEDULED',
# records_count=0,
# status='SUCCESS'
# )
# migration_logger.info(f"Запланированная миграция для {table_name} завершена")
# except Exception as e:
# error_msg = f"Ошибка в запланированной миграции для {table_name}: {e}"
# migration_logger.error(error_msg)
# migration_logger.exception(e)
# # Логируем ошибку
# replication_metadata_repo.log_operation(
# table_name=table_name,
# operation='SCHEDULED',
# records_count=0,
# status='ERROR',
# error_message=str(e)[:500]
# )