Files
2026-03-29 23:24:15 +09:00

740 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, ConfigDict
from typing import List, Optional, Dict
from datetime import datetime
from database import PostgresSessionLocal, DatabaseManager
from models import MigrationTable, ReplicationJob, ReplicationStatus, DataSource, SourceType, TargetDatabase, DatabaseType
from scheduler import scheduler_manager
router = APIRouter(prefix="/api/v1", tags=["replication"])
# ========== TargetDatabase Pydantic Models ==========
class TargetDatabaseCreate(BaseModel):
"""Модель для создания целевой БД"""
name: str
db_type: str = "pgsql" # pgsql или mssql
host: str
port: int
database: str
username: str
password: str
is_default: bool = False
description: Optional[str] = None
class TargetDatabaseUpdate(BaseModel):
"""Модель для обновления целевой БД"""
name: Optional[str] = None
host: Optional[str] = None
port: Optional[int] = None
database: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
is_active: Optional[bool] = None
is_default: Optional[bool] = None
description: Optional[str] = None
class TargetDatabaseResponse(BaseModel):
"""Модель ответа для целевой БД"""
model_config = ConfigDict(from_attributes=True)
id: int
name: str
db_type: str
host: str
port: int
database: str
is_active: bool
is_default: bool
description: Optional[str] = None
created_at: datetime
updated_at: datetime
# ========== DataSource Pydantic Models ==========
class DataSourceCreate(BaseModel):
"""Модель для создания источника данных"""
name: str
source_type: str # "mssql" или "pgsql"
host: str
port: int
database: str
username: str
password: str
default_schema: str = "dbo"
description: Optional[str] = None
class DataSourceUpdate(BaseModel):
"""Модель для обновления источника данных"""
name: Optional[str] = None
host: Optional[str] = None
port: Optional[int] = None
database: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
default_schema: Optional[str] = None
description: Optional[str] = None
is_active: Optional[bool] = None
class DataSourceResponse(BaseModel):
"""Модель ответа для источника данных"""
model_config = ConfigDict(from_attributes=True)
id: int
name: str
source_type: str
host: str
port: int
database: str
default_schema: str
is_active: bool
description: Optional[str] = None
created_at: datetime
updated_at: datetime
# ========== MigrationTable Pydantic Models ==========
class MigrationTableCreate(BaseModel):
"""Модель для создания таблицы миграции"""
table_name: str
source_id: int # ID источника из таблицы DataSource
cron_schedule: str
source_schema: Optional[str] = None # Если None, используется default_schema из DataSource
target_schema: str = "public"
target_id: Optional[int] = None # ID целевой БД (если None, используется default)
target_table_name: Optional[str] = None # Переименование таблицы
column_mapping: Optional[Dict[str, str]] = None # Переименование столбцов
use_life_table: bool = False # Обрабатывать Life таблицы
life_excluded_fields: Optional[List[str]] = None # Сервисные поля для исключения
class MigrationTableUpdate(BaseModel):
"""Модель для обновления таблицы миграции"""
table_name: Optional[str] = None
source_id: Optional[int] = None
cron_schedule: Optional[str] = None
source_schema: Optional[str] = None
target_schema: Optional[str] = None
target_id: Optional[int] = None
target_table_name: Optional[str] = None
column_mapping: Optional[Dict[str, str]] = None
use_life_table: Optional[bool] = None
life_excluded_fields: Optional[List[str]] = None
is_active: Optional[bool] = None
class MigrationTableResponse(BaseModel):
"""Модель ответа для таблицы миграции"""
model_config = ConfigDict(from_attributes=True)
id: int
table_name: str
source_id: int
target_id: Optional[int]
source_schema: Optional[str]
target_schema: str
target_table_name: Optional[str]
column_mapping: Optional[Dict[str, str]]
use_life_table: bool
life_excluded_fields: Optional[List[str]]
cron_schedule: str
is_active: bool
created_at: datetime
updated_at: datetime
# ========== ReplicationJob Models ==========
class ReplicationJobResponse(BaseModel):
"""Модель ответа для задачи репликации"""
model_config = ConfigDict(from_attributes=True)
id: int
table_id: int
table_name: str
status: str
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
rows_processed: int
error_message: Optional[str] = None
created_at: datetime
class SchedulerStatusResponse(BaseModel):
"""Модель ответа статуса планировщика"""
is_running: bool
migration_tables_count: int
running_jobs: dict
queued_jobs: list
# ========== Health Check ==========
@router.get("/health")
def health_check():
"""Проверка здоровья сервиса"""
session = PostgresSessionLocal()
postgres_ok = DatabaseManager.test_postgres_connection()
# Получить все активные источники
try:
data_sources = session.query(DataSource).filter(DataSource.is_active == True).all()
sources_status = {}
for source in data_sources:
is_ok = DatabaseManager.test_source_connection(source.id)
sources_status[f"{source.name} ({source.source_type.value})"] = is_ok
except Exception as e:
sources_status = {"error": str(e)}
finally:
session.close()
# Здоров если есть целевая БД и хотя бы один активный источник
is_healthy = postgres_ok and any(sources_status.values()) if sources_status else False
return {
"status": "healthy" if is_healthy else "degraded",
"target_postgres": postgres_ok,
"data_sources": sources_status,
"timestamp": datetime.utcnow()
}
# ========== TargetDatabase Endpoints ==========
@router.post("/target-databases", response_model=TargetDatabaseResponse)
def create_target_database(db: TargetDatabaseCreate):
"""Создать новую целевую БД"""
session = PostgresSessionLocal()
try:
# Проверить существование
existing = session.query(TargetDatabase).filter(TargetDatabase.name == db.name).first()
if existing:
raise HTTPException(status_code=400, detail=f"TargetDatabase with name '{db.name}' already exists")
# Проверить db_type
try:
DatabaseType(db.db_type)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid db_type: {db.db_type}. Must be 'pgsql' or 'mssql'")
# Если это default, повернуть остальные как не default
if db.is_default:
session.query(TargetDatabase).update({TargetDatabase.is_default: False})
new_db = TargetDatabase(
name=db.name,
db_type=DatabaseType(db.db_type),
host=db.host,
port=db.port,
database=db.database,
username=db.username,
password=db.password,
is_default=db.is_default,
description=db.description
)
session.add(new_db)
session.commit()
return TargetDatabaseResponse.model_validate(new_db)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.get("/target-databases", response_model=List[TargetDatabaseResponse])
def list_target_databases(active_only: bool = Query(False)):
"""Получить список целевых БД"""
session = PostgresSessionLocal()
try:
query = session.query(TargetDatabase)
if active_only:
query = query.filter(TargetDatabase.is_active == True)
dbs = query.all()
return [TargetDatabaseResponse.model_validate(db) for db in dbs]
finally:
session.close()
@router.get("/target-databases/{target_id}", response_model=TargetDatabaseResponse)
def get_target_database(target_id: int):
"""Получить целевую БД по ID"""
session = PostgresSessionLocal()
try:
db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
if not db:
raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found")
return TargetDatabaseResponse.model_validate(db)
finally:
session.close()
@router.put("/target-databases/{target_id}", response_model=TargetDatabaseResponse)
def update_target_database(target_id: int, update: TargetDatabaseUpdate):
"""Обновить целевую БД"""
session = PostgresSessionLocal()
try:
db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
if not db:
raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found")
update_data = update.model_dump(exclude_unset=True)
if update_data:
for key, value in update_data.items():
if hasattr(db, key):
setattr(db, key, value)
# Если это default, повернуть остальные
if update.is_default == True:
session.query(TargetDatabase).filter(TargetDatabase.id != target_id).update(
{TargetDatabase.is_default: False}
)
session.commit()
DatabaseManager.clear_engine_cache(target_id=target_id)
return TargetDatabaseResponse.model_validate(db)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.delete("/target-databases/{target_id}")
def delete_target_database(target_id: int):
"""Удалить целевую БД (мягкое удаление)"""
session = PostgresSessionLocal()
try:
db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
if not db:
raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found")
db.is_active = False
session.commit()
DatabaseManager.clear_engine_cache(target_id=target_id)
return {"message": f"TargetDatabase {target_id} deactivated"}
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.post("/target-databases/{target_id}/test")
def test_target_database_connection(target_id: int):
"""Проверить подключение к целевой БД"""
try:
is_ok = DatabaseManager.test_target_connection(target_id)
return {"target_id": target_id, "connection_ok": is_ok}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# ========== DataSource Endpoints ==========
@router.post("/data-sources", response_model=DataSourceResponse)
def create_data_source(source: DataSourceCreate):
"""Создать новый источник данных"""
session = PostgresSessionLocal()
try:
# Проверить что источник с таким именем еще не существует
existing = session.query(DataSource).filter(DataSource.name == source.name).first()
if existing:
raise HTTPException(status_code=400, detail=f"DataSource with name '{source.name}' already exists")
# Проверить что source_type валиден
try:
SourceType(source.source_type)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source_type: {source.source_type}. Must be 'mssql' or 'pgsql'")
# Создать новый источник
new_source = DataSource(
name=source.name,
source_type=SourceType(source.source_type),
host=source.host,
port=source.port,
database=source.database,
username=source.username,
password=source.password,
default_schema=source.default_schema,
description=source.description
)
session.add(new_source)
session.commit()
return DataSourceResponse.model_validate(new_source)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.get("/data-sources", response_model=List[DataSourceResponse])
def list_data_sources(active_only: bool = Query(False)):
"""Получить список источников данных"""
session = PostgresSessionLocal()
try:
query = session.query(DataSource)
if active_only:
query = query.filter(DataSource.is_active == True)
sources = query.all()
return [DataSourceResponse.model_validate(s) for s in sources]
finally:
session.close()
@router.get("/data-sources/{source_id}", response_model=DataSourceResponse)
def get_data_source(source_id: int):
"""Получить источник данных по ID"""
session = PostgresSessionLocal()
try:
source = session.query(DataSource).filter(DataSource.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found")
return DataSourceResponse.model_validate(source)
finally:
session.close()
@router.put("/data-sources/{source_id}", response_model=DataSourceResponse)
def update_data_source(source_id: int, update: DataSourceUpdate):
"""Обновить источник данных"""
session = PostgresSessionLocal()
try:
source = session.query(DataSource).filter(DataSource.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found")
# Обновить только переданные поля
update_data = update.model_dump(exclude_unset=True)
if update_data:
for key, value in update_data.items():
if hasattr(source, key):
setattr(source, key, value)
session.commit()
return DataSourceResponse.model_validate(source)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.delete("/data-sources/{source_id}")
def delete_data_source(source_id: int):
"""Удалить источник данных (мягкое удаление - установить is_active=False)"""
session = PostgresSessionLocal()
try:
source = session.query(DataSource).filter(DataSource.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found")
# Мягкое удаление
source.is_active = False
session.commit()
# Очистить кеш подключений
DatabaseManager.clear_engine_cache(source_id)
return {"message": f"DataSource {source_id} deactivated"}
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.post("/data-sources/{source_id}/test")
def test_data_source_connection(source_id: int):
"""Проверить подключение к источнику данных"""
try:
is_ok = DatabaseManager.test_source_connection(source_id)
return {"source_id": source_id, "connection_ok": is_ok}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# ========== Migration Tables Endpoints ==========
@router.post("/migration-tables", response_model=MigrationTableResponse)
def create_migration_table(table: MigrationTableCreate):
"""Создать новую таблицу для миграции"""
session = PostgresSessionLocal()
try:
# Проверить что источник существует и активен
source = session.query(DataSource).filter(DataSource.id == table.source_id).first()
if not source:
raise HTTPException(status_code=404, detail=f"DataSource with ID {table.source_id} not found")
if not source.is_active:
raise HTTPException(status_code=400, detail=f"DataSource {table.source_id} is inactive")
# Создать запись миграции
migration_table = MigrationTable(
table_name=table.table_name,
source_id=table.source_id,
target_id=table.target_id, # Может быть None
source_schema=table.source_schema,
target_schema=table.target_schema,
target_table_name=table.target_table_name,
column_mapping=table.column_mapping,
use_life_table=table.use_life_table,
life_excluded_fields=table.life_excluded_fields,
cron_schedule=table.cron_schedule,
is_active=True
)
session.add(migration_table)
session.commit()
# Добавить в планировщик
scheduler_manager.add_job(migration_table)
return MigrationTableResponse.model_validate(migration_table)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.get("/migration-tables", response_model=List[MigrationTableResponse])
def list_migration_tables(active_only: bool = Query(False)):
"""Получить список таблиц для миграции"""
session = PostgresSessionLocal()
try:
query = session.query(MigrationTable)
if active_only:
query = query.filter(MigrationTable.is_active == True)
tables = query.all()
return [MigrationTableResponse.model_validate(t) for t in tables]
finally:
session.close()
@router.get("/migration-tables/{table_id}", response_model=MigrationTableResponse)
def get_migration_table(table_id: int):
"""Получить таблицу миграции по ID"""
session = PostgresSessionLocal()
try:
table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first()
if not table:
raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found")
return MigrationTableResponse.model_validate(table)
finally:
session.close()
@router.put("/migration-tables/{table_id}", response_model=MigrationTableResponse)
def update_migration_table(table_id: int, update: MigrationTableUpdate):
"""Обновить таблицу миграции"""
session = PostgresSessionLocal()
try:
table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first()
if not table:
raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found")
# Проверить что источник существует если изменяется
if update.source_id and update.source_id != table.source_id:
source = session.query(DataSource).filter(DataSource.id == update.source_id).first()
if not source:
raise HTTPException(status_code=404, detail=f"DataSource with ID {update.source_id} not found")
if not source.is_active:
raise HTTPException(status_code=400, detail=f"DataSource {update.source_id} is inactive")
# Обновить только переданные поля
update_data = update.model_dump(exclude_unset=True)
if update_data:
for key, value in update_data.items():
if hasattr(table, key):
setattr(table, key, value)
session.commit()
# Пересоздать job в планировщике если нужно
scheduler_manager.remove_job(table_id)
scheduler_manager.add_job(table)
return MigrationTableResponse.model_validate(table)
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
@router.delete("/migration-tables/{table_id}")
def delete_migration_table(table_id: int):
"""Удалить таблицу миграции"""
session = PostgresSessionLocal()
try:
table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first()
if not table:
raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found")
# Удалить из планировщика
scheduler_manager.remove_job(table_id)
# Удалить из БД
session.delete(table)
session.commit()
return {"message": f"Migration table {table_id} deleted"}
except HTTPException:
raise
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
# ========== Replication Jobs Endpoints ==========
@router.get("/replication-jobs", response_model=List[ReplicationJobResponse])
def list_replication_jobs(
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
status: Optional[str] = None
):
"""Получить историю задач репликации"""
try:
session = PostgresSessionLocal()
query = session.query(ReplicationJob)
if status:
query = query.filter(ReplicationJob.status == status)
jobs = query.order_by(ReplicationJob.created_at.desc()).offset(offset).limit(limit).all()
session.close()
return [ReplicationJobResponse.model_validate(job.__dict__) for job in jobs]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/replication-jobs/{job_id}", response_model=ReplicationJobResponse)
def get_replication_job(job_id: int):
"""Получить информацию о конкретной задаче репликации"""
try:
session = PostgresSessionLocal()
job = session.query(ReplicationJob).filter(
ReplicationJob.id == job_id
).first()
session.close()
if job:
return ReplicationJobResponse.model_validate(job.__dict__)
else:
raise HTTPException(status_code=404, detail="Replication job not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Scheduler endpoints
@router.get("/scheduler/status", response_model=SchedulerStatusResponse)
def get_scheduler_status():
"""Получить статус планировщика"""
tables = scheduler_manager.get_migration_tables()
running = scheduler_manager.get_running_jobs()
queued = scheduler_manager.get_queued_jobs()
return SchedulerStatusResponse(
is_running=scheduler_manager.scheduler.running,
migration_tables_count=len(tables),
running_jobs=running,
queued_jobs=queued
)
@router.post("/scheduler/start")
def start_scheduler():
"""Запустить планировщик"""
try:
if not scheduler_manager.scheduler.running:
scheduler_manager.start()
return {"message": "Scheduler started successfully"}
else:
return {"message": "Scheduler is already running"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/scheduler/stop")
def stop_scheduler():
"""Остановить планировщик"""
try:
if scheduler_manager.scheduler.running:
scheduler_manager.stop()
return {"message": "Scheduler stopped successfully"}
else:
return {"message": "Scheduler is not running"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/scheduler/jobs")
def get_scheduler_jobs():
"""Получить список всех задач в планировщике"""
try:
jobs = scheduler_manager.scheduler.get_jobs()
return {
"total_jobs": len(jobs),
"jobs": [
{
"id": job.id,
"name": job.name,
"next_run_time": str(job.next_run_time)
}
for job in jobs
]
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Statistics endpoints
@router.get("/statistics")
def get_statistics():
"""Получить статистику репликации"""
try:
session = PostgresSessionLocal()
total_jobs = session.query(ReplicationJob).count()
successful_jobs = session.query(ReplicationJob).filter(
ReplicationJob.status == ReplicationStatus.SUCCESS
).count()
failed_jobs = session.query(ReplicationJob).filter(
ReplicationJob.status == ReplicationStatus.FAILED
).count()
session.close()
return {
"total_jobs": total_jobs,
"successful_jobs": successful_jobs,
"failed_jobs": failed_jobs,
"success_rate": (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0,
"timestamp": datetime.utcnow()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))