From ff810e59bc7e331f0cda986dcf0706c92c8f1085 Mon Sep 17 00:00:00 2001 From: brusnitsyn Date: Fri, 29 May 2026 06:47:29 +0900 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=B5=D0=B1=D0=BE=D0=BB=D1=8C=D1=88?= =?UTF-8?q?=D0=B8=D0=B5=20=D0=B8=D0=B7=D0=BC=D0=B5=D0=BD=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 2 + app/config.py | 2 + app/logging_utils.py | 19 +-- app/migrator.py | 349 +++++++++++++++++++++++++++++++++---------- 4 files changed, 284 insertions(+), 88 deletions(-) diff --git a/.env.example b/.env.example index b0fe51b..3411931 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,8 @@ ENABLE_TIMESCALE=false DRY_RUN=false READ_LIMIT=0 CHUNK_SIZE=5000 +INCREMENTAL_CHUNK_SIZE=5000 +FULL_LOAD_CHUNK_SIZE=5000 WRITE_CHUNK_SIZE=5000 CREATE_FOREIGN_KEYS=true QUEUE_POLL_SECONDS=1 diff --git a/app/config.py b/app/config.py index fd0adf6..937576e 100644 --- a/app/config.py +++ b/app/config.py @@ -100,6 +100,8 @@ class Config: # Настройки миграции CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', '5000')) WRITE_CHUNK_SIZE = int(os.getenv('WRITE_CHUNK_SIZE', str(CHUNK_SIZE))) + INCREMENTAL_CHUNK_SIZE = int(os.getenv('INCREMENTAL_CHUNK_SIZE', str(CHUNK_SIZE * 2))) + FULL_LOAD_CHUNK_SIZE = int(os.getenv('FULL_LOAD_CHUNK_SIZE', str(CHUNK_SIZE * 4))) BATCH_SIZE = 10 # Через сколько чанков выводить прогресс REPLICATOR_SCHEMA = os.getenv('REPLICATOR_SCHEMA', 'replicator') STATE_TABLE = 'migration_state' diff --git a/app/logging_utils.py b/app/logging_utils.py index 22a8570..d080b57 100644 --- a/app/logging_utils.py +++ b/app/logging_utils.py @@ -37,15 +37,16 @@ class MigrationLogger: def setup_logging(self): """Настройка системы логирования""" - logging.basicConfig( - level=self.config.LOG_LEVEL, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(self.log_file, encoding='utf-8'), - logging.StreamHandler() # Вывод в консоль - ] - ) - self.logger = logging.getLogger(__name__) + fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + self.logger = logging.getLogger(f"migration.{self.timestamp}") + self.logger.setLevel(self.config.LOG_LEVEL) + self.logger.propagate = False + file_handler = logging.FileHandler(self.log_file, encoding='utf-8') + file_handler.setFormatter(fmt) + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(fmt) + self.logger.addHandler(file_handler) + self.logger.addHandler(stream_handler) def log_info(self, message: str): """Логирование информационного сообщения""" diff --git a/app/migrator.py b/app/migrator.py index d626f62..439c5ee 100644 --- a/app/migrator.py +++ b/app/migrator.py @@ -1,12 +1,15 @@ import os import re import time +import csv +import io from datetime import datetime from typing import Any, Dict, List, Optional, Tuple import pandas as pd from sqlalchemy import create_engine, inspect, text from sqlalchemy.exc import DBAPIError, OperationalError +from sqlalchemy.pool import NullPool from sqlalchemy.sql import sqltypes from .config import Config, TableMigrationConfig @@ -50,12 +53,8 @@ class DatabaseMigrator: connect_args={ 'charset': self.config.MSSQL_CHARSET, 'login_timeout': self.config.MSSQL_CONNECT_TIMEOUT, - 'timeout': self.config.MSSQL_CONNECT_TIMEOUT, }, - pool_pre_ping=True, - pool_recycle=self.config.MSSQL_POOL_RECYCLE, - pool_size=self.config.MSSQL_POOL_SIZE, - max_overflow=self.config.MSSQL_MAX_OVERFLOW, + poolclass=NullPool, ) def reconnect_mssql_engine(self): @@ -72,28 +71,46 @@ class DatabaseMigrator: def is_retryable_mssql_error(self, exception: Exception) -> bool: """Определение временной ошибки MSSQL/pymssql, при которой есть смысл повторить таблицу.""" - if isinstance(exception, (OperationalError, DBAPIError)): - message = str(exception).lower() - retry_markers = ( - 'dbprocess is dead', - 'adaptive server connection failed', - 'server closed the connection unexpectedly', - 'connection reset', - 'connection refused', - 'communication link failure', - 'lost connection', - 'closed connection', - 'not enabled', - '08s01', - ) - return any(marker in message for marker in retry_markers) - return False + message = str(exception).lower() + retry_markers = ( + 'dbprocess is dead', + 'unexpected eof', + 'adaptive server connection failed', + 'server closed the connection unexpectedly', + 'connection reset', + 'connection refused', + 'communication link failure', + 'lost connection', + 'closed connection', + 'not enabled', + '08s01', + '20002', + '20017', + ) + + if any(marker in message for marker in retry_markers): + return True + + return isinstance(exception, (OperationalError, DBAPIError)) def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool: """Один проход миграции таблицы без retry-обертки.""" if force_full: self.logger.log_info(f"Force full reload для таблицы {table_config.source_table}") - success = self.migrate_full_table(table_config) + is_initial_force_full = ( + table_config.mode == 'incremental' + and table_config.initial_load_mode == 'full_then_incremental' + and not self.table_exists(table_config.pg_table) + and self.get_last_watermark(table_config.pg_table).get('last_x_datetime') is None + ) + if is_initial_force_full: + self.logger.log_info( + f"Первый force_full для {table_config.source_table}: " + "загрузка выполняется без SQLAlchemy" + ) + success = self.migrate_full_table_without_sqlalchemy(table_config) + else: + success = self.migrate_full_table(table_config) if success and table_config.mode == 'incremental' and table_config.life_table: upper_bound = self.get_incremental_upper_bound(table_config) self.save_watermark( @@ -453,7 +470,7 @@ class DatabaseMigrator: sql, self.src_engine, params=params, - chunksize=self.config.CHUNK_SIZE, + chunksize=self.config.INCREMENTAL_CHUNK_SIZE, ) def read_full_chunks( @@ -464,9 +481,9 @@ class DatabaseMigrator: """Чтение полной таблицы чанками с опциональным лимитом для проверки.""" if read_limit: sql = text(f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}") - return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.CHUNK_SIZE) + return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) - return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.CHUNK_SIZE) + return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) def write_dataframe_batch( self, @@ -488,6 +505,80 @@ class DatabaseMigrator: method='multi', ) + def read_full_chunks_without_sqlalchemy( + self, + table_name: str, + read_limit: Optional[int] = None, + ): + """Чтение полной таблицы чанками через DBAPI-курсор (без pandas.read_sql_*).""" + src_connection = self.src_engine.raw_connection() + cursor = src_connection.cursor() + + top_clause = f"TOP {int(read_limit)} " if read_limit else "" + sql = f"SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_name)}" + + try: + cursor.execute(sql) + columns = [column[0] for column in cursor.description] + while True: + rows = cursor.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE) + if not rows: + break + yield pd.DataFrame.from_records(rows, columns=columns) + finally: + try: + cursor.close() + finally: + src_connection.close() + + def write_dataframe_batch_without_sqlalchemy( + self, + chunk: pd.DataFrame, + table_name: str, + ): + """Batch-запись DataFrame в PostgreSQL через COPY (без pandas.to_sql).""" + if chunk.empty: + return + if self.config.DRY_RUN: + self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}") + return + + buffer = io.StringIO() + chunk.to_csv( + buffer, + index=False, + header=False, + sep='\t', + na_rep='\\N', + date_format='%Y-%m-%d %H:%M:%S.%f', + quoting=csv.QUOTE_MINIMAL, + escapechar='\\', + ) + buffer.seek(0) + + quoted_columns = ', '.join( + self.quote_identifier(column_name) + for column_name in chunk.columns + ) + copy_sql = ( + f"COPY {self.qualify_table_name(table_name)} ({quoted_columns}) " + "FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N', ESCAPE '\\', QUOTE '\"')" + ) + + dst_connection = self.dst_engine.raw_connection() + cursor = dst_connection.cursor() + try: + cursor.copy_expert(copy_sql, buffer) + dst_connection.commit() + except Exception: + dst_connection.rollback() + raise + finally: + try: + cursor.close() + finally: + dst_connection.close() + def prepare_incremental_chunk( self, chunk: pd.DataFrame, @@ -571,6 +662,7 @@ class DatabaseMigrator: chunk: pd.DataFrame, table_name: str, primary_key: List[str], + staging_table: Optional[str] = None, ): """Batch delete в PostgreSQL через staging-таблицу с ключами.""" if chunk.empty: @@ -588,7 +680,8 @@ class DatabaseMigrator: if missing_columns: raise ValueError(f"Для удаления из {table_name} не найдены ключевые поля: {missing_columns}") - staging_table = f"_stg_delete_{table_name}_{int(time.time() * 1000)}" + own_staging = staging_table is None + staging = staging_table or f"_stg_delete_{table_name}_{int(time.time() * 1000)}" key_chunk = chunk[primary_key].drop_duplicates() join_condition = ' AND '.join([ f"target.{self.quote_identifier(column)} = source.{self.quote_identifier(column)}" @@ -596,20 +689,31 @@ class DatabaseMigrator: ]) try: - self.write_dataframe_batch(key_chunk, staging_table, if_exists='replace') + if self.table_exists(staging): + with self.dst_engine.connect() as conn: + conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}')) + conn.commit() + else: + key_chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False) + with self.dst_engine.connect() as conn: + conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED')) + conn.commit() + self.write_dataframe_batch_without_sqlalchemy(key_chunk, staging) sql = f""" DELETE FROM {self.quote_identifier(table_name)} AS target - USING {self.quote_identifier(staging_table)} AS source + USING {self.quote_identifier(staging)} AS source WHERE {join_condition} """ with self.dst_engine.connect() as conn: conn.execute(text(sql)) - conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}')) + if own_staging: + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() except Exception: - with self.dst_engine.connect() as conn: - conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}')) - conn.commit() + if own_staging: + with self.dst_engine.connect() as conn: + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) + conn.commit() raise def upsert_dataframe_batch( @@ -617,6 +721,7 @@ class DatabaseMigrator: chunk: pd.DataFrame, table_name: str, primary_key: List[str], + staging_table: Optional[str] = None, ): """Batch upsert через staging-таблицу.""" if self.config.DRY_RUN: @@ -627,8 +732,8 @@ class DatabaseMigrator: self.write_dataframe_batch(chunk, table_name, if_exists='append') return - chunk = self.deduplicate_incremental_chunk(chunk, primary_key) - staging_table = f"_stg_{table_name}_{int(time.time() * 1000)}" + own_staging = staging_table is None + staging = staging_table or f"_stg_{table_name}_{int(time.time() * 1000)}" columns = list(chunk.columns) quoted_columns = ', '.join([self.quote_identifier(column) for column in columns]) conflict_columns = ', '.join([self.quote_identifier(column) for column in primary_key]) @@ -644,21 +749,32 @@ class DatabaseMigrator: conflict_action = "DO NOTHING" try: - self.write_dataframe_batch(chunk, staging_table, if_exists='replace') + if self.table_exists(staging): + with self.dst_engine.connect() as conn: + conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}')) + conn.commit() + else: + chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False) + with self.dst_engine.connect() as conn: + conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED')) + conn.commit() + self.write_dataframe_batch_without_sqlalchemy(chunk, staging) sql = f""" INSERT INTO {self.quote_identifier(table_name)} ({quoted_columns}) SELECT {quoted_columns} - FROM {self.quote_identifier(staging_table)} + FROM {self.quote_identifier(staging)} ON CONFLICT ({conflict_columns}) {conflict_action} """ with self.dst_engine.connect() as conn: conn.execute(text(sql)) - conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}')) + if own_staging: + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() except Exception: - with self.dst_engine.connect() as conn: - conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}')) - conn.commit() + if own_staging: + with self.dst_engine.connect() as conn: + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) + conn.commit() raise def create_timescale_hypertable(self, table_config: TableMigrationConfig): @@ -1040,26 +1156,25 @@ class DatabaseMigrator: pk_columns = self.get_mssql_primary_key(table_name) foreign_keys = self.get_mssql_foreign_keys(table_name) - # Очищаем целевую таблицу - self.logger.log_info(f"Очистка целевой таблицы {pg_table}") - self.truncate_table(pg_table) - # Читаем данные self.logger.log_info(f"Чтение данных из {table_name}") chunks = self.read_full_chunks(table_name, read_limit=read_limit) - + # Загружаем данные first_chunk = True total_rows = 0 - + for chunk_num, chunk in enumerate(chunks, 1): if first_chunk: - self.write_dataframe_batch(chunk, pg_table, if_exists='fail') - first_chunk = False + # Дропаем целевую таблицу только после успешного чтения первого чанка, + # чтобы не уничтожить данные при недоступном MSSQL + self.logger.log_info(f"Очистка целевой таблицы {pg_table}") + self.truncate_table(pg_table) + chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False) self.logger.log_info(f"Таблица {pg_table} создана") - else: - self.write_dataframe_batch(chunk, pg_table, if_exists='append') - + first_chunk = False + self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table) + total_rows += len(chunk) if chunk_num % self.config.BATCH_SIZE == 0: self.logger.log_progress(table_name, chunk_num, total_rows) @@ -1068,31 +1183,25 @@ class DatabaseMigrator: if total_rows > 0: self.sync_target_schema(table_name, pg_table) - - if total_rows > 0: self.create_timescale_hypertable(table_config) - - # Создаем первичный ключ - if self.can_create_primary_key(table_config, pk_columns): - self.logger.log_info(f"Создание первичного ключа для {pg_table}") - self.create_pg_primary_key(pg_table, pk_columns) - - # Создаем индексы - if indexes: - self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}") - self.create_pg_indexes(pg_table, indexes) - - # Создаем внешние ключи - if foreign_keys: - self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}") - self.create_pg_foreign_keys(pg_table, foreign_keys) - - # Обновляем статистику - self.logger.log_info(f"Обновление статистики для {pg_table}") - if total_rows > 1000000: - self.vacuum_analyze_table(pg_table) - else: - self.analyze_table(pg_table) + + if self.can_create_primary_key(table_config, pk_columns): + self.logger.log_info(f"Создание первичного ключа для {pg_table}") + self.create_pg_primary_key(pg_table, pk_columns) + + if indexes: + self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}") + self.create_pg_indexes(pg_table, indexes) + + if foreign_keys: + self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}") + self.create_pg_foreign_keys(pg_table, foreign_keys) + + self.logger.log_info(f"Обновление статистики для {pg_table}") + if total_rows > 1000000: + self.vacuum_analyze_table(pg_table) + else: + self.analyze_table(pg_table) self.logger.log_table_success(table_name, total_rows) return True @@ -1103,11 +1212,83 @@ class DatabaseMigrator: self.logger.log_table_failure(table_name, str(e)) return False + def migrate_full_table_without_sqlalchemy( + self, + table_config: TableMigrationConfig, + read_limit: Optional[int] = None, + ) -> bool: + """Полная миграция одной таблицы без SQLAlchemy в этапе чтения/заливки данных.""" + table_name = table_config.source_table + pg_table = table_config.pg_table + self.logger.log_table_start(table_name) + + try: + indexes = self.get_mssql_indexes(table_name) + pk_columns = self.get_mssql_primary_key(table_name) + foreign_keys = self.get_mssql_foreign_keys(table_name) + + self.logger.log_info(f"Чтение данных из {table_name} без SQLAlchemy") + chunks = self.read_full_chunks_without_sqlalchemy(table_name, read_limit=read_limit) + + first_chunk = True + total_rows = 0 + + for chunk_num, chunk in enumerate(chunks, 1): + if first_chunk: + # Дропаем целевую таблицу только после успешного чтения первого чанка, + # чтобы не уничтожить данные при недоступном MSSQL + self.logger.log_info(f"Очистка целевой таблицы {pg_table}") + self.truncate_table(pg_table) + self.write_dataframe_batch(chunk.iloc[0:0], pg_table, if_exists='fail') + self.logger.log_info(f"Таблица {pg_table} создана") + first_chunk = False + + self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table) + + total_rows += len(chunk) + if chunk_num % self.config.BATCH_SIZE == 0: + self.logger.log_progress(table_name, chunk_num, total_rows) + + self.logger.log_info(f"Всего загружено строк: {total_rows}") + + if total_rows > 0: + self.sync_target_schema(table_name, pg_table) + self.create_timescale_hypertable(table_config) + + if self.can_create_primary_key(table_config, pk_columns): + self.logger.log_info(f"Создание первичного ключа для {pg_table}") + self.create_pg_primary_key(pg_table, pk_columns) + + if indexes: + self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}") + self.create_pg_indexes(pg_table, indexes) + + if foreign_keys: + self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}") + self.create_pg_foreign_keys(pg_table, foreign_keys) + + self.logger.log_info(f"Обновление статистики для {pg_table}") + if total_rows > 1000000: + self.vacuum_analyze_table(pg_table) + else: + self.analyze_table(pg_table) + + self.logger.log_table_success(table_name, total_rows) + return True + + except Exception as e: + if self.is_retryable_mssql_error(e): + raise + self.logger.log_table_failure(table_name, str(e)) + return False + def migrate_incremental_table(self, table_config: TableMigrationConfig) -> bool: """Инкрементальная миграция Life_ таблицы по x_DateTime.""" table_name = table_config.source_table pg_table = table_config.pg_table self.logger.log_table_start(f"{table_name} ({table_config.read_table})") + upsert_staging = f"_stg_upsert_{pg_table}" + delete_staging = f"_stg_delete_{pg_table}" try: if not table_config.life_table: @@ -1188,18 +1369,19 @@ class DatabaseMigrator: ) if not delete_chunk.empty: - self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key) + self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key, staging_table=delete_staging) if write_chunk.empty: pass elif first_chunk: - self.write_dataframe_batch(write_chunk, pg_table, if_exists='append') + write_chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False) + self.write_dataframe_batch_without_sqlalchemy(write_chunk, pg_table) self.create_timescale_hypertable(table_config) if self.can_create_primary_key(table_config, table_config.primary_key): self.create_pg_primary_key(pg_table, table_config.primary_key) first_chunk = False elif table_config.primary_key: - self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key) + self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key, staging_table=upsert_staging) else: self.write_dataframe_batch(write_chunk, pg_table, if_exists='append') @@ -1259,7 +1441,16 @@ class DatabaseMigrator: self.logger.log_error(f"Ошибка при сохранении состояния миграции {pg_table}", state_error) self.logger.log_table_failure(table_name, str(e)) return False - + + finally: + try: + with self.dst_engine.connect() as conn: + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(upsert_staging)}')) + conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(delete_staging)}')) + conn.commit() + except Exception as cleanup_error: + self.logger.log_warning(f"Не удалось очистить staging-таблицы для {pg_table}: {cleanup_error}") + def run_migration( self, table_names: Optional[List[str]] = None,