from sqlalchemy import create_engine from sqlalchemy.engine import Engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine, async_sessionmaker from contextlib import asynccontextmanager, contextmanager from typing import AsyncGenerator, Optional from app.core.config import settings class DatabaseConnector: """Управление подключениями к базам данных""" def __init__(self): self._src_engine: Optional[Engine] = None self._dst_engine: Optional[Engine] = None self._async_dst_engine: Optional[AsyncEngine] = None self.dst_session = None self._async_dst_session_maker = None self.schedule_session = None @property def src_engine(self) -> Engine: """Подключение к MSSQL""" if not self._src_engine: self._src_engine = create_engine( settings.MSSQL_CONNECTION_STRING + "?charset=cp1251", pool_pre_ping=True, echo=settings.DEBUG ) return self._src_engine @property def dst_engine(self) -> Engine: """Подключение к PostgreSQL (основная БД)""" if not self._dst_engine: self._dst_engine = create_engine( settings.POSTGRES_CONNECTION_STRING, pool_pre_ping=True, echo=settings.DEBUG, connect_args={ "options": "-c client_encoding=utf8" } ) self.dst_session = sessionmaker(bind=self._dst_engine) return self._dst_engine @property def async_dst_engine(self) -> AsyncEngine: """Асинхронное подключение к PostgreSQL (основная БД)""" if not self._async_dst_engine: self._async_dst_engine = create_async_engine( settings.POSTGRES_ASYNC_CONNECTION_STRING, pool_pre_ping=True, echo=settings.DEBUG, connect_args={ "server_settings": { "client_encoding": "UTF8" } } ) self._async_dst_session_maker = async_sessionmaker( bind=self._async_dst_engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, ) return self._async_dst_engine @asynccontextmanager async def async_dst_session(self) -> AsyncGenerator[AsyncSession, None]: """ Асинхронный контекстный менеджер для сессии PostgreSQL. Использование: async with db_connector.async_dst_session() as session: result = await session.execute(select(...)) """ self.async_dst_engine async with self._async_dst_session_maker() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() @contextmanager def src_connection(self): """Контекстный менеджер для MSSQL соединения""" conn = self.src_engine.connect() try: yield conn finally: conn.close() @contextmanager def dst_connection(self): """Контекстный менеджер для PostgreSQL соединения""" conn = self.dst_engine.connect() try: yield conn finally: conn.close() async def dispose_engines(self): """Закрытие всех соединений""" if self._src_engine: self._src_engine.dispose() if self._dst_engine: self._dst_engine.dispose() if self._async_dst_engine: await self._async_dst_engine.dispose() # Глобальный экземпляр подключений db_connector = DatabaseConnector()