Files
replicator/app/core/database.py
2026-03-13 17:11:39 +09:00

120 lines
4.1 KiB
Python

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine, async_sessionmaker
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncGenerator, Optional
from app.core.config import settings
class DatabaseConnector:
"""Управление подключениями к базам данных"""
def __init__(self):
self._src_engine: Optional[Engine] = None
self._dst_engine: Optional[Engine] = None
self._async_dst_engine: Optional[AsyncEngine] = None
self.dst_session = None
self._async_dst_session_maker = None
self.schedule_session = None
@property
def src_engine(self) -> Engine:
"""Подключение к MSSQL"""
if not self._src_engine:
self._src_engine = create_engine(
settings.MSSQL_CONNECTION_STRING + "?charset=cp1251",
pool_pre_ping=True,
echo=settings.DEBUG
)
return self._src_engine
@property
def dst_engine(self) -> Engine:
"""Подключение к PostgreSQL (основная БД)"""
if not self._dst_engine:
self._dst_engine = create_engine(
settings.POSTGRES_CONNECTION_STRING,
pool_pre_ping=True,
echo=settings.DEBUG,
connect_args={
"options": "-c client_encoding=utf8"
}
)
self.dst_session = sessionmaker(bind=self._dst_engine)
return self._dst_engine
@property
def async_dst_engine(self) -> AsyncEngine:
"""Асинхронное подключение к PostgreSQL (основная БД)"""
if not self._async_dst_engine:
self._async_dst_engine = create_async_engine(
settings.POSTGRES_ASYNC_CONNECTION_STRING,
pool_pre_ping=True,
echo=settings.DEBUG,
connect_args={
"server_settings": {
"client_encoding": "UTF8"
}
}
)
self._async_dst_session_maker = async_sessionmaker(
bind=self._async_dst_engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
return self._async_dst_engine
@asynccontextmanager
async def async_dst_session(self) -> AsyncGenerator[AsyncSession, None]:
"""
Асинхронный контекстный менеджер для сессии PostgreSQL.
Использование:
async with db_connector.async_dst_session() as session:
result = await session.execute(select(...))
"""
self.async_dst_engine
async with self._async_dst_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@contextmanager
def src_connection(self):
"""Контекстный менеджер для MSSQL соединения"""
conn = self.src_engine.connect()
try:
yield conn
finally:
conn.close()
@contextmanager
def dst_connection(self):
"""Контекстный менеджер для PostgreSQL соединения"""
conn = self.dst_engine.connect()
try:
yield conn
finally:
conn.close()
async def dispose_engines(self):
"""Закрытие всех соединений"""
if self._src_engine:
self._src_engine.dispose()
if self._dst_engine:
self._dst_engine.dispose()
if self._async_dst_engine:
await self._async_dst_engine.dispose()
# Глобальный экземпляр подключений
db_connector = DatabaseConnector()