Files
2026-03-29 23:24:15 +09:00

293 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import datetime, timedelta
from typing import Dict, Optional
from urllib.parse import quote_plus

import pyodbc
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import Session, sessionmaker

from config import settings
from models import Base, DataSource, DatabaseType, SourceType, TargetDatabase
logger = logging.getLogger(__name__)

# Single SQLAlchemy URL for the configuration/log PostgreSQL database.
# Credentials are URL-quoted so passwords containing '@', ':' or '/'
# cannot corrupt the connection URL (the original built the same URL
# twice, unescaped).
POSTGRES_CONNECTION_STRING = (
    f"postgresql+psycopg2://{quote_plus(settings.postgres_username)}:"
    f"{quote_plus(settings.postgres_password)}@"
    f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}"
)

# Engine for storing replication configuration and logs.
postgres_engine = create_engine(
    POSTGRES_CONNECTION_STRING,
    echo=False,
    pool_pre_ping=True,  # validate pooled connections before handing them out
)
PostgresSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=postgres_engine)

# Default PostgreSQL target engine (replication destination when no
# TargetDatabase row is configured).
postgres_target_engine = create_engine(POSTGRES_CONNECTION_STRING, echo=False)

# Engine caches, keyed by DataSource / TargetDatabase primary key.
_source_engines_cache: Dict[int, object] = {}
_target_engines_cache: Dict[int, object] = {}
# Freshness timestamps; key type varies by caller (an int source id or a
# "target_<id>" string) — hence the loose value type.
_cache_timestamp: Dict[object, datetime] = {}
CACHE_TTL_SECONDS = 300  # cached engines are reused for 5 minutes
class DatabaseManager:
    """Database operations: config storage in PostgreSQL plus cached
    SQLAlchemy engines for replication sources and target databases."""

    @staticmethod
    def init_postgres_db():
        """Create all ORM tables in the configuration PostgreSQL database."""
        Base.metadata.create_all(bind=postgres_engine)
        logger.info("PostgreSQL database initialized")

    @staticmethod
    def get_postgres_session() -> Session:
        """Return a new session for the configuration PostgreSQL database.

        The caller is responsible for closing the session.
        """
        return PostgresSessionLocal()

    @staticmethod
    def _build_connection_string(source: DataSource) -> str:
        """Build a SQLAlchemy URL for a replication source.

        Username and password are URL-quoted so special characters do not
        corrupt the URL.

        Raises:
            ValueError: if the source type is neither MSSQL nor PGSQL.
        """
        user = quote_plus(source.username)
        password = quote_plus(source.password)
        if source.source_type == SourceType.MSSQL:
            return (
                f"mssql+pyodbc://{user}:{password}@"
                f"{source.host}:{source.port}/{source.database}?"
                f"driver=ODBC+Driver+17+for+SQL+Server"
            )
        elif source.source_type == SourceType.PGSQL:
            return (
                f"postgresql+psycopg2://{user}:{password}@"
                f"{source.host}:{source.port}/{source.database}"
            )
        else:
            raise ValueError(f"Unknown source type: {source.source_type}")

    @staticmethod
    def _is_cache_valid(cache_key: object) -> bool:
        """Return True if the cached engine behind *cache_key* is still fresh.

        *cache_key* is either a plain int source id or a "target_<id>"
        string. (The original file defined this method twice; the second
        definition silently shadowed the first — merged into one here.)
        """
        timestamp = _cache_timestamp.get(cache_key)
        if timestamp is None:
            return False
        return (datetime.utcnow() - timestamp).total_seconds() < CACHE_TTL_SECONDS

    @staticmethod
    def get_source_engine(source_id: int):
        """Return (and cache) an engine for the data source with *source_id*.

        Raises:
            ValueError: if the DataSource does not exist or is inactive.
        """
        # Source timestamps are keyed by the plain int id.
        if source_id in _source_engines_cache and DatabaseManager._is_cache_valid(source_id):
            logger.debug(f"Using cached engine for source {source_id}")
            return _source_engines_cache[source_id]

        session = DatabaseManager.get_postgres_session()
        try:
            data_source = session.query(DataSource).filter(DataSource.id == source_id).first()
            if not data_source:
                raise ValueError(f"DataSource with ID {source_id} not found")
            if not data_source.is_active:
                raise ValueError(f"DataSource {source_id} is inactive")

            connection_string = DatabaseManager._build_connection_string(data_source)
            engine = create_engine(connection_string, echo=False, pool_pre_ping=True)

            _source_engines_cache[source_id] = engine
            _cache_timestamp[source_id] = datetime.utcnow()
            logger.info(f"Created engine for source {source_id}: {data_source.name}")
            return engine
        finally:
            session.close()

    @staticmethod
    def get_data_source(source_id: int) -> Optional[DataSource]:
        """Return the DataSource row by id, or None.

        NOTE(review): the instance is detached once the session closes;
        only attributes already loaded remain accessible.
        """
        session = DatabaseManager.get_postgres_session()
        try:
            return session.query(DataSource).filter(DataSource.id == source_id).first()
        finally:
            session.close()

    @staticmethod
    def test_source_connection(source_id: int) -> bool:
        """Run SELECT 1 against the source; return True on success."""
        try:
            engine = DatabaseManager.get_source_engine(source_id)
            with engine.connect() as connection:
                connection.execute(text("SELECT 1"))
            logger.info(f"Source {source_id} connection successful")
            return True
        except Exception as e:
            logger.error(f"Source {source_id} connection failed: {e}")
            return False

    @staticmethod
    def test_postgres_connection():
        """Run SELECT 1 against the configuration PostgreSQL; True on success."""
        try:
            with postgres_engine.connect() as connection:
                # Result intentionally discarded; reaching here proves the
                # connection works (the original bound it to an unused name).
                connection.execute(text("SELECT 1"))
            logger.info("PostgreSQL target connection successful")
            return True
        except Exception as e:
            logger.error(f"PostgreSQL target connection failed: {e}")
            return False

    @staticmethod
    def clear_engine_cache(source_id: Optional[int] = None, target_id: Optional[int] = None):
        """Drop cached engines for one source/target, or everything.

        Explicit `is not None` checks so that id 0 is not mistaken for
        "clear everything".
        """
        if source_id is not None:
            _source_engines_cache.pop(source_id, None)
            # BUGFIX: source timestamps are stored under the plain int id
            # (see get_source_engine), not under f"source_{id}" — the
            # original pop never matched, leaving stale timestamps behind.
            _cache_timestamp.pop(source_id, None)
            logger.info(f"Cleared cache for source {source_id}")
        elif target_id is not None:
            _target_engines_cache.pop(target_id, None)
            _cache_timestamp.pop(f"target_{target_id}", None)
            logger.info(f"Cleared cache for target {target_id}")
        else:
            _source_engines_cache.clear()
            _target_engines_cache.clear()
            _cache_timestamp.clear()
            logger.info("Cleared all engine caches")

    @staticmethod
    def _build_target_connection_string(target: TargetDatabase) -> str:
        """Build a SQLAlchemy URL for a target database.

        Credentials are URL-quoted, mirroring _build_connection_string.

        Raises:
            ValueError: if the target db_type is neither PGSQL nor MSSQL.
        """
        user = quote_plus(target.username)
        password = quote_plus(target.password)
        if target.db_type == DatabaseType.PGSQL:
            return (
                f"postgresql+psycopg2://{user}:{password}@"
                f"{target.host}:{target.port}/{target.database}"
            )
        elif target.db_type == DatabaseType.MSSQL:
            return (
                f"mssql+pyodbc://{user}:{password}@"
                f"{target.host}:{target.port}/{target.database}?"
                f"driver=ODBC+Driver+17+for+SQL+Server"
            )
        else:
            raise ValueError(f"Unknown target database type: {target.db_type}")

    @staticmethod
    def _find_default_target(session: Session) -> Optional[TargetDatabase]:
        """Return the active default target, else the newest active one.

        Shared by get_target_engine and get_target_database (the lookup
        was previously duplicated in both).
        """
        target = session.query(TargetDatabase).filter(
            TargetDatabase.is_active == True,
            TargetDatabase.is_default == True
        ).first()
        if not target:
            target = session.query(TargetDatabase).filter(
                TargetDatabase.is_active == True
            ).order_by(TargetDatabase.id.desc()).first()
        return target

    @staticmethod
    def get_target_engine(target_id: Optional[int] = None):
        """Return (and cache) an engine for a target database.

        A falsy *target_id* selects the configured default target; if no
        active target exists, falls back to the module-level PostgreSQL
        engine.

        Raises:
            ValueError: if an explicit target does not exist or is inactive.
        """
        session = DatabaseManager.get_postgres_session()
        try:
            if not target_id:
                target = DatabaseManager._find_default_target(session)
                if not target:
                    logger.warning("No active target database found, using default PostgreSQL")
                    return postgres_target_engine
                target_id = target.id

            # Target timestamps are keyed by "target_<id>" strings.
            cache_key = f"target_{target_id}"
            if target_id in _target_engines_cache and DatabaseManager._is_cache_valid(cache_key):
                logger.debug(f"Using cached engine for target {target_id}")
                return _target_engines_cache[target_id]

            target = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
            if not target:
                raise ValueError(f"TargetDatabase with ID {target_id} not found")
            if not target.is_active:
                raise ValueError(f"TargetDatabase {target_id} is inactive")

            connection_string = DatabaseManager._build_target_connection_string(target)
            engine = create_engine(connection_string, echo=False, pool_pre_ping=True)

            _target_engines_cache[target_id] = engine
            _cache_timestamp[cache_key] = datetime.utcnow()
            logger.info(f"Created engine for target {target_id}: {target.name}")
            return engine
        finally:
            session.close()

    @staticmethod
    def get_target_database(target_id: Optional[int] = None) -> Optional[TargetDatabase]:
        """Return a TargetDatabase row (the default one when id is falsy).

        NOTE(review): the instance is detached once the session closes;
        only attributes already loaded remain accessible.
        """
        session = DatabaseManager.get_postgres_session()
        try:
            if not target_id:
                return DatabaseManager._find_default_target(session)
            return session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first()
        finally:
            session.close()

    @staticmethod
    def test_target_connection(target_id: Optional[int] = None) -> bool:
        """Run SELECT 1 against the target database; True on success."""
        try:
            engine = DatabaseManager.get_target_engine(target_id)
            with engine.connect() as connection:
                connection.execute(text("SELECT 1"))
            logger.info(f"Target {target_id} connection successful" if target_id else "Default target connection successful")
            return True
        except Exception as e:
            logger.error(f"Target {target_id} connection failed: {e}" if target_id else f"Default target connection failed: {e}")
            return False

    @staticmethod
    def get_table_columns(table_name: str, engine) -> list:
        """Return the column names of *table_name*, or [] on reflection error."""
        try:
            # Proper top-level import replaces the original
            # __import__('sqlalchemy').inspect(...) hack.
            inspector = inspect(engine)
            return [column["name"] for column in inspector.get_columns(table_name)]
        except Exception as e:
            logger.error(f"Error getting columns for {table_name}: {e}")
            return []

    @staticmethod
    def get_life_table_name(table_name: str) -> str:
        """Return the "Life" companion table name for *table_name*."""
        return f"Life{table_name}"

    @staticmethod
    def check_life_table_exists(table_name: str, source_id: int) -> bool:
        """Check whether the Life companion table exists at the source.

        SECURITY: uses a bound parameter instead of f-string interpolation,
        closing the SQL-injection hole in the original query.
        """
        life_table = DatabaseManager.get_life_table_name(table_name)
        try:
            engine = DatabaseManager.get_source_engine(source_id)
            with engine.connect() as connection:
                result = connection.execute(
                    text("SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = :table_name"),
                    {"table_name": life_table},
                )
                return result.fetchone() is not None
        except Exception as e:
            logger.warning(f"Error checking Life table {life_table}: {e}")
            return False