293 lines
13 KiB
Python
293 lines
13 KiB
Python
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker, Session
|
||
from config import settings
|
||
from models import Base, DataSource, TargetDatabase, SourceType, DatabaseType
|
||
import pyodbc
|
||
import logging
|
||
from typing import Optional, Dict
|
||
from datetime import datetime, timedelta
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# PostgreSQL engine для хранения конфигурации и логов
|
||
postgres_engine = create_engine(
|
||
f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@"
|
||
f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}",
|
||
echo=False,
|
||
pool_pre_ping=True
|
||
)
|
||
|
||
PostgresSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=postgres_engine)
|
||
|
||
# PostgreSQL Target engine (назначение для всех репликаций по умолчанию)
|
||
POSTGRES_CONNECTION_STRING = (
|
||
f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@"
|
||
f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}"
|
||
)
|
||
|
||
postgres_target_engine = create_engine(POSTGRES_CONNECTION_STRING, echo=False)
|
||
|
||
# Кеши для подключений
|
||
_source_engines_cache: Dict[int, object] = {}
|
||
_target_engines_cache: Dict[int, object] = {}
|
||
_cache_timestamp: Dict[int, datetime] = {}
|
||
CACHE_TTL_SECONDS = 300 # 5 минут
|
||
|
||
|
||
class DatabaseManager:
|
||
"""Управление операциями с базами данных"""
|
||
|
||
@staticmethod
|
||
def init_postgres_db():
|
||
"""Инициализация таблиц в PostgreSQL"""
|
||
Base.metadata.create_all(bind=postgres_engine)
|
||
logger.info("PostgreSQL database initialized")
|
||
|
||
@staticmethod
|
||
def get_postgres_session() -> Session:
|
||
"""Получить сессию для работы с PostgreSQL"""
|
||
return PostgresSessionLocal()
|
||
|
||
@staticmethod
|
||
def _build_connection_string(source: DataSource) -> str:
|
||
"""Строить строку подключения из DataSource"""
|
||
if source.source_type == SourceType.MSSQL:
|
||
return (
|
||
f"mssql+pyodbc://{source.username}:{source.password}@"
|
||
f"{source.host}:{source.port}/{source.database}?"
|
||
f"driver=ODBC+Driver+17+for+SQL+Server"
|
||
)
|
||
elif source.source_type == SourceType.PGSQL:
|
||
return (
|
||
f"postgresql+psycopg2://{source.username}:{source.password}@"
|
||
f"{source.host}:{source.port}/{source.database}"
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown source type: {source.source_type}")
|
||
|
||
@staticmethod
|
||
def _is_cache_valid(source_id: int) -> bool:
|
||
"""Проверить валидность кеша для источника"""
|
||
if source_id not in _cache_timestamp:
|
||
return False
|
||
elapsed = (datetime.utcnow() - _cache_timestamp[source_id]).total_seconds()
|
||
return elapsed < CACHE_TTL_SECONDS
|
||
|
||
@staticmethod
|
||
def get_source_engine(source_id: int):
|
||
"""Получить engine для источника данных по ID (с кешированием)"""
|
||
# Проверить кеш
|
||
if source_id in _source_engines_cache and DatabaseManager._is_cache_valid(source_id):
|
||
logger.debug(f"Using cached engine for source {source_id}")
|
||
return _source_engines_cache[source_id]
|
||
|
||
# Загрузить DataSource из БД
|
||
session = DatabaseManager.get_postgres_session()
|
||
try:
|
||
data_source = session.query(DataSource).filter(DataSource.id == source_id).first()
|
||
if not data_source:
|
||
raise ValueError(f"DataSource with ID {source_id} not found")
|
||
|
||
if not data_source.is_active:
|
||
raise ValueError(f"DataSource {source_id} is inactive")
|
||
|
||
# Создать new engine
|
||
connection_string = DatabaseManager._build_connection_string(data_source)
|
||
engine = create_engine(connection_string, echo=False, pool_pre_ping=True)
|
||
|
||
# Кешировать
|
||
_source_engines_cache[source_id] = engine
|
||
_cache_timestamp[source_id] = datetime.utcnow()
|
||
|
||
logger.info(f"Created engine for source {source_id}: {data_source.name}")
|
||
return engine
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def get_data_source(source_id: int) -> Optional[DataSource]:
|
||
"""Получить DataSource по ID"""
|
||
session = DatabaseManager.get_postgres_session()
|
||
try:
|
||
return session.query(DataSource).filter(DataSource.id == source_id).first()
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def test_source_connection(source_id: int) -> bool:
|
||
"""Проверить подключение к источнику"""
|
||
try:
|
||
engine = DatabaseManager.get_source_engine(source_id)
|
||
with engine.connect() as connection:
|
||
connection.execute(text("SELECT 1"))
|
||
logger.info(f"Source {source_id} connection successful")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Source {source_id} connection failed: {e}")
|
||
return False
|
||
|
||
@staticmethod
|
||
def test_postgres_connection():
|
||
"""Проверка подключения к PostgreSQL назначению"""
|
||
try:
|
||
with postgres_engine.connect() as connection:
|
||
result = connection.execute(text("SELECT 1"))
|
||
logger.info("PostgreSQL target connection successful")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"PostgreSQL target connection failed: {e}")
|
||
return False
|
||
|
||
@staticmethod
|
||
def clear_engine_cache(source_id: int = None, target_id: int = None):
|
||
"""Очистить кеш подключений (для конкретного источника/целевой БД или всех)"""
|
||
if source_id:
|
||
_source_engines_cache.pop(source_id, None)
|
||
_cache_timestamp.pop(f"source_{source_id}", None)
|
||
logger.info(f"Cleared cache for source {source_id}")
|
||
elif target_id:
|
||
_target_engines_cache.pop(target_id, None)
|
||
_cache_timestamp.pop(f"target_{target_id}", None)
|
||
logger.info(f"Cleared cache for target {target_id}")
|
||
else:
|
||
_source_engines_cache.clear()
|
||
_target_engines_cache.clear()
|
||
_cache_timestamp.clear()
|
||
logger.info("Cleared all engine caches")
|
||
|
||
@staticmethod
|
||
def _build_target_connection_string(target: TargetDatabase) -> str:
|
||
"""Строить строку подключения для целевой БД"""
|
||
if target.db_type == DatabaseType.PGSQL:
|
||
return (
|
||
f"postgresql+psycopg2://{target.username}:{target.password}@"
|
||
f"{target.host}:{target.port}/{target.database}"
|
||
)
|
||
elif target.db_type == DatabaseType.MSSQL:
|
||
return (
|
||
f"mssql+pyodbc://{target.username}:{target.password}@"
|
||
f"{target.host}:{target.port}/{target.database}?"
|
||
f"driver=ODBC+Driver+17+for+SQL+Server"
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown target database type: {target.db_type}")
|
||
|
||
@staticmethod
|
||
def _is_cache_valid(cache_key: str) -> bool:
|
||
"""Проверить валидность кеша"""
|
||
if cache_key not in _cache_timestamp:
|
||
return False
|
||
elapsed = (datetime.utcnow() - _cache_timestamp[cache_key]).total_seconds()
|
||
return elapsed < CACHE_TTL_SECONDS
|
||
|
||
@staticmethod
|
||
def get_target_engine(target_id: int = None):
|
||
"""Получить engine для целевой БД (с кешированием)"""
|
||
# Если target_id не указан, использовать целевую БД по умолчанию
|
||
session = DatabaseManager.get_postgres_session()
|
||
try:
|
||
if not target_id:
|
||
target = session.query(TargetDatabase).filter(
|
||
TargetDatabase.is_active == True,
|
||
TargetDatabase.is_default == True
|
||
).first()
|
||
if not target:
|
||
# Если нет default, использовать последнюю добавленную активную
|
||
target = session.query(TargetDatabase).filter(
|
||
TargetDatabase.is_active == True
|
||
).order_by(TargetDatabase.id.desc()).first()
|
||
if not target:
|
||
logger.warning("No active target database found, using default PostgreSQL")
|
||
return postgres_target_engine
|
||
target_id = target.id
|
||
|
||
# Проверить кеш
|
||
cache_key = f"target_{target_id}"
|
||
if target_id in _target_engines_cache and DatabaseManager._is_cache_valid(cache_key):
|
||
logger.debug(f"Using cached engine for target {target_id}")
|
||
return _target_engines_cache[target_id]
|
||
|
||
# Загрузить из БД
|
||
target = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
|
||
if not target:
|
||
raise ValueError(f"TargetDatabase with ID {target_id} not found")
|
||
if not target.is_active:
|
||
raise ValueError(f"TargetDatabase {target_id} is inactive")
|
||
|
||
# Создать engine
|
||
connection_string = DatabaseManager._build_target_connection_string(target)
|
||
engine = create_engine(connection_string, echo=False, pool_pre_ping=True)
|
||
|
||
# Кешировать
|
||
_target_engines_cache[target_id] = engine
|
||
_cache_timestamp[cache_key] = datetime.utcnow()
|
||
|
||
logger.info(f"Created engine for target {target_id}: {target.name}")
|
||
return engine
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def get_target_database(target_id: int = None) -> Optional[TargetDatabase]:
|
||
"""Получить конфигурацию целевой БД"""
|
||
session = DatabaseManager.get_postgres_session()
|
||
try:
|
||
if not target_id:
|
||
# Получить default
|
||
target = session.query(TargetDatabase).filter(
|
||
TargetDatabase.is_active == True,
|
||
TargetDatabase.is_default == True
|
||
).first()
|
||
if not target:
|
||
target = session.query(TargetDatabase).filter(
|
||
TargetDatabase.is_active == True
|
||
).order_by(TargetDatabase.id.desc()).first()
|
||
return target
|
||
return session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def test_target_connection(target_id: int = None) -> bool:
|
||
"""Проверить подключение к целевой БД"""
|
||
try:
|
||
engine = DatabaseManager.get_target_engine(target_id)
|
||
with engine.connect() as connection:
|
||
connection.execute(text("SELECT 1"))
|
||
logger.info(f"Target {target_id} connection successful" if target_id else "Default target connection successful")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Target {target_id} connection failed: {e}" if target_id else f"Default target connection failed: {e}")
|
||
return False
|
||
|
||
@staticmethod
|
||
def get_table_columns(table_name: str, engine) -> list:
|
||
"""Получить список колонок таблицы"""
|
||
try:
|
||
inspector = __import__('sqlalchemy').inspect(engine)
|
||
columns = inspector.get_columns(table_name)
|
||
return [col['name'] for col in columns]
|
||
except Exception as e:
|
||
logger.error(f"Error getting columns for {table_name}: {e}")
|
||
return []
|
||
|
||
@staticmethod
|
||
def get_life_table_name(table_name: str) -> str:
|
||
"""Получить имя Life таблицы по имени оригинальной таблицы"""
|
||
return f"Life{table_name}"
|
||
|
||
@staticmethod
|
||
def check_life_table_exists(table_name: str, source_id: int) -> bool:
|
||
"""Проверить существует ли Life таблица"""
|
||
life_table = DatabaseManager.get_life_table_name(table_name)
|
||
try:
|
||
engine = DatabaseManager.get_source_engine(source_id)
|
||
with engine.connect() as connection:
|
||
result = connection.execute(
|
||
text(f"SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{life_table}'")
|
||
)
|
||
return result.fetchone() is not None
|
||
except Exception as e:
|
||
logger.warning(f"Error checking Life table {life_table}: {e}")
|
||
return False
|