From b5d1f61a82f66c8a205d809e08e3040d11239ef2 Mon Sep 17 00:00:00 2001 From: brusnitsyn Date: Wed, 10 Jun 2026 16:53:03 +0900 Subject: [PATCH] v2026.06 --- app/api.py | 30 +- app/backfill_watermarks.py | 27 +- app/config.py | 14 +- app/email_sender.py | 13 +- app/logging_utils.py | 9 + app/migrator.py | 617 +++++++++++++++++++++++++++++-------- app/queue.py | 64 +++- app/worker.py | 2 +- req.txt | 15 +- 9 files changed, 640 insertions(+), 151 deletions(-) diff --git a/app/api.py b/app/api.py index 6554d70..524653c 100644 --- a/app/api.py +++ b/app/api.py @@ -1,3 +1,4 @@ +from contextlib import asynccontextmanager from datetime import datetime from typing import List, Optional @@ -49,24 +50,37 @@ def create_app(): if FastAPI is None: return None - api = FastAPI(title="Syncio Migration API", version="0.1.0") + _shared: dict = {} - @api.on_event("startup") - def startup_queue(): + @asynccontextmanager + async def lifespan(app: FastAPI): + from sqlalchemy import create_engine + + config = Config() + _shared['engine'] = create_engine( + config.POSTGRES_CONNECTION_STRING, + pool_pre_ping=True, + pool_recycle=1800, + ) + _shared['config'] = config if Config.START_API_WORKER: migration_queue.start() + yield + + engine = _shared.get('engine') + if engine is not None: + engine.dispose() + + api = FastAPI(title="Syncio Migration API", version="0.1.0", lifespan=lifespan) + @api.get("/health") def health(): return {"status": "ok"} @api.get("/tables") def tables(): - from sqlalchemy import create_engine - - config = Config() - engine = create_engine(config.POSTGRES_CONNECTION_STRING) - repository = TableConfigRepository(config, engine) + repository = TableConfigRepository(_shared['config'], _shared['engine']) table_configs = repository.load_configs(seed_defaults=True) return [ { diff --git a/app/backfill_watermarks.py b/app/backfill_watermarks.py index b2383ea..d8dc565 100644 --- a/app/backfill_watermarks.py +++ b/app/backfill_watermarks.py @@ -24,6 +24,15 @@ def parse_args(): action="store_true", help="Только показать, какие watermark будут записаны, без сохранения в PostgreSQL.", ) + parser.add_argument( + "--from-pg", + action="store_true", + help=( + "Определить watermark из уже реплицированных данных PostgreSQL: " + "для каждой таблицы загружает PK из PG и ищет соответствующие " + "x_DateTime/sequence в MSSQL Life_, не читая всю таблицу заново." + ), + ) parser.add_argument( "--json", action="store_true", @@ -39,11 +48,19 @@ def main(): args = parse_args() config = Config() migrator = DatabaseMigrator(config) - summary = migrator.backfill_watermarks( - table_names=args.tables, - overwrite=args.overwrite, - dry_run=args.dry_run, - ) + + if args.from_pg: + summary = migrator.backfill_state_from_pg( + table_names=args.tables, + overwrite=args.overwrite, + dry_run=args.dry_run, + ) + else: + summary = migrator.backfill_watermarks( + table_names=args.tables, + overwrite=args.overwrite, + dry_run=args.dry_run, + ) if args.json: print(json.dumps(summary, ensure_ascii=False, indent=2, default=str)) diff --git a/app/config.py b/app/config.py index 937576e..15681c5 100644 --- a/app/config.py +++ b/app/config.py @@ -109,11 +109,23 @@ class Config: ENABLE_TIMESCALE = os.getenv('ENABLE_TIMESCALE', 'false').lower() == 'true' DRY_RUN = os.getenv('DRY_RUN', 'false').lower() == 'true' READ_LIMIT = int(os.getenv('READ_LIMIT', '0')) or None - QUEUE_POLL_SECONDS = float(os.getenv('QUEUE_POLL_SECONDS', '1')) + QUEUE_POLL_SECONDS = float(os.getenv('QUEUE_POLL_SECONDS', '1800')) SCHEDULE_GRACE_SECONDS = int(os.getenv('SCHEDULE_GRACE_SECONDS', '60')) START_API_WORKER = os.getenv('START_API_WORKER', 'true').lower() == 'true' CREATE_FOREIGN_KEYS = os.getenv('CREATE_FOREIGN_KEYS', 'true').lower() == 'true' + def validate(self): + """Проверка обязательных параметров конфигурации.""" + missing = [] + if not self.MSSQL_CONNECTION_STRING: + missing.append('MSSQL_CONNECTION_STRING') + if not self.POSTGRES_CONNECTION_STRING: + missing.append('POSTGRES_CONNECTION_STRING') + if missing: + raise ValueError( + f"Обязательные переменные окружения не заданы: {', '.join(missing)}" + ) + # Настройки таблиц. Для инкрементальной миграции заполните life_table # primary_key и exclude_columns, чтобы запись можно было делать идемпотентно. DEFAULT_TABLE_MIGRATIONS = [ diff --git a/app/email_sender.py b/app/email_sender.py index 2419a0e..4bfc8c6 100644 --- a/app/email_sender.py +++ b/app/email_sender.py @@ -1,3 +1,4 @@ +import logging import os import smtplib from email.mime.application import MIMEApplication @@ -7,6 +8,8 @@ from typing import List from .config import Config +logger = logging.getLogger(__name__) + class EmailSender: """Класс для отправки email уведомлений""" @@ -16,9 +19,9 @@ class EmailSender: def send_email(self, subject: str, body: str, attachments: List[str] = None): """Отправка email с вложениями""" - if not all([self.config.EMAIL_HOST, self.config.EMAIL_USER, + if not all([self.config.EMAIL_HOST, self.config.EMAIL_USER, self.config.EMAIL_PASSWORD, self.config.EMAIL_FROM]): - print("Настройки email не заполнены. Отправка email пропущена.") + logger.warning("Настройки email не заполнены. Отправка email пропущена.") return False try: @@ -45,11 +48,11 @@ class EmailSender: server.login(self.config.EMAIL_USER, self.config.EMAIL_PASSWORD) server.send_message(msg) - print(f"Email успешно отправлен на {', '.join(self.config.EMAIL_TO)}") + logger.info(f"Email успешно отправлен на {', '.join(self.config.EMAIL_TO)}") return True - + except Exception as e: - print(f"Ошибка при отправке email: {e}") + logger.error(f"Ошибка при отправке email: {e}") return False diff --git a/app/logging_utils.py b/app/logging_utils.py index d080b57..fbd85dd 100644 --- a/app/logging_utils.py +++ b/app/logging_utils.py @@ -144,5 +144,14 @@ class MigrationLogger: except Exception as e: return f"Ошибка при чтении лог-файла: {e}" + def close(self): + """Закрытие и удаление всех хендлеров логгера.""" + for handler in list(self.logger.handlers): + try: + handler.close() + except Exception: + pass + self.logger.removeHandler(handler) + # ============================================================================ diff --git a/app/migrator.py b/app/migrator.py index 439c5ee..b8058ac 100644 --- a/app/migrator.py +++ b/app/migrator.py @@ -1,5 +1,6 @@ import os import re +import threading import time import csv import io @@ -17,15 +18,98 @@ from .email_sender import EmailSender from .logging_utils import MigrationLogger from .table_config_repository import TableConfigRepository +_CACHE_MISS = object() + + +class MssqlMetadataCache: + """Thread-safe синглтон кэша метаданных MSSQL. + + Разделяется между всеми экземплярами DatabaseMigrator в процессе, + поэтому metadata-запросы к MSSQL выполняются один раз на таблицу + независимо от количества job-запусков. + """ + + _instance: Optional['MssqlMetadataCache'] = None + _class_lock = threading.Lock() + + def __new__(cls) -> 'MssqlMetadataCache': + with cls._class_lock: + if cls._instance is None: + inst = super().__new__(cls) + inst._lock = threading.Lock() + inst._columns: Dict[str, List[Dict[str, Any]]] = {} + inst._indexes: Dict[str, List[Dict[str, Any]]] = {} + inst._pk: Dict[str, Optional[List[str]]] = {} + inst._fk: Dict[str, Optional[List[Dict[str, str]]]] = {} + cls._instance = inst + return cls._instance + + def get_columns(self, key: str): + with self._lock: + return self._columns.get(key, _CACHE_MISS) + + def set_columns(self, key: str, value: List[Dict[str, Any]]): + with self._lock: + self._columns[key] = value + + def get_indexes(self, key: str): + with self._lock: + return self._indexes.get(key, _CACHE_MISS) + + def set_indexes(self, key: str, value: List[Dict[str, Any]]): + with self._lock: + self._indexes[key] = value + + def get_pk(self, key: str): + with self._lock: + return self._pk.get(key, _CACHE_MISS) + + def set_pk(self, key: str, value: Optional[List[str]]): + with self._lock: + self._pk[key] = value + + def get_fk(self, key: str): + with self._lock: + return self._fk.get(key, _CACHE_MISS) + + def set_fk(self, key: str, value: Optional[List[Dict[str, str]]]): + with self._lock: + self._fk[key] = value + + def invalidate(self, table_name: str): + """Инвалидация кэша одной таблицы (например, после force_full).""" + key = table_name.lower() + with self._lock: + self._columns.pop(key, None) + self._indexes.pop(key, None) + self._pk.pop(key, None) + self._fk.pop(key, None) + + def clear(self): + """Полная очистка кэша (например, после переподключения к MSSQL).""" + with self._lock: + self._columns.clear() + self._indexes.clear() + self._pk.clear() + self._fk.clear() + + +_mssql_metadata_cache = MssqlMetadataCache() + class DatabaseMigrator: """Основной класс для миграции данных""" def __init__(self, config: Config): self.config = config + config.validate() # Подключение к PostgreSQL и загрузка конфигурации таблиц из БД - self.dst_engine = create_engine(config.POSTGRES_CONNECTION_STRING) + self.dst_engine = create_engine( + config.POSTGRES_CONNECTION_STRING, + pool_pre_ping=True, + pool_recycle=1800, + ) self.table_config_repository = TableConfigRepository(config, self.dst_engine) self.table_configs = self.table_config_repository.load_configs(seed_defaults=True) self.config.TABLE_MIGRATIONS = self.table_configs @@ -33,21 +117,21 @@ class DatabaseMigrator: self.logger = MigrationLogger(config) self.email_sender = EmailSender(config) - self._source_columns_cache: Dict[str, List[Dict[str, Any]]] = {} - self._mssql_indexes_cache: Dict[str, List[Dict[str, Any]]] = {} - self._mssql_pk_cache: Dict[str, Optional[List[str]]] = {} - self._mssql_fk_cache: Dict[str, Optional[List[Dict[str, str]]]] = {} + self._state_table_ready: bool = False # Подключение к БД self.logger.log_info( f"Подключение к БД МИС (MSSQL), charset={self.config.MSSQL_CHARSET}" ) + # src_engine — NullPool для потоковых data-запросов (long-lived cursor) self.src_engine = self.create_mssql_engine() - + # _src_metadata_engine — пул 1+1 для коротких metadata-запросов + self._src_metadata_engine = self.create_mssql_metadata_engine() + self.logger.log_info("Подключение к PostgreSQL") def create_mssql_engine(self): - """Создание SQLAlchemy engine для MSSQL с явной кодировкой pymssql.""" + """MSSQL engine без пула — для потоковых data-запросов (long-lived cursor).""" return create_engine( self.config.MSSQL_CONNECTION_STRING, connect_args={ @@ -57,17 +141,30 @@ class DatabaseMigrator: poolclass=NullPool, ) + def create_mssql_metadata_engine(self): + """MSSQL engine с пулом 1+1 — для коротких metadata-запросов.""" + return create_engine( + self.config.MSSQL_CONNECTION_STRING, + connect_args={ + 'charset': self.config.MSSQL_CHARSET, + 'login_timeout': self.config.MSSQL_CONNECT_TIMEOUT, + }, + pool_size=1, + max_overflow=1, + pool_recycle=300, + pool_pre_ping=True, + ) + def reconnect_mssql_engine(self): """Пересоздание MSSQL engine после сетевого/драйверного сбоя.""" - try: - self.src_engine.dispose() - except Exception: - pass - self._source_columns_cache.clear() - self._mssql_indexes_cache.clear() - self._mssql_pk_cache.clear() - self._mssql_fk_cache.clear() + for engine in (self.src_engine, self._src_metadata_engine): + try: + engine.dispose() + except Exception: + pass + _mssql_metadata_cache.clear() self.src_engine = self.create_mssql_engine() + self._src_metadata_engine = self.create_mssql_metadata_engine() def is_retryable_mssql_error(self, exception: Exception) -> bool: """Определение временной ошибки MSSQL/pymssql, при которой есть смысл повторить таблицу.""" @@ -192,10 +289,13 @@ class DatabaseMigrator: def get_source_table_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получение колонок исходной MSSQL таблицы.""" - cache_key = table_name.lower() - if cache_key not in self._source_columns_cache: - self._source_columns_cache[cache_key] = inspect(self.src_engine).get_columns(table_name) - return self._source_columns_cache[cache_key] + key = table_name.lower() + cached = _mssql_metadata_cache.get_columns(key) + if cached is not _CACHE_MISS: + return cached + result = inspect(self._src_metadata_engine).get_columns(table_name) + _mssql_metadata_cache.set_columns(key, result) + return result def get_target_table_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получение колонок целевой PostgreSQL таблицы.""" @@ -269,6 +369,9 @@ class DatabaseMigrator: def ensure_state_table(self): """Создание таблицы состояния инкрементальной миграции.""" + if self._state_table_ready: + return + qualified_state_table = self.qualify_table_name( self.config.STATE_TABLE, self.config.REPLICATOR_SCHEMA, @@ -305,6 +408,8 @@ class DatabaseMigrator: """)) conn.commit() + self._state_table_ready = True + def get_last_watermark(self, table_name: str) -> Dict[str, Any]: """Чтение последнего успешно обработанного x_DateTime.""" empty_watermark = {'last_x_datetime': None, 'last_sequence_value': None} @@ -396,7 +501,7 @@ class DatabaseMigrator: ]) sql = text(f""" SELECT TOP 1 {', '.join(select_columns)} - FROM {self.quote_mssql_identifier(table_config.read_table)} + FROM {self.quote_mssql_identifier(table_config.read_table)} WITH (NOLOCK) ORDER BY {order_columns} """) with self.src_engine.connect() as conn: @@ -461,7 +566,7 @@ class DatabaseMigrator: sql = text(f""" SELECT {top_clause}* - FROM {self.quote_mssql_identifier(table_config.read_table)} + FROM {self.quote_mssql_identifier(table_config.read_table)} WITH (NOLOCK) WHERE {' AND '.join(where_parts)} ORDER BY {order_columns} """) @@ -479,11 +584,9 @@ class DatabaseMigrator: read_limit: Optional[int] = None, ): """Чтение полной таблицы чанками с опциональным лимитом для проверки.""" - if read_limit: - sql = text(f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}") - return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) - - return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) + top_clause = f"TOP {int(read_limit)} " if read_limit else "" + sql = text(f"SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_name)}") + return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) def write_dataframe_batch( self, @@ -496,6 +599,10 @@ class DatabaseMigrator: self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}") return + if if_exists == 'append' and not chunk.empty: + self.write_dataframe_batch_without_sqlalchemy(chunk, table_name) + return + chunk.to_sql( table_name, self.dst_engine, @@ -568,6 +675,7 @@ class DatabaseMigrator: dst_connection = self.dst_engine.raw_connection() cursor = dst_connection.cursor() try: + cursor.execute("SET synchronous_commit = off") cursor.copy_expert(copy_sql, buffer) dst_connection.commit() except Exception: @@ -829,56 +937,47 @@ class DatabaseMigrator: def get_mssql_indexes(self, table_name: str) -> List[Dict[str, Any]]: """Получение индексов из MSSQL таблицы""" - cache_key = table_name.lower() - if cache_key in self._mssql_indexes_cache: - return self._mssql_indexes_cache[cache_key] + key = table_name.lower() + cached = _mssql_metadata_cache.get_indexes(key) + if cached is not _CACHE_MISS: + return cached - indexes = [] - - query = f""" - SELECT + indexes: List[Dict[str, Any]] = [] + + query = text(""" + SELECT i.name as index_name, i.is_unique, - i.type_desc as index_type, c.name as column_name, - ic.key_ordinal as column_position, - ic.is_descending_key + ic.key_ordinal as column_position FROM sys.indexes i INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id INNER JOIN sys.tables t ON i.object_id = t.object_id - WHERE t.name = '{table_name}' + WHERE t.name = :table_name AND i.type_desc NOT IN ('HEAP', 'CLUSTERED') AND i.is_primary_key = 0 ORDER BY i.name, ic.key_ordinal - """ - + """) + try: - index_df = pd.read_sql_query(query, self.src_engine) - - if not index_df.empty: - for index_name in index_df['index_name'].unique(): - index_data = index_df[index_df['index_name'] == index_name] - unique = bool(index_data.iloc[0]['is_unique']) - - columns = [] - for _, row in index_data.sort_values('column_position').iterrows(): - column_name = row['column_name'] - column_name = column_name.replace('[', '').replace(']', '') - columns.append(column_name) - - indexes.append({ - 'name': index_name, - 'unique': unique, - 'columns': columns - }) - + with self._src_metadata_engine.connect() as conn: + rows = conn.execute(query, {'table_name': table_name}).mappings().all() + + grouped: Dict[str, Dict[str, Any]] = {} + for row in rows: + name = row['index_name'] + if name not in grouped: + grouped[name] = {'name': name, 'unique': bool(row['is_unique']), 'columns': []} + grouped[name]['columns'].append(row['column_name'].replace('[', '').replace(']', '')) + + indexes = list(grouped.values()) self.logger.log_info(f"Найдено {len(indexes)} индексов для таблицы {table_name}") - + except Exception as e: self.logger.log_error(f"Ошибка при получении индексов для таблицы {table_name}", e) - - self._mssql_indexes_cache[cache_key] = indexes + + _mssql_metadata_cache.set_indexes(key, indexes) return indexes def create_pg_indexes(self, table_name: str, indexes: List[Dict[str, Any]]): @@ -908,32 +1007,33 @@ class DatabaseMigrator: def get_mssql_primary_key(self, table_name: str) -> Optional[List[str]]: """Получение информации о первичном ключе из MSSQL""" - cache_key = table_name.lower() - if cache_key in self._mssql_pk_cache: - return self._mssql_pk_cache[cache_key] + key = table_name.lower() + cached = _mssql_metadata_cache.get_pk(key) + if cached is not _CACHE_MISS: + return cached - query = f""" - SELECT - c.name as column_name + query = text(""" + SELECT c.name as column_name FROM sys.indexes i INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id INNER JOIN sys.tables t ON i.object_id = t.object_id - WHERE t.name = '{table_name}' + WHERE t.name = :table_name AND i.is_primary_key = 1 ORDER BY ic.key_ordinal - """ - + """) + try: - pk_df = pd.read_sql_query(query, self.src_engine) - if not pk_df.empty: - pk_columns = [row['column_name'] for _, row in pk_df.iterrows()] + with self._src_metadata_engine.connect() as conn: + rows = conn.execute(query, {'table_name': table_name}).mappings().all() + if rows: + pk_columns = [row['column_name'] for row in rows] self.logger.log_info(f"Найден первичный ключ для таблицы {table_name}: {pk_columns}") - self._mssql_pk_cache[cache_key] = pk_columns + _mssql_metadata_cache.set_pk(key, pk_columns) return pk_columns except Exception as e: self.logger.log_error(f"Ошибка при получении первичного ключа для таблицы {table_name}", e) - self._mssql_pk_cache[cache_key] = None + _mssql_metadata_cache.set_pk(key, None) return None def create_pg_primary_key(self, table_name: str, pk_columns: List[str]): @@ -983,12 +1083,13 @@ class DatabaseMigrator: def get_mssql_foreign_keys(self, table_name: str) -> Optional[List[Dict[str, str]]]: """Получение информации о внешних ключах из MSSQL""" - cache_key = table_name.lower() - if cache_key in self._mssql_fk_cache: - return self._mssql_fk_cache[cache_key] + key = table_name.lower() + cached = _mssql_metadata_cache.get_fk(key) + if cached is not _CACHE_MISS: + return cached - query = f""" - SELECT + query = text(""" + SELECT fk.name as fk_name, pc.name as parent_column, rc.name as referenced_column, @@ -999,28 +1100,31 @@ class DatabaseMigrator: INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id INNER JOIN sys.tables pt ON fkc.parent_object_id = pt.object_id - WHERE pt.name = '{table_name}' + WHERE pt.name = :table_name ORDER BY fk.name, fkc.constraint_column_id - """ - + """) + try: - fk_df = pd.read_sql_query(query, self.src_engine) - if not fk_df.empty: - fks = [] - for fk_name in fk_df['fk_name'].unique(): - fk_data = fk_df[fk_df['fk_name'] == fk_name] - fks.append({ - 'name': fk_name, - 'parent_column': fk_data.iloc[0]['parent_column'], - 'referenced_table': fk_data.iloc[0]['referenced_table'].lower(), - 'referenced_column': fk_data.iloc[0]['referenced_column'] - }) + with self._src_metadata_engine.connect() as conn: + rows = conn.execute(query, {'table_name': table_name}).mappings().all() + if rows: + grouped: Dict[str, Dict[str, str]] = {} + for row in rows: + name = row['fk_name'] + if name not in grouped: + grouped[name] = { + 'name': name, + 'parent_column': row['parent_column'], + 'referenced_table': row['referenced_table'].lower(), + 'referenced_column': row['referenced_column'], + } + fks = list(grouped.values()) self.logger.log_info(f"Найдено {len(fks)} внешних ключей для таблицы {table_name}") - self._mssql_fk_cache[cache_key] = fks + _mssql_metadata_cache.set_fk(key, fks) return fks except Exception as e: self.logger.log_error(f"Ошибка при получении внешних ключей для таблицы {table_name}", e) - self._mssql_fk_cache[cache_key] = None + _mssql_metadata_cache.set_fk(key, None) return None def create_pg_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]): @@ -1064,37 +1168,32 @@ class DatabaseMigrator: self.logger.log_info(f"DRY RUN: пропущен ANALYZE для {table_name}") return try: - with self.dst_engine.connect() as conn: - sql = f'ANALYZE "{table_name}"' - conn.execute(text(sql)) - conn.commit() + with self.dst_engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + conn.execute(text(f'ANALYZE {self.quote_identifier(table_name)}')) self.logger.log_info(f"Выполнен ANALYZE для таблицы {table_name}") except Exception as e: self.logger.log_error(f"Ошибка при выполнении ANALYZE для таблицы {table_name}", e) - + def vacuum_analyze_table(self, table_name: str): """Выполнение VACUUM ANALYZE для таблицы""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущен VACUUM ANALYZE для {table_name}") return try: - with self.dst_engine.connect() as conn: - sql = f'VACUUM ANALYZE "{table_name}"' - conn.execute(text(sql)) - conn.commit() + with self.dst_engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + conn.execute(text(f'VACUUM ANALYZE {self.quote_identifier(table_name)}')) self.logger.log_info(f"Выполнен VACUUM ANALYZE для таблицы {table_name}") except Exception as e: self.logger.log_error(f"Ошибка при выполнении VACUUM ANALYZE для таблицы {table_name}", e) - + def update_table_statistics(self): """Обновление статистики для всей базы данных""" if self.config.DRY_RUN: self.logger.log_info("DRY RUN: пропущен ANALYZE для всей базы данных") return try: - with self.dst_engine.connect() as conn: + with self.dst_engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: conn.execute(text('ANALYZE')) - conn.commit() self.logger.log_info("Выполнен ANALYZE для всей базы данных") except Exception as e: self.logger.log_error("Ошибка при выполнении ANALYZE для всей базы данных", e) @@ -1513,6 +1612,285 @@ class DatabaseMigrator: return report + def _get_pg_row_count(self, table_name: str) -> int: + """Количество строк в PostgreSQL таблице.""" + with self.dst_engine.connect() as conn: + return conn.execute( + text(f'SELECT COUNT(*) FROM {self.quote_identifier(table_name)}') + ).scalar() or 0 + + @staticmethod + def _format_mssql_literal(value) -> str: + """Форматирование Python-значения как MSSQL SQL-литерал.""" + import decimal + from datetime import date, datetime as dt + + if value is None: + return 'NULL' + if isinstance(value, bool): + return '1' if value else '0' + if isinstance(value, (int, float)): + return str(value) + if isinstance(value, decimal.Decimal): + return str(value) + if isinstance(value, dt): + return "CONVERT(datetime, '" + value.strftime('%Y-%m-%d %H:%M:%S.') + f"{value.microsecond // 1000:03d}' , 121)" + if isinstance(value, date): + return "CONVERT(date, '" + value.strftime('%Y-%m-%d') + "', 23)" + if isinstance(value, (bytes, bytearray)): + return '0x' + value.hex() + return "'" + str(value).replace("'", "''") + "'" + + def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig) -> Dict[str, Any]: + """Поиск watermark в MSSQL Life_ для строк, уже имеющихся в PostgreSQL. + + Читает PK из PG потоком (без загрузки всей таблицы в память), батчами + запрашивает MSSQL Life_ и находит максимальную (datetime, sequence) пару + среди событий, относящихся к уже реплицированным строкам. + """ + empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None} + pk_cols = table_config.primary_key + + if not pk_cols or not table_config.life_table: + return self.get_incremental_upper_bound(table_config) + + LOOKUP_BATCH = 500 + + order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"] + if table_config.sequence_column: + order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC") + order_clause = ', '.join(order_parts) + + select_parts = [ + f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt", + ] + if table_config.sequence_column: + select_parts.append( + f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq" + ) + select_clause = ', '.join(select_parts) + + max_datetime = None + max_sequence = None + total_rows = 0 + batch_num = 0 + + quoted_pk = ', '.join(self.quote_identifier(c) for c in pk_cols) + pk_sql = text(f'SELECT {quoted_pk} FROM {self.quote_identifier(table_config.pg_table)}') + + self.logger.log_info( + f"Потоковое чтение PK из {table_config.pg_table}, " + f"поиск watermark в {table_config.life_table} батчами по {LOOKUP_BATCH}" + ) + + with self.dst_engine.connect().execution_options( + stream_results=True, yield_per=LOOKUP_BATCH + ) as conn: + for batch in conn.execute(pk_sql).partitions(LOOKUP_BATCH): + if not batch: + continue + + batch_num += 1 + total_rows += len(batch) + + if len(pk_cols) == 1: + col = pk_cols[0] + values_str = ', '.join(self._format_mssql_literal(row[0]) for row in batch) + where = f"{self.quote_mssql_identifier(col)} IN ({values_str})" + else: + conditions = [] + for row in batch: + parts = [ + f"{self.quote_mssql_identifier(col)} = {self._format_mssql_literal(row[i])}" + for i, col in enumerate(pk_cols) + ] + conditions.append(f"({' AND '.join(parts)})") + where = ' OR '.join(conditions) + + sql = text(f""" + SELECT TOP 1 {select_clause} + FROM {self.quote_mssql_identifier(table_config.life_table)} + WHERE {where} + ORDER BY {order_clause} + """) + + try: + with self.src_engine.connect() as src_conn: + row = src_conn.execute(sql).mappings().first() + + if row and row['max_dt'] is not None: + batch_dt = row['max_dt'] + batch_seq = row['max_seq'] if table_config.sequence_column else None + + if ( + max_datetime is None + or batch_dt > max_datetime + or ( + batch_dt == max_datetime + and batch_seq is not None + and (max_sequence is None or batch_seq > max_sequence) + ) + ): + max_datetime = batch_dt + max_sequence = batch_seq + + except Exception as e: + self.logger.log_warning( + f"Ошибка при поиске watermark батч #{batch_num} " + f"(строки {total_rows - len(batch) + 1}–{total_rows}): {e}" + ) + + self.logger.log_info( + f"Обработано {total_rows} PK из {table_config.pg_table} " + f"в {batch_num} батчах, watermark: {max_datetime}" + ) + + return {'last_x_datetime': max_datetime, 'last_sequence_value': max_sequence} + + def backfill_state_from_pg( + self, + table_names: Optional[List[str]] = None, + overwrite: bool = False, + dry_run: bool = False, + ) -> Dict[str, Any]: + """Заполнение migration_state watermark-ами из уже реплицированных данных PostgreSQL. + + Для каждой incremental-таблицы, данные которой уже находятся в PG, + определяет watermark путём поиска последних соответствующих событий + в MSSQL Life_-таблице и сохраняет их в replicator.migration_state. + """ + original_dry_run = self.config.DRY_RUN + self.config.DRY_RUN = dry_run + + try: + table_configs = self.table_configs + if table_names: + requested = {t.lower() for t in table_names} + table_configs = [ + tc for tc in self.table_configs + if tc.source_table.lower() in requested or tc.pg_table in requested + ] + + summary: Dict[str, Any] = { + 'processed': 0, + 'updated': 0, + 'skipped': 0, + 'failed': 0, + 'tables': [], + } + + self.logger.log_info('=' * 60) + self.logger.log_info('НАЧАЛО BACKFILL WATERMARKS ИЗ POSTGRESQL') + self.logger.log_info(f"Таблиц для обработки: {len(table_configs)}") + self.logger.log_info(f"Overwrite existing: {overwrite}") + self.logger.log_info(f"Dry run: {dry_run}") + self.logger.log_info('=' * 60) + + for table_config in table_configs: + summary['processed'] += 1 + source_table = table_config.source_table + pg_table = table_config.pg_table + + if table_config.mode != 'incremental' or not table_config.life_table: + self.logger.log_info( + f"Пропуск {source_table}: не incremental с life_table" + ) + summary['skipped'] += 1 + summary['tables'].append({ + 'table': source_table, 'status': 'skipped', 'reason': 'not_incremental', + }) + continue + + if not self.table_exists(pg_table): + self.logger.log_info( + f"Пропуск {source_table}: {pg_table} отсутствует в PostgreSQL" + ) + summary['skipped'] += 1 + summary['tables'].append({ + 'table': source_table, 'status': 'skipped', 'reason': 'pg_table_missing', + }) + continue + + existing = self.get_last_watermark(pg_table) + if existing['last_x_datetime'] is not None and not overwrite: + self.logger.log_info( + f"Пропуск {source_table}: watermark уже есть ({existing['last_x_datetime']})" + ) + summary['skipped'] += 1 + summary['tables'].append({ + 'table': source_table, + 'status': 'skipped', + 'reason': 'watermark_exists', + 'watermark': existing, + }) + continue + + try: + pg_count = self._get_pg_row_count(pg_table) + + if pg_count == 0: + self.logger.log_info(f"Пропуск {source_table}: {pg_table} пуста") + summary['skipped'] += 1 + summary['tables'].append({ + 'table': source_table, 'status': 'skipped', 'reason': 'pg_table_empty', + }) + continue + + self.logger.log_info( + f"Определение watermark для {source_table} ({pg_count} строк в PG)..." + ) + watermark = self._get_watermark_for_pg_data(table_config) + + if watermark['last_x_datetime'] is None: + self.logger.log_warning( + f"Пропуск {source_table}: не удалось определить watermark из данных PG" + ) + summary['skipped'] += 1 + summary['tables'].append({ + 'table': source_table, + 'status': 'skipped', + 'reason': 'watermark_not_determined', + }) + continue + + self.save_watermark( + pg_table, + watermark['last_x_datetime'], + watermark['last_sequence_value'], + pg_count, + 'success', + ) + self.logger.log_info( + f"Watermark сохранен для {pg_table}: {watermark}, rows_copied={pg_count}" + ) + summary['updated'] += 1 + summary['tables'].append({ + 'table': source_table, + 'status': 'updated', + 'watermark': watermark, + 'rows': pg_count, + }) + + except Exception as e: + self.logger.log_error(f"Ошибка backfill для {source_table}", e) + summary['failed'] += 1 + summary['tables'].append({ + 'table': source_table, 'status': 'failed', 'error': str(e), + }) + + self.logger.log_info('=' * 60) + self.logger.log_info('ИТОГ BACKFILL ИЗ POSTGRESQL') + self.logger.log_info( + f"Обработано: {summary['processed']}, обновлено: {summary['updated']}, " + f"пропущено: {summary['skipped']}, ошибок: {summary['failed']}" + ) + self.logger.log_info('=' * 60) + + return summary + + finally: + self.config.DRY_RUN = original_dry_run + def backfill_watermarks( self, table_names: Optional[List[str]] = None, @@ -1708,22 +2086,17 @@ Job ID: {job_id or 'n/a'} self.email_sender.send_email(subject, body, attachments) def cleanup_old_logs(self, days_to_keep: int = 7): - """Очистка старых логов""" + """Удаление .log-файлов старше days_to_keep дней. JSON-отчёты не удаляются.""" try: - cutoff_date = datetime.now().timestamp() - (days_to_keep * 24 * 3600) - + cutoff_ts = datetime.now().timestamp() - (days_to_keep * 24 * 3600) for filename in os.listdir(self.config.LOG_DIR): + if not filename.endswith('.log'): + continue filepath = os.path.join(self.config.LOG_DIR, filename) - - # Проверяем, что это лог-файл - if filename.endswith('.log') or filename.endswith('.json'): - # Получаем время последнего изменения - file_mtime = os.path.getmtime(filepath) - - # Удаляем старые файлы - if file_mtime < cutoff_date: - os.remove(filepath) - self.logger.log_info(f"Удален старый файл: {filename}") - + if not os.path.isfile(filepath): + continue + if os.path.getmtime(filepath) < cutoff_ts: + os.remove(filepath) + self.logger.log_info(f"Удален старый лог-файл: {filename}") except Exception as e: self.logger.log_error("Ошибка при очистке старых логов", e) diff --git a/app/queue.py b/app/queue.py index 5a9bfcb..d6f56f2 100644 --- a/app/queue.py +++ b/app/queue.py @@ -387,6 +387,13 @@ class MigrationJobQueue: CREATE INDEX IF NOT EXISTS idx_{self.SCHEDULES_TABLE}_enabled_next_run_at ON {self._qualified_table(self.SCHEDULES_TABLE)} (enabled, next_run_at) """)) + conn.execute(self._text(f""" + UPDATE {self._qualified_table(self.JOBS_TABLE)} + SET status = 'failed', + finished_at = :now, + error = 'Прервано перезапуском воркера' + WHERE status = 'running' + """), {'now': datetime.now()}) conn.commit() self.schema_ready = True @@ -477,6 +484,8 @@ class MigrationJobQueue: self._execute_job(self._job_from_row(job_row)) continue except Exception: + import logging + logging.getLogger(__name__).exception("Необработанная ошибка в _worker_loop") time.sleep(1.0) continue @@ -577,7 +586,14 @@ class MigrationJobQueue: return row def _execute_job(self, job: MigrationJob): + import logging + logger = logging.getLogger(__name__) + migrator = None + final_status = 'failed' + final_report = None + final_error = 'Неизвестная ошибка' + try: from .migrator import DatabaseMigrator @@ -591,11 +607,14 @@ class MigrationJobQueue: force_full=job.force_full, ) migrator.cleanup_old_logs(days_to_keep=7) - self._finish_job(job.job_id, status='completed', report=report, error=None) + final_status = 'completed' + final_report = report + final_error = None if job.send_email: migrator.send_notification(report) except Exception as exc: - self._finish_job(job.job_id, status='failed', report=None, error=str(exc)) + final_error = str(exc) + logger.exception(f"Ошибка выполнения job {job.job_id}") if job.send_email and migrator is not None: try: migrator.send_failure_notification( @@ -605,6 +624,47 @@ class MigrationJobQueue: ) except Exception: pass + finally: + if migrator is not None: + try: + migrator.logger.close() + except Exception: + pass + try: + self._finish_job( + job.job_id, + status=final_status, + report=final_report, + error=final_error, + ) + except Exception: + logger.exception(f"Не удалось финализировать job {job.job_id}, принудительный сброс") + self._force_fail_job(job.job_id, 'Ошибка финализации job') + + def _force_fail_job(self, job_id: str, error: str): + """Аварийная финализация job через новое соединение вне пула.""" + try: + from sqlalchemy import create_engine + from sqlalchemy.pool import NullPool + + engine = create_engine(Config.POSTGRES_CONNECTION_STRING, poolclass=NullPool) + try: + with engine.connect() as conn: + conn.execute(self._text(f""" + UPDATE {self._qualified_table(self.JOBS_TABLE)} + SET status = 'failed', + finished_at = :now, + error = :error + WHERE job_id = :job_id + """), {'now': datetime.now(), 'error': error, 'job_id': job_id}) + conn.commit() + finally: + engine.dispose() + except Exception: + import logging + logging.getLogger(__name__).exception( + f"Критическая ошибка: не удалось аварийно завершить job {job_id}" + ) def _finish_job( self, diff --git a/app/worker.py b/app/worker.py index 3cfde99..4c9d112 100644 --- a/app/worker.py +++ b/app/worker.py @@ -19,7 +19,7 @@ def main(): migration_queue.start() while not stop_requested: - time.sleep(max(Config.QUEUE_POLL_SECONDS, 1.0)) + time.sleep(max(Config.QUEUE_POLL_SECONDS, 1800.0)) if __name__ == "__main__": diff --git a/req.txt b/req.txt index 58ba55d..f9163cb 100644 --- a/req.txt +++ b/req.txt @@ -1,7 +1,8 @@ -pandas -sqlalchemy -pymssql -psycopg2-binary -fastapi -uvicorn -python-dotenv +pandas==3.0.2 +SQLAlchemy==2.0.49 +pymssql==2.3.13 +psycopg2-binary==2.9.11 +fastapi==0.135.3 +uvicorn==0.44.0 +pydantic==2.12.5 +python-dotenv>=1.0.0