from datetime import datetime from typing import List, Optional try: from fastapi import FastAPI, HTTPException from pydantic import BaseModel except ImportError: FastAPI = None HTTPException = None BaseModel = object from .config import Config from .queue import migration_queue from .table_config_repository import TableConfigRepository # ============================================================================ # FASTAPI # ============================================================================ class MigrationRequest(BaseModel): """Запрос на запуск миграции из API.""" tables: Optional[List[str]] = None send_email: bool = True dry_run: Optional[bool] = None read_limit: Optional[int] = None force_full: bool = False run_at: Optional[datetime] = None delay_seconds: Optional[int] = None class ScheduleRequest(BaseModel): """Запрос на создание расписания миграции.""" schedule_type: str tables: Optional[List[str]] = None send_email: bool = True dry_run: Optional[bool] = None read_limit: Optional[int] = None interval_seconds: Optional[int] = None daily_time: Optional[str] = None start_at: Optional[datetime] = None name: Optional[str] = None enabled: bool = True catch_up_missed_runs: bool = False initial_force_full: bool = False def create_app(): """Создание FastAPI приложения.""" if FastAPI is None: return None api = FastAPI(title="Syncio Migration API", version="0.1.0") @api.on_event("startup") def startup_queue(): if Config.START_API_WORKER: migration_queue.start() @api.get("/health") def health(): return {"status": "ok"} @api.get("/tables") def tables(): from sqlalchemy import create_engine config = Config() engine = create_engine(config.POSTGRES_CONNECTION_STRING) repository = TableConfigRepository(config, engine) table_configs = repository.load_configs(seed_defaults=True) return [ { "source_table": table.source_table, "target_table": table.pg_table, "mode": table.mode, "life_table": table.life_table, "datetime_column": table.datetime_column, "sequence_column": table.sequence_column, "operation_column": table.operation_column, "initial_load_mode": table.initial_load_mode, "order_columns": table.incremental_order_columns, "timescale": table.timescale, "enabled": table.enabled, } for table in table_configs ] @api.get("/migrations/status") def migration_status(): return migration_queue.get_status() @api.get("/migrations/queue") def migration_queue_list(): return migration_queue.list_jobs() @api.get("/migrations/queue/{job_id}") def migration_queue_job(job_id: str): job = migration_queue.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @api.get("/migrations/schedules") def migration_schedule_list(): return migration_queue.list_schedules() @api.get("/migrations/schedules/{schedule_id}") def migration_schedule(schedule_id: str): schedule = migration_queue.get_schedule(schedule_id) if not schedule: raise HTTPException(status_code=404, detail="Schedule not found") return schedule @api.post("/migrations/run") def run_migration(request: MigrationRequest): try: job = migration_queue.enqueue( tables=request.tables, send_email=request.send_email, dry_run=request.dry_run, read_limit=request.read_limit, force_full=request.force_full, run_at=request.run_at, delay_seconds=request.delay_seconds, ) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) return {"status": "queued", "job": job} @api.post("/migrations/tables/{table_name}/run") def run_table_migration(table_name: str): job = migration_queue.enqueue(tables=[table_name], send_email=True) return {"status": "queued", "job": job} @api.post("/migrations/schedules") def create_schedule(request: ScheduleRequest): try: schedule = migration_queue.create_schedule( schedule_type=request.schedule_type, tables=request.tables, send_email=request.send_email, dry_run=request.dry_run, read_limit=request.read_limit, interval_seconds=request.interval_seconds, daily_time=request.daily_time, start_at=request.start_at, name=request.name, enabled=request.enabled, catch_up_missed_runs=request.catch_up_missed_runs, initial_force_full=request.initial_force_full, ) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) return {"status": "scheduled", "schedule": schedule} return api app = create_app()