from contextlib import asynccontextmanager from datetime import datetime from typing import List, Optional try: from fastapi import FastAPI, HTTPException from pydantic import BaseModel except ImportError: FastAPI = None HTTPException = None BaseModel = object from sqlalchemy import text from .config import Config from .queue import migration_queue from .table_config_repository import TableConfigRepository # ============================================================================ # FASTAPI # ============================================================================ class MigrationRequest(BaseModel): """Запрос на запуск миграции из API.""" tables: Optional[List[str]] = None send_email: bool = True dry_run: Optional[bool] = None read_limit: Optional[int] = None force_full: bool = False run_at: Optional[datetime] = None delay_seconds: Optional[int] = None class ScheduleRequest(BaseModel): """Запрос на создание расписания миграции.""" schedule_type: str tables: Optional[List[str]] = None send_email: bool = True dry_run: Optional[bool] = None read_limit: Optional[int] = None interval_seconds: Optional[int] = None daily_time: Optional[str] = None start_at: Optional[datetime] = None name: Optional[str] = None enabled: bool = True catch_up_missed_runs: bool = False initial_force_full: bool = False class WebhookCreate(BaseModel): url: str secret: Optional[str] = None label: Optional[str] = None class WebhookToggle(BaseModel): active: bool def create_app(): """Создание FastAPI приложения.""" if FastAPI is None: return None _shared: dict = {} @asynccontextmanager async def lifespan(app: FastAPI): from sqlalchemy import create_engine config = Config() engine = create_engine( config.POSTGRES_CONNECTION_STRING, pool_pre_ping=True, pool_recycle=1800, ) _shared['engine'] = engine _shared['config'] = config schema = f'"{config.REPLICATOR_SCHEMA}"' with engine.connect() as conn: conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS {schema}')) conn.execute(text(f""" CREATE TABLE IF NOT EXISTS {schema}.webhook_subscriptions ( id SERIAL PRIMARY KEY, url TEXT NOT NULL, secret TEXT, label TEXT, active BOOLEAN NOT NULL DEFAULT true, created_at TIMESTAMP NOT NULL DEFAULT now() ) """)) conn.commit() if Config.START_API_WORKER: migration_queue.start() yield engine = _shared.get('engine') if engine is not None: engine.dispose() api = FastAPI(title="Syncio Migration API", version="0.1.0", lifespan=lifespan) @api.get("/health") def health(): return {"status": "ok"} @api.get("/tables") def tables(): repository = TableConfigRepository(_shared['config'], _shared['engine']) table_configs = repository.load_configs(seed_defaults=True) return [ { "source_table": table.source_table, "target_table": table.pg_table, "mode": table.mode, "life_table": table.life_table, "datetime_column": table.datetime_column, "sequence_column": table.sequence_column, "operation_column": table.operation_column, "initial_load_mode": table.initial_load_mode, "order_columns": table.incremental_order_columns, "timescale": table.timescale, "enabled": table.enabled, } for table in table_configs ] @api.get("/migrations/status") def migration_status(): return migration_queue.get_status() @api.get("/migrations/queue") def migration_queue_list(): return migration_queue.list_jobs() @api.get("/migrations/queue/{job_id}") def migration_queue_job(job_id: str): job = migration_queue.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @api.get("/migrations/schedules") def migration_schedule_list(): return migration_queue.list_schedules() @api.get("/migrations/schedules/{schedule_id}") def migration_schedule(schedule_id: str): schedule = migration_queue.get_schedule(schedule_id) if not schedule: raise HTTPException(status_code=404, detail="Schedule not found") return schedule @api.post("/migrations/run") def run_migration(request: MigrationRequest): try: job = migration_queue.enqueue( tables=request.tables, send_email=request.send_email, dry_run=request.dry_run, read_limit=request.read_limit, force_full=request.force_full, run_at=request.run_at, delay_seconds=request.delay_seconds, ) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) return {"status": "queued", "job": job} @api.post("/migrations/tables/{table_name}/run") def run_table_migration(table_name: str): job = migration_queue.enqueue(tables=[table_name], send_email=True) return {"status": "queued", "job": job} @api.post("/migrations/schedules") def create_schedule(request: ScheduleRequest): try: schedule = migration_queue.create_schedule( schedule_type=request.schedule_type, tables=request.tables, send_email=request.send_email, dry_run=request.dry_run, read_limit=request.read_limit, interval_seconds=request.interval_seconds, daily_time=request.daily_time, start_at=request.start_at, name=request.name, enabled=request.enabled, catch_up_missed_runs=request.catch_up_missed_runs, initial_force_full=request.initial_force_full, ) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) return {"status": "scheduled", "schedule": schedule} # ----------------------------------------------------------------------- # Webhooks # ----------------------------------------------------------------------- _WEBHOOKS_TABLE = f'"{Config.REPLICATOR_SCHEMA}".webhook_subscriptions' @api.get("/webhooks") def list_webhooks(): with _shared['engine'].connect() as conn: rows = conn.execute( text(f"SELECT id, url, secret, label, active, created_at FROM {_WEBHOOKS_TABLE} ORDER BY id") ).mappings().all() return [dict(r) for r in rows] @api.post("/webhooks", status_code=201) def create_webhook(body: WebhookCreate): with _shared['engine'].connect() as conn: row = conn.execute( text(f""" INSERT INTO {_WEBHOOKS_TABLE} (url, secret, label) VALUES (:url, :secret, :label) RETURNING id, url, secret, label, active, created_at """), {'url': body.url, 'secret': body.secret, 'label': body.label}, ).mappings().first() conn.commit() return dict(row) @api.patch("/webhooks/{webhook_id}") def toggle_webhook(webhook_id: int, body: WebhookToggle): with _shared['engine'].connect() as conn: row = conn.execute( text(f""" UPDATE {_WEBHOOKS_TABLE} SET active = :active WHERE id = :id RETURNING id, url, secret, label, active, created_at """), {'active': body.active, 'id': webhook_id}, ).mappings().first() conn.commit() if not row: raise HTTPException(status_code=404, detail="Webhook not found") return dict(row) @api.delete("/webhooks/{webhook_id}", status_code=204) def delete_webhook(webhook_id: int): with _shared['engine'].connect() as conn: result = conn.execute( text(f"DELETE FROM {_WEBHOOKS_TABLE} WHERE id = :id"), {'id': webhook_id}, ) conn.commit() if result.rowcount == 0: raise HTTPException(status_code=404, detail="Webhook not found") return api app = create_app()