import os import re import time import csv import io from datetime import datetime from typing import Any, Dict, List, Optional, Tuple import pandas as pd from sqlalchemy import create_engine, inspect, text from sqlalchemy.exc import DBAPIError, OperationalError from sqlalchemy.pool import NullPool from sqlalchemy.sql import sqltypes from .config import Config, TableMigrationConfig from .email_sender import EmailSender from .logging_utils import MigrationLogger from .table_config_repository import TableConfigRepository class DatabaseMigrator: """Основной класс для миграции данных""" def __init__(self, config: Config): self.config = config # Подключение к PostgreSQL и загрузка конфигурации таблиц из БД self.dst_engine = create_engine(config.POSTGRES_CONNECTION_STRING) self.table_config_repository = TableConfigRepository(config, self.dst_engine) self.table_configs = self.table_config_repository.load_configs(seed_defaults=True) self.config.TABLE_MIGRATIONS = self.table_configs self.config.TABLES_TO_COPY = [table.source_table for table in self.table_configs] self.logger = MigrationLogger(config) self.email_sender = EmailSender(config) self._source_columns_cache: Dict[str, List[Dict[str, Any]]] = {} self._mssql_indexes_cache: Dict[str, List[Dict[str, Any]]] = {} self._mssql_pk_cache: Dict[str, Optional[List[str]]] = {} self._mssql_fk_cache: Dict[str, Optional[List[Dict[str, str]]]] = {} # Подключение к БД self.logger.log_info( f"Подключение к БД МИС (MSSQL), charset={self.config.MSSQL_CHARSET}" ) self.src_engine = self.create_mssql_engine() self.logger.log_info("Подключение к PostgreSQL") def create_mssql_engine(self): """Создание SQLAlchemy engine для MSSQL с явной кодировкой pymssql.""" return create_engine( self.config.MSSQL_CONNECTION_STRING, connect_args={ 'charset': self.config.MSSQL_CHARSET, 'login_timeout': self.config.MSSQL_CONNECT_TIMEOUT, }, poolclass=NullPool, ) def reconnect_mssql_engine(self): """Пересоздание MSSQL engine после сетевого/драйверного сбоя.""" try: self.src_engine.dispose() except Exception: pass self._source_columns_cache.clear() self._mssql_indexes_cache.clear() self._mssql_pk_cache.clear() self._mssql_fk_cache.clear() self.src_engine = self.create_mssql_engine() def is_retryable_mssql_error(self, exception: Exception) -> bool: """Определение временной ошибки MSSQL/pymssql, при которой есть смысл повторить таблицу.""" message = str(exception).lower() retry_markers = ( 'dbprocess is dead', 'unexpected eof', 'adaptive server connection failed', 'server closed the connection unexpectedly', 'connection reset', 'connection refused', 'communication link failure', 'lost connection', 'closed connection', 'not enabled', '08s01', '20002', '20017', ) if any(marker in message for marker in retry_markers): return True return isinstance(exception, (OperationalError, DBAPIError)) def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool: """Один проход миграции таблицы без retry-обертки.""" if force_full: self.logger.log_info(f"Force full reload для таблицы {table_config.source_table}") is_initial_force_full = ( table_config.mode == 'incremental' and table_config.initial_load_mode == 'full_then_incremental' and not self.table_exists(table_config.pg_table) and self.get_last_watermark(table_config.pg_table).get('last_x_datetime') is None ) if is_initial_force_full: self.logger.log_info( f"Первый force_full для {table_config.source_table}: " "загрузка выполняется без SQLAlchemy" ) success = self.migrate_full_table_without_sqlalchemy(table_config) else: success = self.migrate_full_table(table_config) if success and table_config.mode == 'incremental' and table_config.life_table: upper_bound = self.get_incremental_upper_bound(table_config) self.save_watermark( table_config.pg_table, upper_bound['last_x_datetime'], upper_bound['last_sequence_value'], 0, 'success', ) self.logger.log_info( f"После force full сохранен watermark для {table_config.pg_table}: " f"{upper_bound}" ) return success if table_config.mode == 'incremental': return self.migrate_incremental_table(table_config) return self.migrate_full_table(table_config) @staticmethod def quote_identifier(identifier: str) -> str: """Экранирование SQL identifier для PostgreSQL/MSSQL.""" return '"' + identifier.replace('"', '""') + '"' @staticmethod def quote_mssql_identifier(identifier: str) -> str: """Экранирование SQL Server identifier.""" return '[' + identifier.replace(']', ']]') + ']' def qualify_table_name(self, table_name: str, schema: Optional[str] = None) -> str: """Полное имя таблицы PostgreSQL с указанием схемы.""" if schema: return f'{self.quote_identifier(schema)}.{self.quote_identifier(table_name)}' return self.quote_identifier(table_name) def compile_pg_type(self, source_type) -> str: """Преобразование SQLAlchemy/MSSQL типа в тип PostgreSQL.""" type_name = source_type.__class__.__name__.lower() if 'uniqueidentifier' in type_name or 'uuid' in type_name: return 'uuid' if 'bit' in type_name or isinstance(source_type, sqltypes.Boolean): return 'boolean' if isinstance(source_type, sqltypes.BigInteger): return 'bigint' if isinstance(source_type, sqltypes.SmallInteger): return 'smallint' if isinstance(source_type, sqltypes.Integer): return 'integer' if isinstance(source_type, sqltypes.Numeric): precision = getattr(source_type, 'precision', None) scale = getattr(source_type, 'scale', None) if precision is not None and scale is not None: return f'numeric({precision}, {scale})' if precision is not None: return f'numeric({precision})' return 'numeric' if isinstance(source_type, (sqltypes.Float, sqltypes.REAL)): return 'double precision' if isinstance(source_type, sqltypes.DateTime): return 'timestamp' if isinstance(source_type, sqltypes.Date): return 'date' if isinstance(source_type, sqltypes.Time): return 'time' if isinstance(source_type, sqltypes.CHAR): length = getattr(source_type, 'length', None) return f'char({length})' if length else 'char' if isinstance(source_type, sqltypes.String): length = getattr(source_type, 'length', None) return f'varchar({length})' if length else 'text' if isinstance(source_type, (sqltypes.Text, sqltypes.UnicodeText)): return 'text' if isinstance(source_type, sqltypes.LargeBinary): return 'bytea' return 'text' def get_source_table_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получение колонок исходной MSSQL таблицы.""" cache_key = table_name.lower() if cache_key not in self._source_columns_cache: self._source_columns_cache[cache_key] = inspect(self.src_engine).get_columns(table_name) return self._source_columns_cache[cache_key] def get_target_table_columns(self, table_name: str) -> List[Dict[str, Any]]: """Получение колонок целевой PostgreSQL таблицы.""" return inspect(self.dst_engine).get_columns(table_name) def sync_target_schema(self, source_table: str, target_table: str): """Добавление новых колонок из MSSQL в PostgreSQL как nullable.""" if not self.table_exists(target_table): return source_columns = self.get_source_table_columns(source_table) target_columns = { column['name'].lower(): column for column in self.get_target_table_columns(target_table) } for source_column in source_columns: column_name = source_column['name'] lower_name = column_name.lower() if lower_name in target_columns: target_type = str(target_columns[lower_name]['type']).lower() source_type = self.compile_pg_type(source_column['type']).lower() if target_type != source_type: self.logger.log_warning( f"Тип колонки {target_table}.{column_name} отличается: " f"source={source_type}, target={target_type}. Автоизменение типа не выполняется" ) continue pg_type = self.compile_pg_type(source_column['type']) self.logger.log_info( f"Обнаружена новая колонка {source_table}.{column_name}; " f"добавление в {target_table} как nullable ({pg_type})" ) if self.config.DRY_RUN: self.logger.log_info( f"DRY RUN: пропущено добавление колонки {target_table}.{column_name}" ) continue sql = text( f'ALTER TABLE {self.qualify_table_name(target_table)} ' f'ADD COLUMN IF NOT EXISTS {self.quote_identifier(column_name)} {pg_type} NULL' ) with self.dst_engine.connect() as conn: conn.execute(sql) conn.commit() def get_table_config(self, table_name: str) -> TableMigrationConfig: """Получение настроек таблицы по имени источника или цели.""" table_config = self.table_config_repository.get_config(table_name) return table_config or TableMigrationConfig(source_table=table_name) def table_exists(self, table_name: str) -> bool: """Проверка существования таблицы в целевой БД.""" return self.table_exists_in_schema(table_name, None) def table_exists_in_schema(self, table_name: str, schema: Optional[str]) -> bool: """Проверка существования таблицы в указанной схеме PostgreSQL.""" query = text(""" SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = COALESCE(:schema, current_schema()) AND table_name = :table_name ) """) with self.dst_engine.connect() as conn: return bool(conn.execute(query, {'schema': schema, 'table_name': table_name}).scalar()) def ensure_state_table(self): """Создание таблицы состояния инкрементальной миграции.""" qualified_state_table = self.qualify_table_name( self.config.STATE_TABLE, self.config.REPLICATOR_SCHEMA, ) sql = f""" CREATE TABLE IF NOT EXISTS {qualified_state_table} ( table_name text PRIMARY KEY, last_x_datetime timestamp NULL, last_sequence_value bigint NULL, last_run_at timestamp NOT NULL DEFAULT now(), rows_copied bigint NOT NULL DEFAULT 0, status text NOT NULL DEFAULT 'pending', error text NULL ) """ with self.dst_engine.connect() as conn: conn.execute(text( f'CREATE SCHEMA IF NOT EXISTS {self.quote_identifier(self.config.REPLICATOR_SCHEMA)}' )) conn.execute(text(sql)) conn.execute(text(f""" ALTER TABLE {qualified_state_table} ADD COLUMN IF NOT EXISTS last_sequence_value bigint NULL """)) conn.execute(text(f""" DELETE FROM {qualified_state_table} AS duplicate_rows USING {qualified_state_table} AS preserved_rows WHERE duplicate_rows.table_name = preserved_rows.table_name AND duplicate_rows.ctid < preserved_rows.ctid """)) conn.execute(text(f""" CREATE UNIQUE INDEX IF NOT EXISTS idx_{self.config.STATE_TABLE}_table_name ON {qualified_state_table} (table_name) """)) conn.commit() def get_last_watermark(self, table_name: str) -> Dict[str, Any]: """Чтение последнего успешно обработанного x_DateTime.""" empty_watermark = {'last_x_datetime': None, 'last_sequence_value': None} if self.config.DRY_RUN and not self.table_exists_in_schema( self.config.STATE_TABLE, self.config.REPLICATOR_SCHEMA, ): return empty_watermark self.ensure_state_table() qualified_state_table = self.qualify_table_name( self.config.STATE_TABLE, self.config.REPLICATOR_SCHEMA, ) sql = text(f""" SELECT last_x_datetime, last_sequence_value FROM {qualified_state_table} WHERE table_name = :table_name """) with self.dst_engine.connect() as conn: row = conn.execute(sql, {'table_name': table_name}).mappings().first() if not row: return empty_watermark return { 'last_x_datetime': row['last_x_datetime'], 'last_sequence_value': row['last_sequence_value'], } def save_watermark( self, table_name: str, last_x_datetime: Optional[datetime], last_sequence_value: Optional[int], rows_copied: int, status: str, error: Optional[str] = None, ): """Сохранение состояния инкрементальной миграции.""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: состояние миграции {table_name} не обновляется") return self.ensure_state_table() qualified_state_table = self.qualify_table_name( self.config.STATE_TABLE, self.config.REPLICATOR_SCHEMA, ) sql = text(f""" INSERT INTO {qualified_state_table} (table_name, last_x_datetime, last_sequence_value, last_run_at, rows_copied, status, error) VALUES (:table_name, :last_x_datetime, :last_sequence_value, now(), :rows_copied, :status, :error) ON CONFLICT (table_name) DO UPDATE SET last_x_datetime = EXCLUDED.last_x_datetime, last_sequence_value = EXCLUDED.last_sequence_value, last_run_at = EXCLUDED.last_run_at, rows_copied = EXCLUDED.rows_copied, status = EXCLUDED.status, error = EXCLUDED.error """) with self.dst_engine.connect() as conn: conn.execute(sql, { 'table_name': table_name, 'last_x_datetime': last_x_datetime, 'last_sequence_value': last_sequence_value, 'rows_copied': rows_copied, 'status': status, 'error': error, }) conn.commit() def get_incremental_upper_bound(self, table_config: TableMigrationConfig) -> Dict[str, Any]: """Фиксация верхней границы Life_ на начало запуска.""" select_columns = [ f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_datetime", ] if table_config.sequence_column: select_columns.append( f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_sequence_value" ) else: select_columns.append("CAST(NULL AS bigint) AS max_sequence_value") order_columns = ', '.join([ f"{self.quote_mssql_identifier(column)} DESC" for column in table_config.incremental_order_columns ]) sql = text(f""" SELECT TOP 1 {', '.join(select_columns)} FROM {self.quote_mssql_identifier(table_config.read_table)} ORDER BY {order_columns} """) with self.src_engine.connect() as conn: row = conn.execute(sql).mappings().first() if not row: return {'last_x_datetime': None, 'last_sequence_value': None} return { 'last_x_datetime': row['max_datetime'], 'last_sequence_value': row['max_sequence_value'], } def read_incremental_chunks( self, table_config: TableMigrationConfig, last_watermark: Dict[str, Any], upper_bound: Dict[str, Any], read_limit: Optional[int] = None, ): """Чтение Life_ таблицы по x_DateTime чанками.""" where_parts = [ f"{self.quote_mssql_identifier(table_config.datetime_column)} <= :upper_bound_datetime", ] params = {'upper_bound_datetime': upper_bound['last_x_datetime']} if table_config.sequence_column and upper_bound.get('last_sequence_value') is not None: where_parts.append(f""" ( {self.quote_mssql_identifier(table_config.datetime_column)} < :upper_bound_datetime OR ( {self.quote_mssql_identifier(table_config.datetime_column)} = :upper_bound_datetime AND {self.quote_mssql_identifier(table_config.sequence_column)} <= :upper_bound_sequence ) ) """) params['upper_bound_sequence'] = upper_bound['last_sequence_value'] if last_watermark.get('last_x_datetime') is not None: if table_config.sequence_column and last_watermark.get('last_sequence_value') is not None: where_parts.append(f""" ( {self.quote_mssql_identifier(table_config.datetime_column)} > :last_watermark_datetime OR ( {self.quote_mssql_identifier(table_config.datetime_column)} = :last_watermark_datetime AND {self.quote_mssql_identifier(table_config.sequence_column)} > :last_watermark_sequence ) ) """) params['last_watermark_sequence'] = last_watermark['last_sequence_value'] else: where_parts.append( f"{self.quote_mssql_identifier(table_config.datetime_column)} > :last_watermark_datetime" ) params['last_watermark_datetime'] = last_watermark['last_x_datetime'] order_columns = ', '.join([ self.quote_mssql_identifier(column) for column in table_config.incremental_order_columns ]) top_clause = f"TOP {int(read_limit)} " if read_limit else "" sql = text(f""" SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_config.read_table)} WHERE {' AND '.join(where_parts)} ORDER BY {order_columns} """) return pd.read_sql_query( sql, self.src_engine, params=params, chunksize=self.config.INCREMENTAL_CHUNK_SIZE, ) def read_full_chunks( self, table_name: str, read_limit: Optional[int] = None, ): """Чтение полной таблицы чанками с опциональным лимитом для проверки.""" if read_limit: sql = text(f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}") return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE) def write_dataframe_batch( self, chunk: pd.DataFrame, table_name: str, if_exists: str = 'append', ): """Batch-запись DataFrame в PostgreSQL.""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}") return chunk.to_sql( table_name, self.dst_engine, if_exists=if_exists, index=False, chunksize=self.config.WRITE_CHUNK_SIZE, method='multi', ) def read_full_chunks_without_sqlalchemy( self, table_name: str, read_limit: Optional[int] = None, ): """Чтение полной таблицы чанками через DBAPI-курсор (без pandas.read_sql_*).""" src_connection = self.src_engine.raw_connection() cursor = src_connection.cursor() top_clause = f"TOP {int(read_limit)} " if read_limit else "" sql = f"SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_name)}" try: cursor.execute(sql) columns = [column[0] for column in cursor.description] while True: rows = cursor.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE) if not rows: break yield pd.DataFrame.from_records(rows, columns=columns) finally: try: cursor.close() finally: src_connection.close() def write_dataframe_batch_without_sqlalchemy( self, chunk: pd.DataFrame, table_name: str, ): """Batch-запись DataFrame в PostgreSQL через COPY (без pandas.to_sql).""" if chunk.empty: return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}") return buffer = io.StringIO() chunk.to_csv( buffer, index=False, header=False, sep='\t', na_rep='\\N', date_format='%Y-%m-%d %H:%M:%S.%f', quoting=csv.QUOTE_MINIMAL, escapechar='\\', ) buffer.seek(0) quoted_columns = ', '.join( self.quote_identifier(column_name) for column_name in chunk.columns ) copy_sql = ( f"COPY {self.qualify_table_name(table_name)} ({quoted_columns}) " "FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N', ESCAPE '\\', QUOTE '\"')" ) dst_connection = self.dst_engine.raw_connection() cursor = dst_connection.cursor() try: cursor.copy_expert(copy_sql, buffer) dst_connection.commit() except Exception: dst_connection.rollback() raise finally: try: cursor.close() finally: dst_connection.close() def prepare_incremental_chunk( self, chunk: pd.DataFrame, table_config: TableMigrationConfig, ) -> pd.DataFrame: """Удаление служебных Life_ полей перед записью в целевую таблицу.""" exclude_columns = [ column for column in table_config.exclude_columns if column in chunk.columns ] if not exclude_columns: return chunk return chunk.drop(columns=exclude_columns) def split_incremental_chunk( self, chunk: pd.DataFrame, table_config: TableMigrationConfig, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Разделение Life_ чанка на строки для upsert и delete.""" if not table_config.operation_column: return chunk, chunk.iloc[0:0].copy() if table_config.operation_column not in chunk.columns: raise ValueError( f"В таблице {table_config.read_table} не найдено поле {table_config.operation_column}" ) operations = chunk[table_config.operation_column].astype(str).str.lower().str.strip() upsert_operations = {operation.lower() for operation in table_config.upsert_operations} delete_operations = {operation.lower() for operation in table_config.delete_operations} upsert_chunk = chunk[operations.isin(upsert_operations)] delete_chunk = chunk[operations.isin(delete_operations)] return upsert_chunk, delete_chunk def deduplicate_incremental_chunk( self, chunk: pd.DataFrame, primary_key: List[str], ) -> pd.DataFrame: """Схлопывание повторных событий по ключу внутри одного чанка с сохранением последней версии.""" if chunk.empty or not primary_key: return chunk missing_columns = [column for column in primary_key if column not in chunk.columns] if missing_columns: raise ValueError( f"Для дедупликации не найдены ключевые поля: {missing_columns}" ) before_count = len(chunk) deduplicated_chunk = chunk.drop_duplicates(subset=primary_key, keep='last') dropped_rows = before_count - len(deduplicated_chunk) if dropped_rows > 0: self.logger.log_info( f"Схлопнуто {dropped_rows} повторных событий внутри чанка по ключу {primary_key}" ) return deduplicated_chunk def get_effective_delete_count( self, chunk: pd.DataFrame, primary_key: List[str], ) -> int: """Количество фактических delete-операций после схлопывания дублей по ключу.""" if chunk.empty: return 0 if not primary_key: return len(chunk) missing_columns = [column for column in primary_key if column not in chunk.columns] if missing_columns: raise ValueError( f"Для расчета delete count не найдены ключевые поля: {missing_columns}" ) return len(chunk[primary_key].drop_duplicates()) def delete_dataframe_batch( self, chunk: pd.DataFrame, table_name: str, primary_key: List[str], staging_table: Optional[str] = None, ): """Batch delete в PostgreSQL через staging-таблицу с ключами.""" if chunk.empty: return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено удаление {len(chunk)} строк из {table_name}") return if not self.table_exists(table_name): self.logger.log_warning(f"Удаление пропущено: таблица {table_name} еще не существует") return if not primary_key: raise ValueError(f"Для удаления из {table_name} не задан primary_key") missing_columns = [column for column in primary_key if column not in chunk.columns] if missing_columns: raise ValueError(f"Для удаления из {table_name} не найдены ключевые поля: {missing_columns}") own_staging = staging_table is None staging = staging_table or f"_stg_delete_{table_name}_{int(time.time() * 1000)}" key_chunk = chunk[primary_key].drop_duplicates() join_condition = ' AND '.join([ f"target.{self.quote_identifier(column)} = source.{self.quote_identifier(column)}" for column in primary_key ]) try: if self.table_exists(staging): with self.dst_engine.connect() as conn: conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}')) conn.commit() else: key_chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False) with self.dst_engine.connect() as conn: conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED')) conn.commit() self.write_dataframe_batch_without_sqlalchemy(key_chunk, staging) sql = f""" DELETE FROM {self.quote_identifier(table_name)} AS target USING {self.quote_identifier(staging)} AS source WHERE {join_condition} """ with self.dst_engine.connect() as conn: conn.execute(text(sql)) if own_staging: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() except Exception: if own_staging: with self.dst_engine.connect() as conn: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() raise def upsert_dataframe_batch( self, chunk: pd.DataFrame, table_name: str, primary_key: List[str], staging_table: Optional[str] = None, ): """Batch upsert через staging-таблицу.""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущен upsert {len(chunk)} строк в {table_name}") return if not primary_key: self.write_dataframe_batch(chunk, table_name, if_exists='append') return own_staging = staging_table is None staging = staging_table or f"_stg_{table_name}_{int(time.time() * 1000)}" columns = list(chunk.columns) quoted_columns = ', '.join([self.quote_identifier(column) for column in columns]) conflict_columns = ', '.join([self.quote_identifier(column) for column in primary_key]) update_columns = [column for column in columns if column not in primary_key] if update_columns: update_set = ', '.join([ f"{self.quote_identifier(column)} = EXCLUDED.{self.quote_identifier(column)}" for column in update_columns ]) conflict_action = f"DO UPDATE SET {update_set}" else: conflict_action = "DO NOTHING" try: if self.table_exists(staging): with self.dst_engine.connect() as conn: conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}')) conn.commit() else: chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False) with self.dst_engine.connect() as conn: conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED')) conn.commit() self.write_dataframe_batch_without_sqlalchemy(chunk, staging) sql = f""" INSERT INTO {self.quote_identifier(table_name)} ({quoted_columns}) SELECT {quoted_columns} FROM {self.quote_identifier(staging)} ON CONFLICT ({conflict_columns}) {conflict_action} """ with self.dst_engine.connect() as conn: conn.execute(text(sql)) if own_staging: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() except Exception: if own_staging: with self.dst_engine.connect() as conn: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) conn.commit() raise def create_timescale_hypertable(self, table_config: TableMigrationConfig): """Преобразование PostgreSQL таблицы в TimescaleDB hypertable.""" if not self.config.ENABLE_TIMESCALE or not table_config.timescale: return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено создание TimescaleDB hypertable для {table_config.pg_table}") return time_column = table_config.timescale_time_column or table_config.datetime_column table_name = table_config.pg_table try: with self.dst_engine.connect() as conn: conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb")) conn.execute(text(""" SELECT create_hypertable( :table_name, :time_column, if_not_exists => TRUE, migrate_data => TRUE ) """), { 'table_name': table_name, 'time_column': time_column, }) conn.commit() self.logger.log_info(f"Таблица {table_name} преобразована в TimescaleDB hypertable") except Exception as e: self.logger.log_error(f"Ошибка при создании TimescaleDB hypertable для {table_name}", e) def can_create_primary_key( self, table_config: TableMigrationConfig, pk_columns: List[str], ) -> bool: """Проверка совместимости primary key с TimescaleDB.""" if not pk_columns: return False if not self.config.ENABLE_TIMESCALE or not table_config.timescale: return True time_column = table_config.timescale_time_column or table_config.datetime_column if time_column in pk_columns: return True self.logger.log_warning( f"Primary key для {table_config.pg_table} пропущен: TimescaleDB hypertable " f"требует включить time column {time_column} в уникальные ограничения" ) return False def get_mssql_indexes(self, table_name: str) -> List[Dict[str, Any]]: """Получение индексов из MSSQL таблицы""" cache_key = table_name.lower() if cache_key in self._mssql_indexes_cache: return self._mssql_indexes_cache[cache_key] indexes = [] query = f""" SELECT i.name as index_name, i.is_unique, i.type_desc as index_type, c.name as column_name, ic.key_ordinal as column_position, ic.is_descending_key FROM sys.indexes i INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id INNER JOIN sys.tables t ON i.object_id = t.object_id WHERE t.name = '{table_name}' AND i.type_desc NOT IN ('HEAP', 'CLUSTERED') AND i.is_primary_key = 0 ORDER BY i.name, ic.key_ordinal """ try: index_df = pd.read_sql_query(query, self.src_engine) if not index_df.empty: for index_name in index_df['index_name'].unique(): index_data = index_df[index_df['index_name'] == index_name] unique = bool(index_data.iloc[0]['is_unique']) columns = [] for _, row in index_data.sort_values('column_position').iterrows(): column_name = row['column_name'] column_name = column_name.replace('[', '').replace(']', '') columns.append(column_name) indexes.append({ 'name': index_name, 'unique': unique, 'columns': columns }) self.logger.log_info(f"Найдено {len(indexes)} индексов для таблицы {table_name}") except Exception as e: self.logger.log_error(f"Ошибка при получении индексов для таблицы {table_name}", e) self._mssql_indexes_cache[cache_key] = indexes return indexes def create_pg_indexes(self, table_name: str, indexes: List[Dict[str, Any]]): """Создание индексов в PostgreSQL""" if not indexes: return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено создание {len(indexes)} индексов для {table_name}") return for index_info in indexes: index_name = f"idx_{table_name}_{index_info['name'].lower()}" index_name = re.sub(r'[^a-z0-9_]', '_', index_name) columns = ', '.join([f'"{col}"' for col in index_info['columns']]) unique_str = 'UNIQUE ' if index_info['unique'] else '' sql = f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ({columns})' try: with self.dst_engine.connect() as conn: conn.execute(text(sql)) conn.commit() self.logger.log_info(f"Создан индекс: {index_name} на столбцах: {columns}") except Exception as e: self.logger.log_error(f"Ошибка при создании индекса {index_name}", e) def get_mssql_primary_key(self, table_name: str) -> Optional[List[str]]: """Получение информации о первичном ключе из MSSQL""" cache_key = table_name.lower() if cache_key in self._mssql_pk_cache: return self._mssql_pk_cache[cache_key] query = f""" SELECT c.name as column_name FROM sys.indexes i INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id INNER JOIN sys.tables t ON i.object_id = t.object_id WHERE t.name = '{table_name}' AND i.is_primary_key = 1 ORDER BY ic.key_ordinal """ try: pk_df = pd.read_sql_query(query, self.src_engine) if not pk_df.empty: pk_columns = [row['column_name'] for _, row in pk_df.iterrows()] self.logger.log_info(f"Найден первичный ключ для таблицы {table_name}: {pk_columns}") self._mssql_pk_cache[cache_key] = pk_columns return pk_columns except Exception as e: self.logger.log_error(f"Ошибка при получении первичного ключа для таблицы {table_name}", e) self._mssql_pk_cache[cache_key] = None return None def create_pg_primary_key(self, table_name: str, pk_columns: List[str]): """Создание первичного ключа в PostgreSQL""" if not pk_columns: return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено создание первичного ключа для {table_name}") return columns = ', '.join([f'"{col}"' for col in pk_columns]) pk_name = f"pk_{table_name}" sql = f'ALTER TABLE "{table_name}" ADD CONSTRAINT "{pk_name}" PRIMARY KEY ({columns})' try: with self.dst_engine.connect() as conn: conn.execute(text(sql)) conn.commit() self.logger.log_info(f"Создан первичный ключ: {pk_name} на столбцах: {columns}") except Exception as e: self.logger.log_error(f"Ошибка при создании первичного ключа {pk_name}", e) def has_pg_primary_key(self, table_name: str) -> bool: """Проверка наличия primary key в PostgreSQL.""" sql = text(""" SELECT EXISTS ( SELECT 1 FROM pg_constraint c JOIN pg_class t ON t.oid = c.conrelid JOIN pg_namespace n ON n.oid = t.relnamespace WHERE c.contype = 'p' AND n.nspname = current_schema() AND t.relname = :table_name ) """) with self.dst_engine.connect() as conn: return bool(conn.execute(sql, {'table_name': table_name}).scalar()) def ensure_pg_primary_key(self, table_name: str, pk_columns: List[str]): """Создание primary key, если его еще нет.""" if not pk_columns: return if self.has_pg_primary_key(table_name): return self.create_pg_primary_key(table_name, pk_columns) def get_mssql_foreign_keys(self, table_name: str) -> Optional[List[Dict[str, str]]]: """Получение информации о внешних ключах из MSSQL""" cache_key = table_name.lower() if cache_key in self._mssql_fk_cache: return self._mssql_fk_cache[cache_key] query = f""" SELECT fk.name as fk_name, pc.name as parent_column, rc.name as referenced_column, rt.name as referenced_table FROM sys.foreign_keys fk INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id INNER JOIN sys.tables pt ON fkc.parent_object_id = pt.object_id WHERE pt.name = '{table_name}' ORDER BY fk.name, fkc.constraint_column_id """ try: fk_df = pd.read_sql_query(query, self.src_engine) if not fk_df.empty: fks = [] for fk_name in fk_df['fk_name'].unique(): fk_data = fk_df[fk_df['fk_name'] == fk_name] fks.append({ 'name': fk_name, 'parent_column': fk_data.iloc[0]['parent_column'], 'referenced_table': fk_data.iloc[0]['referenced_table'].lower(), 'referenced_column': fk_data.iloc[0]['referenced_column'] }) self.logger.log_info(f"Найдено {len(fks)} внешних ключей для таблицы {table_name}") self._mssql_fk_cache[cache_key] = fks return fks except Exception as e: self.logger.log_error(f"Ошибка при получении внешних ключей для таблицы {table_name}", e) self._mssql_fk_cache[cache_key] = None return None def create_pg_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]): """Создание внешних ключей в PostgreSQL""" if not foreign_keys: return if not self.config.CREATE_FOREIGN_KEYS: self.logger.log_info(f"Создание внешних ключей отключено конфигом для {table_name}") return if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено создание {len(foreign_keys)} внешних ключей для {table_name}") return for fk_info in foreign_keys: if not self.table_exists(fk_info['referenced_table']): self.logger.log_warning( f"Внешний ключ для {table_name}.{fk_info['parent_column']} пропущен: " f"таблица {fk_info['referenced_table']} отсутствует в PostgreSQL" ) continue fk_name = f"fk_{table_name}_{fk_info['parent_column']}" sql = f""" ALTER TABLE "{table_name}" ADD CONSTRAINT "{fk_name}" FOREIGN KEY ("{fk_info['parent_column']}") REFERENCES "{fk_info['referenced_table']}" ("{fk_info['referenced_column']}") """ try: with self.dst_engine.connect() as conn: conn.execute(text(sql)) conn.commit() self.logger.log_info(f"Создан внешний ключ: {fk_name}") except Exception as e: self.logger.log_error(f"Ошибка при создании внешнего ключа {fk_name}", e) def analyze_table(self, table_name: str): """Выполнение ANALYZE для таблицы""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущен ANALYZE для {table_name}") return try: with self.dst_engine.connect() as conn: sql = f'ANALYZE "{table_name}"' conn.execute(text(sql)) conn.commit() self.logger.log_info(f"Выполнен ANALYZE для таблицы {table_name}") except Exception as e: self.logger.log_error(f"Ошибка при выполнении ANALYZE для таблицы {table_name}", e) def vacuum_analyze_table(self, table_name: str): """Выполнение VACUUM ANALYZE для таблицы""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущен VACUUM ANALYZE для {table_name}") return try: with self.dst_engine.connect() as conn: sql = f'VACUUM ANALYZE "{table_name}"' conn.execute(text(sql)) conn.commit() self.logger.log_info(f"Выполнен VACUUM ANALYZE для таблицы {table_name}") except Exception as e: self.logger.log_error(f"Ошибка при выполнении VACUUM ANALYZE для таблицы {table_name}", e) def update_table_statistics(self): """Обновление статистики для всей базы данных""" if self.config.DRY_RUN: self.logger.log_info("DRY RUN: пропущен ANALYZE для всей базы данных") return try: with self.dst_engine.connect() as conn: conn.execute(text('ANALYZE')) conn.commit() self.logger.log_info("Выполнен ANALYZE для всей базы данных") except Exception as e: self.logger.log_error("Ошибка при выполнении ANALYZE для всей базы данных", e) def truncate_table(self, table_name: str): """Очистка таблицы""" if self.config.DRY_RUN: self.logger.log_info(f"DRY RUN: пропущено удаление таблицы {table_name}") return table_exists = self.table_exists(table_name) if table_exists: try: query = text(f'DROP TABLE IF EXISTS {self.quote_identifier(table_name)} CASCADE') with self.dst_engine.connect() as connection: connection.execute(query) connection.commit() self.logger.log_info(f"Таблица {table_name} удалена") except Exception as e: self.logger.log_error(f"Ошибка при удалении таблицы {table_name}", e) else: self.logger.log_info(f"Таблица {table_name} не существует в целевой БД") def migrate_table(self, table: Any, force_full: bool = False) -> bool: """Миграция одной таблицы""" table_config = table if isinstance(table, TableMigrationConfig) else self.get_table_config(table) max_attempts = max(1, self.config.MSSQL_TABLE_RETRIES + 1) for attempt in range(1, max_attempts + 1): try: return self.migrate_table_once(table_config, force_full=force_full) except Exception as e: if not self.is_retryable_mssql_error(e) or attempt >= max_attempts: raise self.logger.log_warning( f"Временная ошибка MSSQL при обработке {table_config.source_table}, " f"попытка {attempt} из {max_attempts}: {e}" ) self.reconnect_mssql_engine() time.sleep(min(attempt * 5, 15)) return False def migrate_full_table( self, table_config: TableMigrationConfig, read_limit: Optional[int] = None, ) -> bool: """Полная миграция одной таблицы с пересозданием целевой таблицы.""" table_name = table_config.source_table pg_table = table_config.pg_table self.logger.log_table_start(table_name) try: # Получаем метаданные indexes = self.get_mssql_indexes(table_name) pk_columns = self.get_mssql_primary_key(table_name) foreign_keys = self.get_mssql_foreign_keys(table_name) # Читаем данные self.logger.log_info(f"Чтение данных из {table_name}") chunks = self.read_full_chunks(table_name, read_limit=read_limit) # Загружаем данные first_chunk = True total_rows = 0 for chunk_num, chunk in enumerate(chunks, 1): if first_chunk: # Дропаем целевую таблицу только после успешного чтения первого чанка, # чтобы не уничтожить данные при недоступном MSSQL self.logger.log_info(f"Очистка целевой таблицы {pg_table}") self.truncate_table(pg_table) chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False) self.logger.log_info(f"Таблица {pg_table} создана") first_chunk = False self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table) total_rows += len(chunk) if chunk_num % self.config.BATCH_SIZE == 0: self.logger.log_progress(table_name, chunk_num, total_rows) self.logger.log_info(f"Всего загружено строк: {total_rows}") if total_rows > 0: self.sync_target_schema(table_name, pg_table) self.create_timescale_hypertable(table_config) if self.can_create_primary_key(table_config, pk_columns): self.logger.log_info(f"Создание первичного ключа для {pg_table}") self.create_pg_primary_key(pg_table, pk_columns) if indexes: self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}") self.create_pg_indexes(pg_table, indexes) if foreign_keys: self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}") self.create_pg_foreign_keys(pg_table, foreign_keys) self.logger.log_info(f"Обновление статистики для {pg_table}") if total_rows > 1000000: self.vacuum_analyze_table(pg_table) else: self.analyze_table(pg_table) self.logger.log_table_success(table_name, total_rows) return True except Exception as e: if self.is_retryable_mssql_error(e): raise self.logger.log_table_failure(table_name, str(e)) return False def migrate_full_table_without_sqlalchemy( self, table_config: TableMigrationConfig, read_limit: Optional[int] = None, ) -> bool: """Полная миграция одной таблицы без SQLAlchemy в этапе чтения/заливки данных.""" table_name = table_config.source_table pg_table = table_config.pg_table self.logger.log_table_start(table_name) try: indexes = self.get_mssql_indexes(table_name) pk_columns = self.get_mssql_primary_key(table_name) foreign_keys = self.get_mssql_foreign_keys(table_name) self.logger.log_info(f"Чтение данных из {table_name} без SQLAlchemy") chunks = self.read_full_chunks_without_sqlalchemy(table_name, read_limit=read_limit) first_chunk = True total_rows = 0 for chunk_num, chunk in enumerate(chunks, 1): if first_chunk: # Дропаем целевую таблицу только после успешного чтения первого чанка, # чтобы не уничтожить данные при недоступном MSSQL self.logger.log_info(f"Очистка целевой таблицы {pg_table}") self.truncate_table(pg_table) self.write_dataframe_batch(chunk.iloc[0:0], pg_table, if_exists='fail') self.logger.log_info(f"Таблица {pg_table} создана") first_chunk = False self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table) total_rows += len(chunk) if chunk_num % self.config.BATCH_SIZE == 0: self.logger.log_progress(table_name, chunk_num, total_rows) self.logger.log_info(f"Всего загружено строк: {total_rows}") if total_rows > 0: self.sync_target_schema(table_name, pg_table) self.create_timescale_hypertable(table_config) if self.can_create_primary_key(table_config, pk_columns): self.logger.log_info(f"Создание первичного ключа для {pg_table}") self.create_pg_primary_key(pg_table, pk_columns) if indexes: self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}") self.create_pg_indexes(pg_table, indexes) if foreign_keys: self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}") self.create_pg_foreign_keys(pg_table, foreign_keys) self.logger.log_info(f"Обновление статистики для {pg_table}") if total_rows > 1000000: self.vacuum_analyze_table(pg_table) else: self.analyze_table(pg_table) self.logger.log_table_success(table_name, total_rows) return True except Exception as e: if self.is_retryable_mssql_error(e): raise self.logger.log_table_failure(table_name, str(e)) return False def migrate_incremental_table(self, table_config: TableMigrationConfig) -> bool: """Инкрементальная миграция Life_ таблицы по x_DateTime.""" table_name = table_config.source_table pg_table = table_config.pg_table self.logger.log_table_start(f"{table_name} ({table_config.read_table})") upsert_staging = f"_stg_upsert_{pg_table}" delete_staging = f"_stg_delete_{pg_table}" try: if not table_config.life_table: raise ValueError(f"Для инкрементальной миграции {table_name} не задана life_table") last_watermark = self.get_last_watermark(pg_table) upper_bound = self.get_incremental_upper_bound(table_config) target_exists = self.table_exists(pg_table) if upper_bound['last_x_datetime'] is None: self.logger.log_info(f"В {table_config.read_table} нет данных для инкрементальной миграции") self.save_watermark(pg_table, last_watermark['last_x_datetime'], last_watermark['last_sequence_value'], 0, 'success') self.logger.log_table_success(table_name, 0) return True if ( table_config.initial_load_mode == 'full_then_incremental' and not target_exists and last_watermark['last_x_datetime'] is None ): self.logger.log_info( f"Первичная загрузка {table_name}: full snapshot из {table_config.source_table}, " f"затем watermark по {table_config.read_table}" ) success = self.migrate_full_table(table_config, read_limit=self.config.READ_LIMIT) if success: self.save_watermark( pg_table, upper_bound['last_x_datetime'], upper_bound['last_sequence_value'], 0, 'success', ) return success self.logger.log_info( f"Инкрементальная миграция {table_config.read_table}: " f"{table_config.datetime_column} > {last_watermark}, <= {upper_bound}" ) first_chunk = not target_exists total_rows = 0 total_events = 0 max_seen_watermark = dict(last_watermark) chunks = self.read_incremental_chunks( table_config, last_watermark, upper_bound, read_limit=self.config.READ_LIMIT, ) if target_exists: self.sync_target_schema(table_name, pg_table) if table_config.primary_key and target_exists and self.can_create_primary_key(table_config, table_config.primary_key): self.ensure_pg_primary_key(pg_table, table_config.primary_key) for chunk_num, chunk in enumerate(chunks, 1): if chunk.empty: continue if table_config.datetime_column not in chunk.columns: raise ValueError( f"В таблице {table_config.read_table} не найдено поле {table_config.datetime_column}" ) upsert_chunk, delete_chunk = self.split_incremental_chunk(chunk, table_config) write_chunk = self.prepare_incremental_chunk(upsert_chunk, table_config) write_chunk = self.deduplicate_incremental_chunk(write_chunk, table_config.primary_key) delete_count = self.get_effective_delete_count(delete_chunk, table_config.primary_key) missing_pk_columns = [ column for column in table_config.primary_key if column not in write_chunk.columns ] if not write_chunk.empty and missing_pk_columns: raise ValueError( f"После исключения служебных полей отсутствуют ключевые поля: {missing_pk_columns}" ) if not delete_chunk.empty: self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key, staging_table=delete_staging) if write_chunk.empty: pass elif first_chunk: write_chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False) self.write_dataframe_batch_without_sqlalchemy(write_chunk, pg_table) self.create_timescale_hypertable(table_config) if self.can_create_primary_key(table_config, table_config.primary_key): self.create_pg_primary_key(pg_table, table_config.primary_key) first_chunk = False elif table_config.primary_key: self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key, staging_table=upsert_staging) else: self.write_dataframe_batch(write_chunk, pg_table, if_exists='append') total_events += len(chunk) total_rows += len(write_chunk) + delete_count last_row = chunk.iloc[-1] max_seen_watermark = { 'last_x_datetime': last_row[table_config.datetime_column], 'last_sequence_value': ( int(last_row[table_config.sequence_column]) if table_config.sequence_column and pd.notna(last_row[table_config.sequence_column]) else None ), } if chunk_num % self.config.BATCH_SIZE == 0: self.logger.log_progress(table_name, chunk_num, total_rows) if total_rows == 0: max_seen_watermark = upper_bound self.save_watermark( pg_table, max_seen_watermark['last_x_datetime'], max_seen_watermark['last_sequence_value'], total_rows, 'success', ) self.logger.log_info(f"Всего инкрементально загружено строк: {total_rows}") if total_events != total_rows: self.logger.log_info( f"Получено событий из Life_: {total_events}, фактически применено изменений: {total_rows}" ) if total_rows > 1000000: self.vacuum_analyze_table(pg_table) elif total_rows > 0: self.analyze_table(pg_table) self.logger.log_table_success(table_name, total_rows) return True except Exception as e: if self.is_retryable_mssql_error(e): raise try: failed_watermark = self.get_last_watermark(pg_table) self.save_watermark( pg_table, failed_watermark['last_x_datetime'], failed_watermark['last_sequence_value'], 0, 'failed', str(e), ) except Exception as state_error: self.logger.log_error(f"Ошибка при сохранении состояния миграции {pg_table}", state_error) self.logger.log_table_failure(table_name, str(e)) return False finally: try: with self.dst_engine.connect() as conn: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(upsert_staging)}')) conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(delete_staging)}')) conn.commit() except Exception as cleanup_error: self.logger.log_warning(f"Не удалось очистить staging-таблицы для {pg_table}: {cleanup_error}") def run_migration( self, table_names: Optional[List[str]] = None, send_email: bool = False, dry_run: Optional[bool] = None, read_limit: Optional[int] = None, force_full: bool = False, ): """Запуск полной миграции""" if dry_run is not None: self.config.DRY_RUN = dry_run if read_limit is not None: self.config.READ_LIMIT = read_limit table_configs = self.table_configs if table_names: requested = {table_name.lower() for table_name in table_names} table_configs = [ table_config for table_config in self.table_configs if table_config.source_table.lower() in requested or table_config.pg_table in requested ] self.logger.stats['total_tables'] = len(table_configs) self.logger.log_info("="*60) self.logger.log_info("НАЧАЛО МИГРАЦИИ ДАННЫХ") self.logger.log_info(f"Время начала: {self.logger.start_time}") self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}") self.logger.log_info(f"Force full reload: {force_full}") self.logger.log_info("="*60) # Миграция таблиц for table_config in table_configs: success = self.migrate_table(table_config, force_full=force_full) if not success: self.logger.log_warning(f"Таблица {table_config.source_table} пропущена из-за ошибки") # Финальная статистика self.logger.log_info("Обновление статистики для всей базы данных...") self.update_table_statistics() # Генерация отчета report = self.logger.generate_report() # Вывод итогов self.logger.log_info("="*60) self.logger.log_info("ИТОГОВАЯ СТАТИСТИКА") self.logger.log_info("="*60) self.logger.log_info(f"Успешно скопировано таблиц: {report['summary']['successful_tables']}") self.logger.log_info(f"Не удалось скопировать таблиц: {report['summary']['failed_tables']}") self.logger.log_info(f"Всего строк: {report['summary']['total_rows']}") self.logger.log_info(f"Процент успеха: {report['summary']['success_rate']:.2f}%") self.logger.log_info(f"Продолжительность: {report['summary']['duration']}") self.logger.log_info("="*60) # Отправка email if send_email: self.send_notification(report) else: self.logger.log_info("Email отключен для этого запуска") return report def backfill_watermarks( self, table_names: Optional[List[str]] = None, overwrite: bool = False, dry_run: bool = False, ) -> Dict[str, Any]: """Заполнение migration_state верхними границами Life_ без перезаливки данных.""" original_dry_run = self.config.DRY_RUN self.config.DRY_RUN = dry_run try: table_configs = self.table_configs if table_names: requested = {table_name.lower() for table_name in table_names} table_configs = [ table_config for table_config in self.table_configs if table_config.source_table.lower() in requested or table_config.pg_table in requested ] summary = { 'processed': 0, 'updated': 0, 'skipped': 0, 'failed': 0, 'tables': [], } self.logger.log_info("=" * 60) self.logger.log_info("НАЧАЛО BACKFILL WATERMARKS") self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}") self.logger.log_info(f"Overwrite existing: {overwrite}") self.logger.log_info(f"Dry run: {dry_run}") self.logger.log_info("=" * 60) for table_config in table_configs: summary['processed'] += 1 table_name = table_config.source_table pg_table = table_config.pg_table if table_config.mode != 'incremental' or not table_config.life_table: self.logger.log_info( f"Пропуск {table_name}: таблица не настроена на incremental через Life_" ) summary['skipped'] += 1 summary['tables'].append({ 'table': table_name, 'status': 'skipped', 'reason': 'not_incremental', }) continue existing_watermark = self.get_last_watermark(pg_table) if existing_watermark['last_x_datetime'] is not None and not overwrite: self.logger.log_info( f"Пропуск {table_name}: watermark уже существует для {pg_table}" ) summary['skipped'] += 1 summary['tables'].append({ 'table': table_name, 'status': 'skipped', 'reason': 'watermark_exists', 'watermark': existing_watermark, }) continue try: upper_bound = self.get_incremental_upper_bound(table_config) if upper_bound['last_x_datetime'] is None: self.logger.log_warning( f"Пропуск {table_name}: в {table_config.life_table} нет данных" ) summary['skipped'] += 1 summary['tables'].append({ 'table': table_name, 'status': 'skipped', 'reason': 'life_empty', }) continue self.save_watermark( pg_table, upper_bound['last_x_datetime'], upper_bound['last_sequence_value'], 0, 'success', ) self.logger.log_info( f"Watermark сохранен для {pg_table}: {upper_bound}" ) summary['updated'] += 1 summary['tables'].append({ 'table': table_name, 'status': 'updated', 'watermark': upper_bound, }) except Exception as e: self.logger.log_error( f"Ошибка backfill watermark для {table_name}", e, ) summary['failed'] += 1 summary['tables'].append({ 'table': table_name, 'status': 'failed', 'error': str(e), }) self.logger.log_info("=" * 60) self.logger.log_info("ИТОГ BACKFILL WATERMARKS") self.logger.log_info( f"Обработано: {summary['processed']}, обновлено: {summary['updated']}, " f"пропущено: {summary['skipped']}, ошибок: {summary['failed']}" ) self.logger.log_info("=" * 60) return summary finally: self.config.DRY_RUN = original_dry_run def send_notification(self, report: Dict[str, Any]): """Отправка уведомления по email""" # Формируем тело письма subject = f"{self.config.EMAIL_SUBJECT} - {datetime.now().strftime('%Y-%m-%d %H:%M')}" body = f""" Результат миграции данных MSSQL → PostgreSQL ОБЩАЯ ИНФОРМАЦИЯ: ------------------ Время начала: {report['summary']['start_time']} Время окончания: {report['summary']['end_time']} Продолжительность: {report['summary']['duration']} СТАТИСТИКА: ----------- Всего таблиц: {report['summary']['total_tables']} Успешно скопировано: {report['summary']['successful_tables']} Не удалось скопировать: {report['summary']['failed_tables']} Процент успеха: {report['summary']['success_rate']:.2f}% Всего строк: {report['summary']['total_rows']} УСПЕШНЫЕ ТАБЛИЦЫ ({len(report['successful_tables'])}): ------------------- {chr(10).join([f"- {t['name']}: {t['rows']} строк" for t in report['successful_tables']])} ПРОВАЛЕННЫЕ ТАБЛИЦЫ ({len(report['failed_tables'])}): --------------------- {chr(10).join([f"- {t['name']}: {t['error']}" for t in report['failed_tables']])} ОШИБКИ ({len(report['errors'])}): -------- {chr(10).join([f"- {e['message']}" for e in report['errors'][:5]])} {"..." if len(report['errors']) > 5 else ""} Лог-файл: {self.logger.log_file} Отчет в формате JSON также прикреплен к письму. """ # Вложения attachments = [ self.logger.log_file, os.path.join(self.config.LOG_DIR, f"migration_report_{self.logger.timestamp}.json") ] # Отправка self.email_sender.send_email(subject, body, attachments) def send_failure_notification( self, error: str, table_names: Optional[List[str]] = None, job_id: Optional[str] = None, ): """Отправка уведомления о провале job до формирования стандартного отчета.""" subject = f"{self.config.EMAIL_SUBJECT} - FAILED - {datetime.now().strftime('%Y-%m-%d %H:%M')}" tables_text = ', '.join(table_names) if table_names else 'all configured tables' body = f""" Результат миграции данных MSSQL → PostgreSQL СТАТУС: ------- Выполнение job завершилось ошибкой до формирования итогового отчета. ДЕТАЛИ: ------- Job ID: {job_id or 'n/a'} Таблицы: {tables_text} Ошибка: {error} Лог-файл: {self.logger.log_file} """ attachments = [self.logger.log_file] self.email_sender.send_email(subject, body, attachments) def cleanup_old_logs(self, days_to_keep: int = 7): """Очистка старых логов""" try: cutoff_date = datetime.now().timestamp() - (days_to_keep * 24 * 3600) for filename in os.listdir(self.config.LOG_DIR): filepath = os.path.join(self.config.LOG_DIR, filename) # Проверяем, что это лог-файл if filename.endswith('.log') or filename.endswith('.json'): # Получаем время последнего изменения file_mtime = os.path.getmtime(filepath) # Удаляем старые файлы if file_mtime < cutoff_date: os.remove(filepath) self.logger.log_info(f"Удален старый файл: {filename}") except Exception as e: self.logger.log_error("Ошибка при очистке старых логов", e)