from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker, Session from config import settings from models import Base, DataSource, TargetDatabase, SourceType, DatabaseType import pyodbc import logging from typing import Optional, Dict from datetime import datetime, timedelta logger = logging.getLogger(__name__) # PostgreSQL engine для хранения конфигурации и логов postgres_engine = create_engine( f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@" f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}", echo=False, pool_pre_ping=True ) PostgresSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=postgres_engine) # PostgreSQL Target engine (назначение для всех репликаций по умолчанию) POSTGRES_CONNECTION_STRING = ( f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@" f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}" ) postgres_target_engine = create_engine(POSTGRES_CONNECTION_STRING, echo=False) # Кеши для подключений _source_engines_cache: Dict[int, object] = {} _target_engines_cache: Dict[int, object] = {} _cache_timestamp: Dict[int, datetime] = {} CACHE_TTL_SECONDS = 300 # 5 минут class DatabaseManager: """Управление операциями с базами данных""" @staticmethod def init_postgres_db(): """Инициализация таблиц в PostgreSQL""" Base.metadata.create_all(bind=postgres_engine) logger.info("PostgreSQL database initialized") @staticmethod def get_postgres_session() -> Session: """Получить сессию для работы с PostgreSQL""" return PostgresSessionLocal() @staticmethod def _build_connection_string(source: DataSource) -> str: """Строить строку подключения из DataSource""" if source.source_type == SourceType.MSSQL: return ( f"mssql+pyodbc://{source.username}:{source.password}@" f"{source.host}:{source.port}/{source.database}?" f"driver=ODBC+Driver+17+for+SQL+Server" ) elif source.source_type == SourceType.PGSQL: return ( f"postgresql+psycopg2://{source.username}:{source.password}@" f"{source.host}:{source.port}/{source.database}" ) else: raise ValueError(f"Unknown source type: {source.source_type}") @staticmethod def _is_cache_valid(source_id: int) -> bool: """Проверить валидность кеша для источника""" if source_id not in _cache_timestamp: return False elapsed = (datetime.utcnow() - _cache_timestamp[source_id]).total_seconds() return elapsed < CACHE_TTL_SECONDS @staticmethod def get_source_engine(source_id: int): """Получить engine для источника данных по ID (с кешированием)""" # Проверить кеш if source_id in _source_engines_cache and DatabaseManager._is_cache_valid(source_id): logger.debug(f"Using cached engine for source {source_id}") return _source_engines_cache[source_id] # Загрузить DataSource из БД session = DatabaseManager.get_postgres_session() try: data_source = session.query(DataSource).filter(DataSource.id == source_id).first() if not data_source: raise ValueError(f"DataSource with ID {source_id} not found") if not data_source.is_active: raise ValueError(f"DataSource {source_id} is inactive") # Создать new engine connection_string = DatabaseManager._build_connection_string(data_source) engine = create_engine(connection_string, echo=False, pool_pre_ping=True) # Кешировать _source_engines_cache[source_id] = engine _cache_timestamp[source_id] = datetime.utcnow() logger.info(f"Created engine for source {source_id}: {data_source.name}") return engine finally: session.close() @staticmethod def get_data_source(source_id: int) -> Optional[DataSource]: """Получить DataSource по ID""" session = DatabaseManager.get_postgres_session() try: return session.query(DataSource).filter(DataSource.id == source_id).first() finally: session.close() @staticmethod def test_source_connection(source_id: int) -> bool: """Проверить подключение к источнику""" try: engine = DatabaseManager.get_source_engine(source_id) with engine.connect() as connection: connection.execute(text("SELECT 1")) logger.info(f"Source {source_id} connection successful") return True except Exception as e: logger.error(f"Source {source_id} connection failed: {e}") return False @staticmethod def test_postgres_connection(): """Проверка подключения к PostgreSQL назначению""" try: with postgres_engine.connect() as connection: result = connection.execute(text("SELECT 1")) logger.info("PostgreSQL target connection successful") return True except Exception as e: logger.error(f"PostgreSQL target connection failed: {e}") return False @staticmethod def clear_engine_cache(source_id: int = None, target_id: int = None): """Очистить кеш подключений (для конкретного источника/целевой БД или всех)""" if source_id: _source_engines_cache.pop(source_id, None) _cache_timestamp.pop(f"source_{source_id}", None) logger.info(f"Cleared cache for source {source_id}") elif target_id: _target_engines_cache.pop(target_id, None) _cache_timestamp.pop(f"target_{target_id}", None) logger.info(f"Cleared cache for target {target_id}") else: _source_engines_cache.clear() _target_engines_cache.clear() _cache_timestamp.clear() logger.info("Cleared all engine caches") @staticmethod def _build_target_connection_string(target: TargetDatabase) -> str: """Строить строку подключения для целевой БД""" if target.db_type == DatabaseType.PGSQL: return ( f"postgresql+psycopg2://{target.username}:{target.password}@" f"{target.host}:{target.port}/{target.database}" ) elif target.db_type == DatabaseType.MSSQL: return ( f"mssql+pyodbc://{target.username}:{target.password}@" f"{target.host}:{target.port}/{target.database}?" f"driver=ODBC+Driver+17+for+SQL+Server" ) else: raise ValueError(f"Unknown target database type: {target.db_type}") @staticmethod def _is_cache_valid(cache_key: str) -> bool: """Проверить валидность кеша""" if cache_key not in _cache_timestamp: return False elapsed = (datetime.utcnow() - _cache_timestamp[cache_key]).total_seconds() return elapsed < CACHE_TTL_SECONDS @staticmethod def get_target_engine(target_id: int = None): """Получить engine для целевой БД (с кешированием)""" # Если target_id не указан, использовать целевую БД по умолчанию session = DatabaseManager.get_postgres_session() try: if not target_id: target = session.query(TargetDatabase).filter( TargetDatabase.is_active == True, TargetDatabase.is_default == True ).first() if not target: # Если нет default, использовать последнюю добавленную активную target = session.query(TargetDatabase).filter( TargetDatabase.is_active == True ).order_by(TargetDatabase.id.desc()).first() if not target: logger.warning("No active target database found, using default PostgreSQL") return postgres_target_engine target_id = target.id # Проверить кеш cache_key = f"target_{target_id}" if target_id in _target_engines_cache and DatabaseManager._is_cache_valid(cache_key): logger.debug(f"Using cached engine for target {target_id}") return _target_engines_cache[target_id] # Загрузить из БД target = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() if not target: raise ValueError(f"TargetDatabase with ID {target_id} not found") if not target.is_active: raise ValueError(f"TargetDatabase {target_id} is inactive") # Создать engine connection_string = DatabaseManager._build_target_connection_string(target) engine = create_engine(connection_string, echo=False, pool_pre_ping=True) # Кешировать _target_engines_cache[target_id] = engine _cache_timestamp[cache_key] = datetime.utcnow() logger.info(f"Created engine for target {target_id}: {target.name}") return engine finally: session.close() @staticmethod def get_target_database(target_id: int = None) -> Optional[TargetDatabase]: """Получить конфигурацию целевой БД""" session = DatabaseManager.get_postgres_session() try: if not target_id: # Получить default target = session.query(TargetDatabase).filter( TargetDatabase.is_active == True, TargetDatabase.is_default == True ).first() if not target: target = session.query(TargetDatabase).filter( TargetDatabase.is_active == True ).order_by(TargetDatabase.id.desc()).first() return target return session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() finally: session.close() @staticmethod def test_target_connection(target_id: int = None) -> bool: """Проверить подключение к целевой БД""" try: engine = DatabaseManager.get_target_engine(target_id) with engine.connect() as connection: connection.execute(text("SELECT 1")) logger.info(f"Target {target_id} connection successful" if target_id else "Default target connection successful") return True except Exception as e: logger.error(f"Target {target_id} connection failed: {e}" if target_id else f"Default target connection failed: {e}") return False @staticmethod def get_table_columns(table_name: str, engine) -> list: """Получить список колонок таблицы""" try: inspector = __import__('sqlalchemy').inspect(engine) columns = inspector.get_columns(table_name) return [col['name'] for col in columns] except Exception as e: logger.error(f"Error getting columns for {table_name}: {e}") return [] @staticmethod def get_life_table_name(table_name: str) -> str: """Получить имя Life таблицы по имени оригинальной таблицы""" return f"Life{table_name}" @staticmethod def check_life_table_exists(table_name: str, source_id: int) -> bool: """Проверить существует ли Life таблица""" life_table = DatabaseManager.get_life_table_name(table_name) try: engine = DatabaseManager.get_source_engine(source_id) with engine.connect() as connection: result = connection.execute( text(f"SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{life_table}'") ) return result.fetchone() is not None except Exception as e: logger.warning(f"Error checking Life table {life_table}: {e}") return False