Files
2026-03-29 23:24:15 +09:00

93 lines
3.0 KiB
Python

from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging
from database import DatabaseManager, PostgresSessionLocal
from models import DataSource
from scheduler import scheduler_manager
from api import router
# Logging: configure the root logger once, then take a module-level logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the application's startup and shutdown sequence.

    Startup: initialise the PostgreSQL database, check the target
    connection, log every active data source together with the result of
    its connection test, then start the scheduler.
    Shutdown: stop the scheduler.

    Args:
        app: The FastAPI application this lifespan is attached to. It is
            not used inside the function.

    Yields:
        None. Control is handed to the running application between the
        startup and shutdown phases.
    """
    # Startup
    logger.info("Starting Data Replication Service...")

    # Initialise the database.
    DatabaseManager.init_postgres_db()
    logger.info("PostgreSQL database initialized")

    # Check the connection to the target database. A failed check is only
    # logged; startup continues either way.
    if DatabaseManager.test_postgres_connection():
        logger.info("Target PostgreSQL connection successful")
    else:
        logger.warning("Target PostgreSQL connection failed! Replication service requires target database.")

    # Check the available data sources.
    session = PostgresSessionLocal()
    try:
        # `== True` is intentional: it is kept exactly as the query was
        # originally written so the generated filter does not change.
        data_sources = (
            session.query(DataSource)
            .filter(DataSource.is_active == True)  # noqa: E712
            .all()
        )
        if not data_sources:
            logger.warning("No active data sources configured. Add data sources via API before replication.")
        else:
            logger.info("Found %d active data source(s):", len(data_sources))
            for source in data_sources:
                is_ok = DatabaseManager.test_source_connection(source.id)
                # The marker was an empty string in both branches, which made
                # the per-source line useless; show the outcome explicitly.
                status = "OK" if is_ok else "FAILED"
                logger.info(
                    " %s %s (%s)",
                    status,
                    source.name,
                    source.source_type.value,
                )
    finally:
        session.close()

    # Start the scheduler.
    scheduler_manager.start()
    logger.info("Scheduler started")

    try:
        yield
    finally:
        # Shutdown: runs even when an exception or cancellation is thrown
        # into the generator at the yield point, so the scheduler is not
        # left running.
        logger.info("Shutting down Data Replication Service...")
        scheduler_manager.stop()
        logger.info("Scheduler stopped")
# Build the application object; the lifespan handler drives startup/shutdown.
app = FastAPI(
    lifespan=lifespan,
    title="Data Replication Service",
    version="1.0.0",
    description=(
        "Service for replicating data from multiple sources (MSSQL, PostgreSQL) "
        "to target PostgreSQL database using DLT. "
        "Configure data sources dynamically without restart."
    ),
)

# Register the API routes.
app.include_router(router)
@app.get("/")
def read_root():
    """Return a static service summary with links to the API docs."""
    # No return annotation on purpose: FastAPI would interpret it as a
    # response model.
    feature_list = [
        "Dynamic data source management",
        "Multiple source types (MSSQL, PostgreSQL)",
        "Cron-based scheduling",
        "DLT-powered data transfer",
        "Real-time monitoring",
    ]
    summary = {
        "message": "Data Replication Service is running",
        "docs": "/docs",
        "openapi": "/openapi.json",
        "features": feature_list,
    }
    return summary
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so it is only needed when the
    # module is run directly.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)