from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel, ConfigDict from typing import List, Optional, Dict from datetime import datetime from database import PostgresSessionLocal, DatabaseManager from models import MigrationTable, ReplicationJob, ReplicationStatus, DataSource, SourceType, TargetDatabase, DatabaseType from scheduler import scheduler_manager router = APIRouter(prefix="/api/v1", tags=["replication"]) # ========== TargetDatabase Pydantic Models ========== class TargetDatabaseCreate(BaseModel): """Модель для создания целевой БД""" name: str db_type: str = "pgsql" # pgsql или mssql host: str port: int database: str username: str password: str is_default: bool = False description: Optional[str] = None class TargetDatabaseUpdate(BaseModel): """Модель для обновления целевой БД""" name: Optional[str] = None host: Optional[str] = None port: Optional[int] = None database: Optional[str] = None username: Optional[str] = None password: Optional[str] = None is_active: Optional[bool] = None is_default: Optional[bool] = None description: Optional[str] = None class TargetDatabaseResponse(BaseModel): """Модель ответа для целевой БД""" model_config = ConfigDict(from_attributes=True) id: int name: str db_type: str host: str port: int database: str is_active: bool is_default: bool description: Optional[str] = None created_at: datetime updated_at: datetime # ========== DataSource Pydantic Models ========== class DataSourceCreate(BaseModel): """Модель для создания источника данных""" name: str source_type: str # "mssql" или "pgsql" host: str port: int database: str username: str password: str default_schema: str = "dbo" description: Optional[str] = None class DataSourceUpdate(BaseModel): """Модель для обновления источника данных""" name: Optional[str] = None host: Optional[str] = None port: Optional[int] = None database: Optional[str] = None username: Optional[str] = None password: Optional[str] = None default_schema: Optional[str] = None description: Optional[str] = None is_active: Optional[bool] = None class DataSourceResponse(BaseModel): """Модель ответа для источника данных""" model_config = ConfigDict(from_attributes=True) id: int name: str source_type: str host: str port: int database: str default_schema: str is_active: bool description: Optional[str] = None created_at: datetime updated_at: datetime # ========== MigrationTable Pydantic Models ========== class MigrationTableCreate(BaseModel): """Модель для создания таблицы миграции""" table_name: str source_id: int # ID источника из таблицы DataSource cron_schedule: str source_schema: Optional[str] = None # Если None, используется default_schema из DataSource target_schema: str = "public" target_id: Optional[int] = None # ID целевой БД (если None, используется default) target_table_name: Optional[str] = None # Переименование таблицы column_mapping: Optional[Dict[str, str]] = None # Переименование столбцов use_life_table: bool = False # Обрабатывать Life таблицы life_excluded_fields: Optional[List[str]] = None # Сервисные поля для исключения class MigrationTableUpdate(BaseModel): """Модель для обновления таблицы миграции""" table_name: Optional[str] = None source_id: Optional[int] = None cron_schedule: Optional[str] = None source_schema: Optional[str] = None target_schema: Optional[str] = None target_id: Optional[int] = None target_table_name: Optional[str] = None column_mapping: Optional[Dict[str, str]] = None use_life_table: Optional[bool] = None life_excluded_fields: Optional[List[str]] = None is_active: Optional[bool] = None class MigrationTableResponse(BaseModel): """Модель ответа для таблицы миграции""" model_config = ConfigDict(from_attributes=True) id: int table_name: str source_id: int target_id: Optional[int] source_schema: Optional[str] target_schema: str target_table_name: Optional[str] column_mapping: Optional[Dict[str, str]] use_life_table: bool life_excluded_fields: Optional[List[str]] cron_schedule: str is_active: bool created_at: datetime updated_at: datetime # ========== ReplicationJob Models ========== class ReplicationJobResponse(BaseModel): """Модель ответа для задачи репликации""" model_config = ConfigDict(from_attributes=True) id: int table_id: int table_name: str status: str started_at: Optional[datetime] = None completed_at: Optional[datetime] = None rows_processed: int error_message: Optional[str] = None created_at: datetime class SchedulerStatusResponse(BaseModel): """Модель ответа статуса планировщика""" is_running: bool migration_tables_count: int running_jobs: dict queued_jobs: list # ========== Health Check ========== @router.get("/health") def health_check(): """Проверка здоровья сервиса""" session = PostgresSessionLocal() postgres_ok = DatabaseManager.test_postgres_connection() # Получить все активные источники try: data_sources = session.query(DataSource).filter(DataSource.is_active == True).all() sources_status = {} for source in data_sources: is_ok = DatabaseManager.test_source_connection(source.id) sources_status[f"{source.name} ({source.source_type.value})"] = is_ok except Exception as e: sources_status = {"error": str(e)} finally: session.close() # Здоров если есть целевая БД и хотя бы один активный источник is_healthy = postgres_ok and any(sources_status.values()) if sources_status else False return { "status": "healthy" if is_healthy else "degraded", "target_postgres": postgres_ok, "data_sources": sources_status, "timestamp": datetime.utcnow() } # ========== TargetDatabase Endpoints ========== @router.post("/target-databases", response_model=TargetDatabaseResponse) def create_target_database(db: TargetDatabaseCreate): """Создать новую целевую БД""" session = PostgresSessionLocal() try: # Проверить существование existing = session.query(TargetDatabase).filter(TargetDatabase.name == db.name).first() if existing: raise HTTPException(status_code=400, detail=f"TargetDatabase with name '{db.name}' already exists") # Проверить db_type try: DatabaseType(db.db_type) except ValueError: raise HTTPException(status_code=400, detail=f"Invalid db_type: {db.db_type}. Must be 'pgsql' or 'mssql'") # Если это default, повернуть остальные как не default if db.is_default: session.query(TargetDatabase).update({TargetDatabase.is_default: False}) new_db = TargetDatabase( name=db.name, db_type=DatabaseType(db.db_type), host=db.host, port=db.port, database=db.database, username=db.username, password=db.password, is_default=db.is_default, description=db.description ) session.add(new_db) session.commit() return TargetDatabaseResponse.model_validate(new_db) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.get("/target-databases", response_model=List[TargetDatabaseResponse]) def list_target_databases(active_only: bool = Query(False)): """Получить список целевых БД""" session = PostgresSessionLocal() try: query = session.query(TargetDatabase) if active_only: query = query.filter(TargetDatabase.is_active == True) dbs = query.all() return [TargetDatabaseResponse.model_validate(db) for db in dbs] finally: session.close() @router.get("/target-databases/{target_id}", response_model=TargetDatabaseResponse) def get_target_database(target_id: int): """Получить целевую БД по ID""" session = PostgresSessionLocal() try: db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() if not db: raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") return TargetDatabaseResponse.model_validate(db) finally: session.close() @router.put("/target-databases/{target_id}", response_model=TargetDatabaseResponse) def update_target_database(target_id: int, update: TargetDatabaseUpdate): """Обновить целевую БД""" session = PostgresSessionLocal() try: db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() if not db: raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") update_data = update.model_dump(exclude_unset=True) if update_data: for key, value in update_data.items(): if hasattr(db, key): setattr(db, key, value) # Если это default, повернуть остальные if update.is_default == True: session.query(TargetDatabase).filter(TargetDatabase.id != target_id).update( {TargetDatabase.is_default: False} ) session.commit() DatabaseManager.clear_engine_cache(target_id=target_id) return TargetDatabaseResponse.model_validate(db) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.delete("/target-databases/{target_id}") def delete_target_database(target_id: int): """Удалить целевую БД (мягкое удаление)""" session = PostgresSessionLocal() try: db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() if not db: raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") db.is_active = False session.commit() DatabaseManager.clear_engine_cache(target_id=target_id) return {"message": f"TargetDatabase {target_id} deactivated"} except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.post("/target-databases/{target_id}/test") def test_target_database_connection(target_id: int): """Проверить подключение к целевой БД""" try: is_ok = DatabaseManager.test_target_connection(target_id) return {"target_id": target_id, "connection_ok": is_ok} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) # ========== DataSource Endpoints ========== @router.post("/data-sources", response_model=DataSourceResponse) def create_data_source(source: DataSourceCreate): """Создать новый источник данных""" session = PostgresSessionLocal() try: # Проверить что источник с таким именем еще не существует existing = session.query(DataSource).filter(DataSource.name == source.name).first() if existing: raise HTTPException(status_code=400, detail=f"DataSource with name '{source.name}' already exists") # Проверить что source_type валиден try: SourceType(source.source_type) except ValueError: raise HTTPException(status_code=400, detail=f"Invalid source_type: {source.source_type}. Must be 'mssql' or 'pgsql'") # Создать новый источник new_source = DataSource( name=source.name, source_type=SourceType(source.source_type), host=source.host, port=source.port, database=source.database, username=source.username, password=source.password, default_schema=source.default_schema, description=source.description ) session.add(new_source) session.commit() return DataSourceResponse.model_validate(new_source) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.get("/data-sources", response_model=List[DataSourceResponse]) def list_data_sources(active_only: bool = Query(False)): """Получить список источников данных""" session = PostgresSessionLocal() try: query = session.query(DataSource) if active_only: query = query.filter(DataSource.is_active == True) sources = query.all() return [DataSourceResponse.model_validate(s) for s in sources] finally: session.close() @router.get("/data-sources/{source_id}", response_model=DataSourceResponse) def get_data_source(source_id: int): """Получить источник данных по ID""" session = PostgresSessionLocal() try: source = session.query(DataSource).filter(DataSource.id == source_id).first() if not source: raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") return DataSourceResponse.model_validate(source) finally: session.close() @router.put("/data-sources/{source_id}", response_model=DataSourceResponse) def update_data_source(source_id: int, update: DataSourceUpdate): """Обновить источник данных""" session = PostgresSessionLocal() try: source = session.query(DataSource).filter(DataSource.id == source_id).first() if not source: raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") # Обновить только переданные поля update_data = update.model_dump(exclude_unset=True) if update_data: for key, value in update_data.items(): if hasattr(source, key): setattr(source, key, value) session.commit() return DataSourceResponse.model_validate(source) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.delete("/data-sources/{source_id}") def delete_data_source(source_id: int): """Удалить источник данных (мягкое удаление - установить is_active=False)""" session = PostgresSessionLocal() try: source = session.query(DataSource).filter(DataSource.id == source_id).first() if not source: raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") # Мягкое удаление source.is_active = False session.commit() # Очистить кеш подключений DatabaseManager.clear_engine_cache(source_id) return {"message": f"DataSource {source_id} deactivated"} except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.post("/data-sources/{source_id}/test") def test_data_source_connection(source_id: int): """Проверить подключение к источнику данных""" try: is_ok = DatabaseManager.test_source_connection(source_id) return {"source_id": source_id, "connection_ok": is_ok} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) # ========== Migration Tables Endpoints ========== @router.post("/migration-tables", response_model=MigrationTableResponse) def create_migration_table(table: MigrationTableCreate): """Создать новую таблицу для миграции""" session = PostgresSessionLocal() try: # Проверить что источник существует и активен source = session.query(DataSource).filter(DataSource.id == table.source_id).first() if not source: raise HTTPException(status_code=404, detail=f"DataSource with ID {table.source_id} not found") if not source.is_active: raise HTTPException(status_code=400, detail=f"DataSource {table.source_id} is inactive") # Создать запись миграции migration_table = MigrationTable( table_name=table.table_name, source_id=table.source_id, target_id=table.target_id, # Может быть None source_schema=table.source_schema, target_schema=table.target_schema, target_table_name=table.target_table_name, column_mapping=table.column_mapping, use_life_table=table.use_life_table, life_excluded_fields=table.life_excluded_fields, cron_schedule=table.cron_schedule, is_active=True ) session.add(migration_table) session.commit() # Добавить в планировщик scheduler_manager.add_job(migration_table) return MigrationTableResponse.model_validate(migration_table) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.get("/migration-tables", response_model=List[MigrationTableResponse]) def list_migration_tables(active_only: bool = Query(False)): """Получить список таблиц для миграции""" session = PostgresSessionLocal() try: query = session.query(MigrationTable) if active_only: query = query.filter(MigrationTable.is_active == True) tables = query.all() return [MigrationTableResponse.model_validate(t) for t in tables] finally: session.close() @router.get("/migration-tables/{table_id}", response_model=MigrationTableResponse) def get_migration_table(table_id: int): """Получить таблицу миграции по ID""" session = PostgresSessionLocal() try: table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() if not table: raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") return MigrationTableResponse.model_validate(table) finally: session.close() @router.put("/migration-tables/{table_id}", response_model=MigrationTableResponse) def update_migration_table(table_id: int, update: MigrationTableUpdate): """Обновить таблицу миграции""" session = PostgresSessionLocal() try: table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() if not table: raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") # Проверить что источник существует если изменяется if update.source_id and update.source_id != table.source_id: source = session.query(DataSource).filter(DataSource.id == update.source_id).first() if not source: raise HTTPException(status_code=404, detail=f"DataSource with ID {update.source_id} not found") if not source.is_active: raise HTTPException(status_code=400, detail=f"DataSource {update.source_id} is inactive") # Обновить только переданные поля update_data = update.model_dump(exclude_unset=True) if update_data: for key, value in update_data.items(): if hasattr(table, key): setattr(table, key, value) session.commit() # Пересоздать job в планировщике если нужно scheduler_manager.remove_job(table_id) scheduler_manager.add_job(table) return MigrationTableResponse.model_validate(table) except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() @router.delete("/migration-tables/{table_id}") def delete_migration_table(table_id: int): """Удалить таблицу миграции""" session = PostgresSessionLocal() try: table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() if not table: raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") # Удалить из планировщика scheduler_manager.remove_job(table_id) # Удалить из БД session.delete(table) session.commit() return {"message": f"Migration table {table_id} deleted"} except HTTPException: raise except Exception as e: session.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: session.close() # ========== Replication Jobs Endpoints ========== @router.get("/replication-jobs", response_model=List[ReplicationJobResponse]) def list_replication_jobs( limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0), status: Optional[str] = None ): """Получить историю задач репликации""" try: session = PostgresSessionLocal() query = session.query(ReplicationJob) if status: query = query.filter(ReplicationJob.status == status) jobs = query.order_by(ReplicationJob.created_at.desc()).offset(offset).limit(limit).all() session.close() return [ReplicationJobResponse.model_validate(job.__dict__) for job in jobs] except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/replication-jobs/{job_id}", response_model=ReplicationJobResponse) def get_replication_job(job_id: int): """Получить информацию о конкретной задаче репликации""" try: session = PostgresSessionLocal() job = session.query(ReplicationJob).filter( ReplicationJob.id == job_id ).first() session.close() if job: return ReplicationJobResponse.model_validate(job.__dict__) else: raise HTTPException(status_code=404, detail="Replication job not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Scheduler endpoints @router.get("/scheduler/status", response_model=SchedulerStatusResponse) def get_scheduler_status(): """Получить статус планировщика""" tables = scheduler_manager.get_migration_tables() running = scheduler_manager.get_running_jobs() queued = scheduler_manager.get_queued_jobs() return SchedulerStatusResponse( is_running=scheduler_manager.scheduler.running, migration_tables_count=len(tables), running_jobs=running, queued_jobs=queued ) @router.post("/scheduler/start") def start_scheduler(): """Запустить планировщик""" try: if not scheduler_manager.scheduler.running: scheduler_manager.start() return {"message": "Scheduler started successfully"} else: return {"message": "Scheduler is already running"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/scheduler/stop") def stop_scheduler(): """Остановить планировщик""" try: if scheduler_manager.scheduler.running: scheduler_manager.stop() return {"message": "Scheduler stopped successfully"} else: return {"message": "Scheduler is not running"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/scheduler/jobs") def get_scheduler_jobs(): """Получить список всех задач в планировщике""" try: jobs = scheduler_manager.scheduler.get_jobs() return { "total_jobs": len(jobs), "jobs": [ { "id": job.id, "name": job.name, "next_run_time": str(job.next_run_time) } for job in jobs ] } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Statistics endpoints @router.get("/statistics") def get_statistics(): """Получить статистику репликации""" try: session = PostgresSessionLocal() total_jobs = session.query(ReplicationJob).count() successful_jobs = session.query(ReplicationJob).filter( ReplicationJob.status == ReplicationStatus.SUCCESS ).count() failed_jobs = session.query(ReplicationJob).filter( ReplicationJob.status == ReplicationStatus.FAILED ).count() session.close() return { "total_jobs": total_jobs, "successful_jobs": successful_jobs, "failed_jobs": failed_jobs, "success_rate": (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0, "timestamp": datetime.utcnow() } except Exception as e: raise HTTPException(status_code=500, detail=str(e))