"""Configuration for the MSSQL -> PostgreSQL data migration service."""
import os
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional
|
|
|
|
try:
|
|
from dotenv import load_dotenv
|
|
except ImportError:
|
|
def load_dotenv(path: str = '.env'):
|
|
"""Минимальный fallback для локального .env, если python-dotenv еще не установлен."""
|
|
if not os.path.exists(path):
|
|
return False
|
|
with open(path, encoding='utf-8') as env_file:
|
|
for raw_line in env_file:
|
|
line = raw_line.strip()
|
|
if not line or line.startswith('#') or '=' not in line:
|
|
continue
|
|
key, value = line.split('=', 1)
|
|
key = key.strip()
|
|
value = value.strip().strip('"').strip("'")
|
|
os.environ.setdefault(key, value)
|
|
return True
|
|
|
|
|
|
load_dotenv()
|
|
|
|
|
|
@dataclass
class TableMigrationConfig:
    """Migration settings for a single table."""

    source_table: str                      # table name in MSSQL
    target_table: Optional[str] = None     # PostgreSQL name; falls back to source_table
    mode: str = "full"                     # e.g. "full" or "incremental"
    initial_load_mode: str = "full_then_incremental"
    life_table: Optional[str] = None       # change-feed table to read from, if any
    datetime_column: str = "x_DateTime"    # default ordering column for increments
    sequence_column: Optional[str] = None  # tie-breaker column for increments
    order_columns: List[str] = field(default_factory=list)  # explicit ordering override
    operation_column: Optional[str] = None # column holding the operation code
    delete_operations: List[str] = field(default_factory=lambda: ['d'])
    upsert_operations: List[str] = field(default_factory=lambda: ['i', 'u'])
    primary_key: List[str] = field(default_factory=list)
    exclude_columns: List[str] = field(default_factory=list)
    timescale: bool = False                # whether the target is a hypertable
    timescale_time_column: Optional[str] = None
    enabled: bool = True                   # disabled configs are skipped

    @property
    def pg_table(self) -> str:
        """Lower-cased destination table name in PostgreSQL."""
        name = self.target_table if self.target_table else self.source_table
        return name.lower()

    @property
    def read_table(self) -> str:
        """Table to read from: life_table when configured, else source_table."""
        if self.life_table:
            return self.life_table
        return self.source_table

    @property
    def incremental_order_columns(self) -> List[str]:
        """Ordering columns for incremental reads.

        An explicit order_columns list wins; otherwise the datetime column,
        followed by the sequence column when one is configured.
        """
        if self.order_columns:
            return self.order_columns
        tail = [self.sequence_column] if self.sequence_column else []
        return [self.datetime_column] + tail
|
# ============================================================================
# CONFIGURATION
# ============================================================================
|
|
class Config:
    """Application settings, resolved from environment variables at import time."""

    # --- MSSQL source connection ---
    MSSQL_CONNECTION_STRING = os.environ.get('MSSQL_CONNECTION_STRING')
    MSSQL_CHARSET = os.environ.get('MSSQL_CHARSET', 'cp1251')
    MSSQL_POOL_RECYCLE = int(os.environ.get('MSSQL_POOL_RECYCLE', '1800'))
    MSSQL_CONNECT_TIMEOUT = int(os.environ.get('MSSQL_CONNECT_TIMEOUT', '30'))
    MSSQL_TABLE_RETRIES = int(os.environ.get('MSSQL_TABLE_RETRIES', '2'))
    MSSQL_POOL_SIZE = int(os.environ.get('MSSQL_POOL_SIZE', '1'))
    MSSQL_MAX_OVERFLOW = int(os.environ.get('MSSQL_MAX_OVERFLOW', '0'))

    # --- PostgreSQL target connection ---
    POSTGRES_CONNECTION_STRING = os.environ.get('POSTGRES_CONNECTION_STRING')

    # --- Email notifications ---
    EMAIL_HOST = os.environ.get('EMAIL_HOST')
    EMAIL_PORT = int(os.environ.get('EMAIL_PORT', '465'))
    EMAIL_USER = os.environ.get('EMAIL_USER')
    EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD')
    EMAIL_FROM = os.environ.get('EMAIL_FROM')
    # Comma-separated recipient list; surrounding whitespace and empty
    # entries are dropped.
    EMAIL_TO = [
        addr.strip()
        for addr in os.environ.get('EMAIL_TO', '').split(',')
        if addr.strip()
    ]
    EMAIL_SUBJECT = os.environ.get('EMAIL_SUBJECT', 'Результат миграции данных MSSQL → PostgreSQL')

    # --- Logging ---
    LOG_DIR = 'logs'
    LOG_FILE = 'migration_log_{timestamp}.log'
    LOG_LEVEL = logging.INFO

    # --- Migration tuning ---
    CHUNK_SIZE = int(os.environ.get('CHUNK_SIZE', '5000'))
    WRITE_CHUNK_SIZE = int(os.environ.get('WRITE_CHUNK_SIZE', str(CHUNK_SIZE)))
    BATCH_SIZE = 10  # report progress once per this many chunks
    REPLICATOR_SCHEMA = os.environ.get('REPLICATOR_SCHEMA', 'replicator')
    STATE_TABLE = 'migration_state'
    TABLE_CONFIGS_TABLE = 'migration_tables'
    ENABLE_TIMESCALE = os.environ.get('ENABLE_TIMESCALE', 'false').lower() == 'true'
    DRY_RUN = os.environ.get('DRY_RUN', 'false').lower() == 'true'
    READ_LIMIT = int(os.environ.get('READ_LIMIT', '0')) or None  # 0 means "no limit"
    QUEUE_POLL_SECONDS = float(os.environ.get('QUEUE_POLL_SECONDS', '1'))
    SCHEDULE_GRACE_SECONDS = int(os.environ.get('SCHEDULE_GRACE_SECONDS', '60'))
    START_API_WORKER = os.environ.get('START_API_WORKER', 'true').lower() == 'true'
    CREATE_FOREIGN_KEYS = os.environ.get('CREATE_FOREIGN_KEYS', 'true').lower() == 'true'

    # --- Table migration definitions ---
    # For incremental migration, fill in life_table, primary_key and
    # exclude_columns so writes can be performed idempotently.
    DEFAULT_TABLE_MIGRATIONS = [
        TableMigrationConfig(
            source_table='Oms_LPU',
            target_table='Oms_LPU',
            mode='incremental',
            initial_load_mode='full_then_incremental',
            life_table='Life_oms_LPU',
            datetime_column='x_DateTime',
            sequence_column='LPULifeID',
            order_columns=['x_DateTime', 'LPULifeID'],
            operation_column='x_Operation',
            primary_key=['LPUID'],
            exclude_columns=[
                'LPULifeID',
                'x_Operation',
                'x_DateTime',
                'x_Seance',
                'x_User',
            ],
            timescale=False,
            enabled=True,
        ),
        # Template for additional tables, e.g. stt_MigrationPatient:
        # TableMigrationConfig(
        #     source_table='stt_MigrationPatient',
        #     target_table='stt_MigrationPatient',
        #     mode='incremental',
        #     initial_load_mode='full_then_incremental',
        #     life_table='Life_stt_MigrationPatient',
        #     datetime_column='x_DateTime',
        #     sequence_column='MigrationPatientLifeID',
        #     order_columns=['x_DateTime', 'MigrationPatientLifeID'],
        #     operation_column='x_Operation',
        #     primary_key=['Id'],
        #     exclude_columns=['MigrationPatientLifeID', 'x_Operation', 'x_DateTime', 'x_Seance', 'x_User'],
        #     timescale=True,
        #     timescale_time_column='x_DateTime',
        # ),
    ]
    TABLE_MIGRATIONS = list(DEFAULT_TABLE_MIGRATIONS)
    TABLES_TO_COPY = [cfg.source_table for cfg in TABLE_MIGRATIONS]