Files
syncio/app/migrator.py
2026-05-29 06:47:29 +09:00

1730 lines
76 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import time
import csv
import io
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import sqltypes
from .config import Config, TableMigrationConfig
from .email_sender import EmailSender
from .logging_utils import MigrationLogger
from .table_config_repository import TableConfigRepository
class DatabaseMigrator:
"""Основной класс для миграции данных"""
def __init__(self, config: Config):
    """Set up engines, table settings, logging and e-mail for a migration run.

    The PostgreSQL (target) engine is created first because the per-table
    migration settings are loaded through it (``seed_defaults=True``). The
    loaded list is also written back onto ``config`` as ``TABLE_MIGRATIONS``
    and as ``TABLES_TO_COPY`` (source table names). The logger is created
    only after that, and the MSSQL (source) engine last.
    """
    self.config = config
    self.dst_engine = create_engine(config.POSTGRES_CONNECTION_STRING)
    self.table_config_repository = TableConfigRepository(config, self.dst_engine)
    loaded = self.table_config_repository.load_configs(seed_defaults=True)
    self.table_configs = loaded
    self.config.TABLE_MIGRATIONS = loaded
    self.config.TABLES_TO_COPY = [entry.source_table for entry in loaded]
    self.logger = MigrationLogger(config)
    self.email_sender = EmailSender(config)
    # MSSQL metadata caches keyed by lower-cased table name; all of them are
    # emptied by reconnect_mssql_engine().
    self._source_columns_cache: Dict[str, List[Dict[str, Any]]] = {}
    self._mssql_indexes_cache: Dict[str, List[Dict[str, Any]]] = {}
    self._mssql_pk_cache: Dict[str, Optional[List[str]]] = {}
    self._mssql_fk_cache: Dict[str, Optional[List[Dict[str, str]]]] = {}
    mssql_charset = self.config.MSSQL_CHARSET
    self.logger.log_info(f"Подключение к БД МИС (MSSQL), charset={mssql_charset}")
    self.src_engine = self.create_mssql_engine()
    # NOTE: logged after the fact -- dst_engine was already created above.
    self.logger.log_info("Подключение к PostgreSQL")
def create_mssql_engine(self):
    """Build a new SQLAlchemy engine for the MSSQL source.

    The pymssql connect arguments carry the configured ``charset`` and
    ``login_timeout``. ``NullPool`` disables pooling, so every checkout
    opens a new DBAPI connection.
    """
    driver_options = {
        'charset': self.config.MSSQL_CHARSET,
        'login_timeout': self.config.MSSQL_CONNECT_TIMEOUT,
    }
    return create_engine(
        self.config.MSSQL_CONNECTION_STRING,
        connect_args=driver_options,
        poolclass=NullPool,
    )
def reconnect_mssql_engine(self):
    """Recreate the MSSQL engine after a network/driver failure.

    Disposing of the old engine is best-effort, since it is being replaced
    anyway. A failure is now logged as a warning instead of being swallowed
    silently. All cached MSSQL metadata (columns, indexes, primary and
    foreign keys) is dropped so that it is re-read through the new engine.
    """
    try:
        self.src_engine.dispose()
    except Exception as dispose_error:
        # Deliberately non-fatal, but keep it visible in the logs.
        self.logger.log_warning(
            f"Не удалось освободить MSSQL engine перед переподключением: {dispose_error}"
        )
    for cache in (
        self._source_columns_cache,
        self._mssql_indexes_cache,
        self._mssql_pk_cache,
        self._mssql_fk_cache,
    ):
        cache.clear()
    self.src_engine = self.create_mssql_engine()
def is_retryable_mssql_error(self, exception: Exception) -> bool:
    """Tell whether *exception* looks transient, i.e. retrying the table makes sense.

    An error is retryable when any of these holds:

    * its message contains a known connection-loss marker, e.g. pymssql /
      FreeTDS texts such as "DBPROCESS is dead", SQLSTATE ``08S01`` or the
      DB-Lib codes ``20002`` / ``20017``;
    * it is a SQLAlchemy ``OperationalError``;
    * it is any other ``DBAPIError`` that SQLAlchemy flagged with
      ``connection_invalidated``, meaning it detected a disconnect.

    ``DBAPIError`` is the base class not only of ``OperationalError`` but
    also of ``ProgrammingError``, ``IntegrityError``, ``DataError`` and
    others. The previous ``isinstance(exception, (OperationalError,
    DBAPIError))`` therefore retried deterministic failures such as an
    unknown column or a constraint violation. Those are now non-retryable.
    """
    message = str(exception).lower()
    retry_markers = (
        'dbprocess is dead',
        'unexpected eof',
        'adaptive server connection failed',
        'server closed the connection unexpectedly',
        'connection reset',
        'connection refused',
        'communication link failure',
        'lost connection',
        'closed connection',
        'not enabled',
        '08s01',
        '20002',
        '20017',
    )
    if any(marker in message for marker in retry_markers):
        return True
    if isinstance(exception, OperationalError):
        return True
    # Any other DBAPIError subclass is only retried on a detected disconnect.
    return isinstance(exception, DBAPIError) and bool(
        getattr(exception, 'connection_invalidated', False)
    )
def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool:
    """Run a single migration pass for one table, without the retry wrapper.

    Without ``force_full``, the table's ``mode`` selects the incremental or
    the full path. With ``force_full``, the whole table is reloaded:

    * An incremental ``full_then_incremental`` table that has neither a
      target table nor a saved watermark yet goes through
      migrate_full_table_without_sqlalchemy().
    * Everything else goes through migrate_full_table().

    For an incremental table with a Life_ table, the watermark is then set to
    the Life_ upper bound, so the next incremental pass continues from there.

    The upper bound is now read *before* the full copy starts. Reading it
    afterwards skipped every Life_ event written while the copy was running
    for rows that had already been copied in their old state. Reading it first
    makes the next pass replay those events instead. This assumes replaying
    already-applied Life_ events is harmless, which holds for upsert/delete by
    key; TODO confirm for tables without a primary key.

    Returns:
        The success flag returned by the migration routine that ran.
    """
    if not force_full:
        if table_config.mode == 'incremental':
            return self.migrate_incremental_table(table_config)
        return self.migrate_full_table(table_config)

    self.logger.log_info(f"Force full reload для таблицы {table_config.source_table}")
    is_incremental = table_config.mode == 'incremental'
    is_initial_force_full = (
        is_incremental
        and table_config.initial_load_mode == 'full_then_incremental'
        and not self.table_exists(table_config.pg_table)
        and self.get_last_watermark(table_config.pg_table).get('last_x_datetime') is None
    )
    tracks_life_table = is_incremental and bool(table_config.life_table)
    # Snapshot the Life_ position before copying (see docstring).
    upper_bound = (
        self.get_incremental_upper_bound(table_config) if tracks_life_table else None
    )

    if is_initial_force_full:
        self.logger.log_info(
            f"Первый force_full для {table_config.source_table}: "
            "загрузка выполняется без SQLAlchemy"
        )
        success = self.migrate_full_table_without_sqlalchemy(table_config)
    else:
        success = self.migrate_full_table(table_config)

    if success and upper_bound is not None:
        self.save_watermark(
            table_config.pg_table,
            upper_bound['last_x_datetime'],
            upper_bound['last_sequence_value'],
            0,
            'success',
        )
        self.logger.log_info(
            f"После force full сохранен watermark для {table_config.pg_table}: "
            f"{upper_bound}"
        )
    return success
@staticmethod
def quote_identifier(identifier: str) -> str:
    """Wrap *identifier* in double quotes, doubling any embedded double quote.

    This is the PostgreSQL rule for delimited identifiers.
    """
    doubled = identifier.replace('"', '""')
    return f'"{doubled}"'
@staticmethod
def quote_mssql_identifier(identifier: str) -> str:
    """Wrap *identifier* in SQL Server brackets; a literal ``]`` is written as ``]]``."""
    return f"[{identifier.replace(']', ']]')}]"
def qualify_table_name(self, table_name: str, schema: Optional[str] = None) -> str:
    """Return the quoted PostgreSQL table name, prefixed by the quoted *schema* if given.

    An empty or None *schema* yields only the quoted table name. PostgreSQL
    then resolves it through ``search_path``.
    """
    quoted_table = self.quote_identifier(table_name)
    if not schema:
        return quoted_table
    return f'{self.quote_identifier(schema)}.{quoted_table}'
def compile_pg_type(self, source_type) -> str:
    """Translate a reflected SQLAlchemy (MSSQL) column type into a PostgreSQL type name.

    The checks run from most to least specific because SQLAlchemy's type
    classes form a hierarchy: ``Float``/``REAL`` subclass ``Numeric``, and
    ``CHAR``/``Text`` subclass ``String``. UNIQUEIDENTIFIER and BIT are
    matched by class name. Anything unrecognised becomes ``text``.

    Fixes over the previous version:

    * ``Float``/``REAL`` are now tested *before* ``Numeric``. Because they
      subclass ``Numeric``, MSSQL ``FLOAT`` (typically reflected with
      ``precision=53``) used to map to ``numeric(53)``. That type has scale
      0, so every fractional part was rounded away; it now maps to
      ``double precision``.
    * SQLAlchemy's generic ``BINARY``/``VARBINARY`` are not ``LargeBinary``
      subclasses and used to fall through to ``text``. They now map to
      ``bytea``.
    """
    type_name = source_type.__class__.__name__.lower()
    if 'uniqueidentifier' in type_name or 'uuid' in type_name:
        return 'uuid'
    if 'bit' in type_name or isinstance(source_type, sqltypes.Boolean):
        return 'boolean'
    if isinstance(source_type, sqltypes.BigInteger):
        return 'bigint'
    if isinstance(source_type, sqltypes.SmallInteger):
        return 'smallint'
    if isinstance(source_type, sqltypes.Integer):
        return 'integer'
    # Must precede the Numeric check: Float and REAL subclass Numeric.
    if isinstance(source_type, (sqltypes.Float, sqltypes.REAL)):
        return 'double precision'
    if isinstance(source_type, sqltypes.Numeric):
        precision = getattr(source_type, 'precision', None)
        scale = getattr(source_type, 'scale', None)
        if precision is not None and scale is not None:
            return f'numeric({precision}, {scale})'
        if precision is not None:
            return f'numeric({precision})'
        return 'numeric'
    if isinstance(source_type, sqltypes.DateTime):
        return 'timestamp'
    if isinstance(source_type, sqltypes.Date):
        return 'date'
    if isinstance(source_type, sqltypes.Time):
        return 'time'
    # CHAR must precede String because CHAR subclasses String.
    if isinstance(source_type, sqltypes.CHAR):
        length = getattr(source_type, 'length', None)
        return f'char({length})' if length else 'char'
    # Also covers Text/UnicodeText (String subclasses): no length -> text.
    if isinstance(source_type, sqltypes.String):
        length = getattr(source_type, 'length', None)
        return f'varchar({length})' if length else 'text'
    if isinstance(source_type, (sqltypes.LargeBinary, sqltypes.BINARY, sqltypes.VARBINARY)):
        return 'bytea'
    return 'text'
def get_source_table_columns(self, table_name: str) -> List[Dict[str, Any]]:
    """Return the reflected columns of MSSQL table *table_name*.

    Results are memoised per lower-cased table name in
    ``_source_columns_cache``, so only the first call per table reaches the
    database. The cache is cleared by reconnect_mssql_engine().
    """
    cache = self._source_columns_cache
    key = table_name.lower()
    if key in cache:
        return cache[key]
    columns = inspect(self.src_engine).get_columns(table_name)
    cache[key] = columns
    return columns
def get_target_table_columns(self, table_name: str) -> List[Dict[str, Any]]:
    """Reflect the columns of PostgreSQL table *table_name*. Not cached."""
    pg_inspector = inspect(self.dst_engine)
    return pg_inspector.get_columns(table_name)
def sync_target_schema(self, source_table: str, target_table: str):
    """Add source columns that are missing from the PostgreSQL target as nullable columns.

    Columns are matched by case-insensitive name. For a column present on
    both sides, a warning is logged when the types differ; the types are
    compared as lower-cased strings, and existing column types are never
    changed. A missing column is added with ``ADD COLUMN IF NOT EXISTS ...
    NULL`` using the compile_pg_type() mapping, one statement and commit per
    column.

    Does nothing if the target table does not exist. In DRY_RUN mode the
    ALTER statements are only logged.
    """
    if not self.table_exists(target_table):
        return
    source_columns = self.get_source_table_columns(source_table)
    target_by_name = {}
    for existing_column in self.get_target_table_columns(target_table):
        target_by_name[existing_column['name'].lower()] = existing_column
    for source_column in source_columns:
        name = source_column['name']
        key = name.lower()
        if key in target_by_name:
            actual = str(target_by_name[key]['type']).lower()
            expected = self.compile_pg_type(source_column['type']).lower()
            if actual != expected:
                self.logger.log_warning(
                    f"Тип колонки {target_table}.{name} отличается: "
                    f"source={expected}, target={actual}. Автоизменение типа не выполняется"
                )
        else:
            pg_type = self.compile_pg_type(source_column['type'])
            self.logger.log_info(
                f"Обнаружена новая колонка {source_table}.{name}; "
                f"добавление в {target_table} как nullable ({pg_type})"
            )
            if self.config.DRY_RUN:
                self.logger.log_info(
                    f"DRY RUN: пропущено добавление колонки {target_table}.{name}"
                )
            else:
                alter_sql = text(
                    f'ALTER TABLE {self.qualify_table_name(target_table)} '
                    f'ADD COLUMN IF NOT EXISTS {self.quote_identifier(name)} {pg_type} NULL'
                )
                with self.dst_engine.connect() as conn:
                    conn.execute(alter_sql)
                    conn.commit()
def get_table_config(self, table_name: str) -> TableMigrationConfig:
    """Look up the migration settings stored for *table_name*.

    The name may be the source or the target table name. When nothing
    (truthy) is stored, a default ``TableMigrationConfig`` for that source
    table is returned instead.
    """
    stored = self.table_config_repository.get_config(table_name)
    if stored:
        return stored
    return TableMigrationConfig(source_table=table_name)
def table_exists(self, table_name: str) -> bool:
    """Return True if *table_name* exists in the target connection's current schema."""
    return self.table_exists_in_schema(table_name, schema=None)
def table_exists_in_schema(self, table_name: str, schema: Optional[str]) -> bool:
    """Return True if *table_name* exists in *schema* of the PostgreSQL target.

    ``schema=None`` means the connection's ``current_schema()``. The lookup
    goes through ``information_schema.tables``, which lists only tables the
    current user has some privilege on. Names are compared exactly, with no
    case folding.
    """
    bind_params = {'schema': schema, 'table_name': table_name}
    exists_query = text("""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = COALESCE(:schema, current_schema())
AND table_name = :table_name
)
""")
    with self.dst_engine.connect() as conn:
        found = conn.execute(exists_query, bind_params).scalar()
    return bool(found)
def ensure_state_table(self):
    """Create or upgrade the incremental-migration state table. Idempotent.

    All steps run on one connection and are committed once at the end:

    * create ``REPLICATOR_SCHEMA`` and ``STATE_TABLE`` inside it if missing;
    * add ``last_sequence_value`` to state tables created without it;
    * delete duplicate ``table_name`` rows, keeping the one with the highest
      ``ctid`` (presumably for legacy tables created without the primary
      key);
    * create a unique index on ``table_name`` if absent. The
      ``ON CONFLICT (table_name)`` upsert in save_watermark() needs it on
      tables that lack the primary key.

    The index name is derived from configuration, so it is now quoted like
    every other identifier. It is lower-cased first because that is the name
    PostgreSQL stored when the statement used an unquoted (case-folded)
    identifier. ``IF NOT EXISTS`` therefore still finds the index on
    existing databases.
    """
    state_schema = self.config.REPLICATOR_SCHEMA
    qualified_state_table = self.qualify_table_name(self.config.STATE_TABLE, state_schema)
    state_index = self.quote_identifier(
        f"idx_{self.config.STATE_TABLE}_table_name".lower()
    )
    sql = f"""
CREATE TABLE IF NOT EXISTS {qualified_state_table} (
table_name text PRIMARY KEY,
last_x_datetime timestamp NULL,
last_sequence_value bigint NULL,
last_run_at timestamp NOT NULL DEFAULT now(),
rows_copied bigint NOT NULL DEFAULT 0,
status text NOT NULL DEFAULT 'pending',
error text NULL
)
"""
    with self.dst_engine.connect() as conn:
        conn.execute(text(
            f'CREATE SCHEMA IF NOT EXISTS {self.quote_identifier(state_schema)}'
        ))
        conn.execute(text(sql))
        conn.execute(text(f"""
ALTER TABLE {qualified_state_table}
ADD COLUMN IF NOT EXISTS last_sequence_value bigint NULL
"""))
        conn.execute(text(f"""
DELETE FROM {qualified_state_table} AS duplicate_rows
USING {qualified_state_table} AS preserved_rows
WHERE duplicate_rows.table_name = preserved_rows.table_name
AND duplicate_rows.ctid < preserved_rows.ctid
"""))
        conn.execute(text(f"""
CREATE UNIQUE INDEX IF NOT EXISTS {state_index}
ON {qualified_state_table} (table_name)
"""))
        conn.commit()
def get_last_watermark(self, table_name: str) -> Dict[str, Any]:
    """Return the incremental position last saved for *table_name*.

    The result is ``{'last_x_datetime': ..., 'last_sequence_value': ...}``;
    both are None when no state row exists. The row is read regardless of
    its ``status``.

    In DRY_RUN mode a missing state table is not created; the empty
    position is returned instead. Otherwise ensure_state_table() runs first.
    NOTE: this also happens in DRY_RUN when the table already exists, and
    ensure_state_table() executes DDL/DELETE statements.
    """
    state_table = self.config.STATE_TABLE
    state_schema = self.config.REPLICATOR_SCHEMA
    if self.config.DRY_RUN and not self.table_exists_in_schema(state_table, state_schema):
        return {'last_x_datetime': None, 'last_sequence_value': None}
    self.ensure_state_table()
    qualified_state_table = self.qualify_table_name(state_table, state_schema)
    lookup = text(f"""
SELECT last_x_datetime, last_sequence_value
FROM {qualified_state_table}
WHERE table_name = :table_name
""")
    with self.dst_engine.connect() as conn:
        row = conn.execute(lookup, {'table_name': table_name}).mappings().first()
        if not row:
            return {'last_x_datetime': None, 'last_sequence_value': None}
        return {key: row[key] for key in ('last_x_datetime', 'last_sequence_value')}
def save_watermark(
    self,
    table_name: str,
    last_x_datetime: Optional[datetime],
    last_sequence_value: Optional[int],
    rows_copied: int,
    status: str,
    error: Optional[str] = None,
):
    """Upsert the incremental-migration state row for *table_name*.

    Stores the position reached (``last_x_datetime`` /
    ``last_sequence_value``), the row count, the status and an optional
    error text, and stamps ``last_run_at`` with the database's ``now()``.
    The state table is ensured first. In DRY_RUN mode nothing is written and
    only a log line is emitted.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: состояние миграции {table_name} не обновляется")
        return
    self.ensure_state_table()
    state_row = {
        'table_name': table_name,
        'last_x_datetime': last_x_datetime,
        'last_sequence_value': last_sequence_value,
        'rows_copied': rows_copied,
        'status': status,
        'error': error,
    }
    qualified_state_table = self.qualify_table_name(
        self.config.STATE_TABLE,
        self.config.REPLICATOR_SCHEMA,
    )
    upsert_sql = text(f"""
INSERT INTO {qualified_state_table}
(table_name, last_x_datetime, last_sequence_value, last_run_at, rows_copied, status, error)
VALUES
(:table_name, :last_x_datetime, :last_sequence_value, now(), :rows_copied, :status, :error)
ON CONFLICT (table_name) DO UPDATE SET
last_x_datetime = EXCLUDED.last_x_datetime,
last_sequence_value = EXCLUDED.last_sequence_value,
last_run_at = EXCLUDED.last_run_at,
rows_copied = EXCLUDED.rows_copied,
status = EXCLUDED.status,
error = EXCLUDED.error
""")
    with self.dst_engine.connect() as conn:
        conn.execute(upsert_sql, state_row)
        conn.commit()
def get_incremental_upper_bound(self, table_config: TableMigrationConfig) -> Dict[str, Any]:
    """Capture the newest Life_ position at the start of a run.

    Reads the ``TOP 1`` row of ``table_config.read_table``, ordered
    descending by ``incremental_order_columns``. Returns
    ``{'last_x_datetime': ..., 'last_sequence_value': ...}`` taken from
    ``datetime_column`` and ``sequence_column``; the sequence value is None
    when no sequence column is configured. Both values are None for an empty
    table.
    """
    quote = self.quote_mssql_identifier
    datetime_expr = f"{quote(table_config.datetime_column)} AS max_datetime"
    if table_config.sequence_column:
        sequence_expr = f"{quote(table_config.sequence_column)} AS max_sequence_value"
    else:
        sequence_expr = "CAST(NULL AS bigint) AS max_sequence_value"
    newest_first = ', '.join(
        f"{quote(column)} DESC" for column in table_config.incremental_order_columns
    )
    sql = text(f"""
SELECT TOP 1 {datetime_expr}, {sequence_expr}
FROM {quote(table_config.read_table)}
ORDER BY {newest_first}
""")
    with self.src_engine.connect() as conn:
        newest = conn.execute(sql).mappings().first()
    if not newest:
        return {'last_x_datetime': None, 'last_sequence_value': None}
    return {
        'last_x_datetime': newest['max_datetime'],
        'last_sequence_value': newest['max_sequence_value'],
    }
def read_incremental_chunks(
    self,
    table_config: TableMigrationConfig,
    last_watermark: Dict[str, Any],
    upper_bound: Dict[str, Any],
    read_limit: Optional[int] = None,
):
    """Read the Life_ table of *table_config* in chunks, in increment order.

    Selects the rows after ``last_watermark`` and up to and including
    ``upper_bound``; both are dicts with ``last_x_datetime`` and
    ``last_sequence_value``. When the config has a ``sequence_column`` and
    the respective dict carries a sequence value, positions are compared as
    (datetime, sequence) pairs; otherwise by datetime alone. With no saved
    watermark datetime there is no lower bound. If the upper-bound datetime
    is None, the ``<=`` comparison against NULL matches no rows. A truthy
    ``read_limit`` adds ``TOP n``, used for verification runs.

    Returns:
        The ``pd.read_sql_query`` result with ``chunksize`` set, i.e. an
        iterator of DataFrames with at most ``INCREMENTAL_CHUNK_SIZE`` rows
        each, ordered by ``incremental_order_columns``.
    """
    # Upper bound by datetime; a (datetime, sequence) refinement may follow.
    where_parts = [
        f"{self.quote_mssql_identifier(table_config.datetime_column)} <= :upper_bound_datetime",
    ]
    params = {'upper_bound_datetime': upper_bound['last_x_datetime']}
    if table_config.sequence_column and upper_bound.get('last_sequence_value') is not None:
        # (datetime, sequence) <= (upper datetime, upper sequence): rows that
        # share the upper-bound datetime are cut at the captured sequence.
        where_parts.append(f"""
(
{self.quote_mssql_identifier(table_config.datetime_column)} < :upper_bound_datetime
OR (
{self.quote_mssql_identifier(table_config.datetime_column)} = :upper_bound_datetime
AND {self.quote_mssql_identifier(table_config.sequence_column)} <= :upper_bound_sequence
)
)
""")
        params['upper_bound_sequence'] = upper_bound['last_sequence_value']
    # No saved datetime -> no lower bound: everything up to the upper bound.
    if last_watermark.get('last_x_datetime') is not None:
        if table_config.sequence_column and last_watermark.get('last_sequence_value') is not None:
            # Strictly after the saved (datetime, sequence) position.
            where_parts.append(f"""
(
{self.quote_mssql_identifier(table_config.datetime_column)} > :last_watermark_datetime
OR (
{self.quote_mssql_identifier(table_config.datetime_column)} = :last_watermark_datetime
AND {self.quote_mssql_identifier(table_config.sequence_column)} > :last_watermark_sequence
)
)
""")
            params['last_watermark_sequence'] = last_watermark['last_sequence_value']
        else:
            # Datetime-only watermark: rows with exactly the saved datetime are
            # not read again. NOTE(review): such rows arriving later would be skipped.
            where_parts.append(
                f"{self.quote_mssql_identifier(table_config.datetime_column)} > :last_watermark_datetime"
            )
        params['last_watermark_datetime'] = last_watermark['last_x_datetime']
    order_columns = ', '.join([
        self.quote_mssql_identifier(column)
        for column in table_config.incremental_order_columns
    ])
    # A falsy read_limit (None or 0) means no TOP clause.
    top_clause = f"TOP {int(read_limit)} " if read_limit else ""
    sql = text(f"""
SELECT {top_clause}*
FROM {self.quote_mssql_identifier(table_config.read_table)}
WHERE {' AND '.join(where_parts)}
ORDER BY {order_columns}
""")
    return pd.read_sql_query(
        sql,
        self.src_engine,
        params=params,
        chunksize=self.config.INCREMENTAL_CHUNK_SIZE,
    )
def read_full_chunks(
    self,
    table_name: str,
    read_limit: Optional[int] = None,
):
    """Read a whole MSSQL table as an iterator of DataFrames.

    Each chunk holds at most ``FULL_LOAD_CHUNK_SIZE`` rows. A truthy
    ``read_limit`` switches to ``SELECT TOP n *`` for verification runs;
    otherwise ``pd.read_sql_table`` reads the full table.
    """
    chunk_size = self.config.FULL_LOAD_CHUNK_SIZE
    if not read_limit:
        return pd.read_sql_table(table_name, self.src_engine, chunksize=chunk_size)
    limited_sql = text(
        f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}"
    )
    return pd.read_sql_query(limited_sql, self.src_engine, chunksize=chunk_size)
def write_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    if_exists: str = 'append',
):
    """Write *chunk* to PostgreSQL with ``DataFrame.to_sql`` using multi-row INSERTs.

    ``if_exists`` is passed straight to pandas ('append' by default). The
    index is not written, and rows are sent ``WRITE_CHUNK_SIZE`` at a time.
    In DRY_RUN mode only a log line is emitted.
    """
    if not self.config.DRY_RUN:
        chunk.to_sql(
            table_name,
            self.dst_engine,
            if_exists=if_exists,
            index=False,
            chunksize=self.config.WRITE_CHUNK_SIZE,
            method='multi',
        )
        return
    self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}")
def read_full_chunks_without_sqlalchemy(
    self,
    table_name: str,
    read_limit: Optional[int] = None,
):
    """Yield the rows of an MSSQL table as DataFrames, read straight off a DBAPI cursor.

    This is a generator. The raw DBAPI connection is taken from
    ``src_engine`` only when iteration starts. The cursor and then the
    connection are closed when the generator is exhausted, closed or raises.
    A truthy ``read_limit`` adds ``TOP n``. Each yielded frame holds at most
    ``FULL_LOAD_CHUNK_SIZE`` rows and uses the cursor's column names.
    """
    raw_conn = self.src_engine.raw_connection()
    cur = raw_conn.cursor()
    limit_sql = f"TOP {int(read_limit)} " if read_limit else ""
    query = f"SELECT {limit_sql}* FROM {self.quote_mssql_identifier(table_name)}"
    try:
        cur.execute(query)
        names = [description[0] for description in cur.description]
        batch = cur.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE)
        while batch:
            yield pd.DataFrame.from_records(batch, columns=names)
            batch = cur.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE)
    finally:
        try:
            cur.close()
        finally:
            raw_conn.close()
def write_dataframe_batch_without_sqlalchemy(
    self,
    chunk: pd.DataFrame,
    table_name: str,
):
    r"""Bulk-load *chunk* into PostgreSQL table *table_name* via ``COPY ... FROM STDIN``.

    The frame is serialised in memory to tab-separated CSV and streamed
    through ``copy_expert`` on a raw DBAPI cursor (psycopg2-style API), in a
    transaction of its own: committed on success, rolled back on error.
    Missing values (NaN/None) are written as the unquoted marker ``\N``,
    which COPY reads as NULL. Columns are matched by name. Does nothing for
    an empty frame; in DRY_RUN mode only a log line is emitted.

    CSV dialect: Python's csv writer doubles embedded quote characters
    (``a"b`` -> ``"a""b"``) and leaves backslashes alone. That is exactly
    PostgreSQL's CSV default, where ESCAPE equals QUOTE.

    The previous ``ESCAPE '\'`` / ``escapechar='\\'`` pair disagreed with
    this. PostgreSQL then reads ``""`` as the end of one quoted section and
    the start of another, so every ``"`` in text values was silently
    dropped. Depending on the Python version, the csv writer also escapes
    literal backslashes, which PostgreSQL keeps verbatim in unquoted fields.
    """
    if chunk.empty:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}")
        return
    buffer = io.StringIO()
    # Default csv doublequote=True with no escapechar matches COPY's CSV
    # default (ESCAPE = QUOTE).
    chunk.to_csv(
        buffer,
        index=False,
        header=False,
        sep='\t',
        na_rep='\\N',
        date_format='%Y-%m-%d %H:%M:%S.%f',
        quoting=csv.QUOTE_MINIMAL,
    )
    buffer.seek(0)
    quoted_columns = ', '.join(
        self.quote_identifier(column_name)
        for column_name in chunk.columns
    )
    copy_sql = (
        f"COPY {self.qualify_table_name(table_name)} ({quoted_columns}) "
        "FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N', QUOTE '\"')"
    )
    dst_connection = self.dst_engine.raw_connection()
    cursor = dst_connection.cursor()
    try:
        cursor.copy_expert(copy_sql, buffer)
        dst_connection.commit()
    except Exception:
        dst_connection.rollback()
        raise
    finally:
        try:
            cursor.close()
        finally:
            dst_connection.close()
def prepare_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    table_config: TableMigrationConfig,
) -> pd.DataFrame:
    """Strip the Life_ service columns (``table_config.exclude_columns``) from *chunk*.

    Only excluded columns that are actually present are dropped. When none
    are present, the very same DataFrame object is returned; otherwise a new
    frame without them.
    """
    present = [name for name in table_config.exclude_columns if name in chunk.columns]
    return chunk.drop(columns=present) if present else chunk
def split_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    table_config: TableMigrationConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split a Life_ chunk into the rows to upsert and the rows to delete.

    Without an ``operation_column`` every row is an upsert, and the delete
    frame is an empty copy with the same columns. Otherwise the operation
    values are normalised (``str``, lower-cased, stripped) and matched
    against ``upsert_operations`` / ``delete_operations``, which are
    normalised the same way.

    Rows whose operation is in neither set used to be dropped silently. They
    are still skipped, but a warning now reports how many rows were skipped
    and which operation values they had.

    Raises:
        ValueError: ``operation_column`` is configured but missing from *chunk*.
    """
    if not table_config.operation_column:
        return chunk, chunk.iloc[0:0].copy()
    if table_config.operation_column not in chunk.columns:
        raise ValueError(
            f"В таблице {table_config.read_table} не найдено поле {table_config.operation_column}"
        )
    operations = chunk[table_config.operation_column].astype(str).str.lower().str.strip()
    upsert_operations = {operation.strip().lower() for operation in table_config.upsert_operations}
    delete_operations = {operation.strip().lower() for operation in table_config.delete_operations}
    is_upsert = operations.isin(upsert_operations)
    is_delete = operations.isin(delete_operations)
    is_unknown = ~(is_upsert | is_delete)
    if is_unknown.any():
        unknown_values = sorted(set(operations[is_unknown]))
        self.logger.log_warning(
            f"В {table_config.read_table} пропущено {int(is_unknown.sum())} событий "
            f"с неизвестной операцией: {unknown_values}"
        )
    return chunk[is_upsert], chunk[is_delete]
def deduplicate_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    primary_key: List[str],
) -> pd.DataFrame:
    """Collapse repeated events per key within one chunk, keeping the last one.

    An empty chunk or an empty ``primary_key`` is returned unchanged. The
    number of collapsed rows is logged when it is non-zero.

    Raises:
        ValueError: some ``primary_key`` columns are missing from *chunk*.
    """
    if chunk.empty or not primary_key:
        return chunk
    absent = [key for key in primary_key if key not in chunk.columns]
    if absent:
        raise ValueError(
            f"Для дедупликации не найдены ключевые поля: {absent}"
        )
    latest = chunk.drop_duplicates(subset=primary_key, keep='last')
    collapsed = len(chunk) - len(latest)
    if collapsed > 0:
        self.logger.log_info(
            f"Схлопнуто {collapsed} повторных событий внутри чанка по ключу {primary_key}"
        )
    return latest
def get_effective_delete_count(
    self,
    chunk: pd.DataFrame,
    primary_key: List[str],
) -> int:
    """Count the deletes that actually take effect, i.e. distinct key tuples in *chunk*.

    Returns 0 for an empty chunk and the plain row count when no
    ``primary_key`` is given.

    Raises:
        ValueError: some ``primary_key`` columns are missing from *chunk*.
    """
    if chunk.empty:
        return 0
    if primary_key:
        absent = [key for key in primary_key if key not in chunk.columns]
        if absent:
            raise ValueError(
                f"Для расчета delete count не найдены ключевые поля: {absent}"
            )
        return len(chunk[primary_key].drop_duplicates())
    return len(chunk)
def delete_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    primary_key: List[str],
    staging_table: Optional[str] = None,
):
    """Delete every row of *table_name* whose key appears in *chunk*.

    The distinct key tuples (``primary_key`` columns) are COPY-loaded into an
    UNLOGGED staging table, and a single ``DELETE ... USING`` joins it to the
    target. A given *staging_table* is reused (truncated first if it exists)
    and left in place. Otherwise a timestamp-suffixed staging table is
    created and dropped afterwards, also on failure.

    No-op for an empty chunk, in DRY_RUN mode (logged) and when the target
    table does not exist yet (warning).

    If dropping the staging table after an error fails as well, that second
    failure is only logged. The original exception is always the one
    re-raised; previously the cleanup error replaced it.

    Raises:
        ValueError: ``primary_key`` is empty, or some of its columns are
            missing from *chunk*.
    """
    if chunk.empty:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено удаление {len(chunk)} строк из {table_name}")
        return
    if not self.table_exists(table_name):
        self.logger.log_warning(f"Удаление пропущено: таблица {table_name} еще не существует")
        return
    if not primary_key:
        raise ValueError(f"Для удаления из {table_name} не задан primary_key")
    missing_columns = [column for column in primary_key if column not in chunk.columns]
    if missing_columns:
        raise ValueError(f"Для удаления из {table_name} не найдены ключевые поля: {missing_columns}")
    own_staging = staging_table is None
    staging = staging_table or f"_stg_delete_{table_name}_{int(time.time() * 1000)}"
    quoted_staging = self.quote_identifier(staging)
    key_chunk = chunk[primary_key].drop_duplicates()
    join_condition = ' AND '.join(
        f"target.{self.quote_identifier(column)} = source.{self.quote_identifier(column)}"
        for column in primary_key
    )
    try:
        if self.table_exists(staging):
            with self.dst_engine.connect() as conn:
                conn.execute(text(f'TRUNCATE TABLE {quoted_staging}'))
                conn.commit()
        else:
            # pandas creates the empty staging table with column types
            # inferred from the key columns' dtypes.
            key_chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
            with self.dst_engine.connect() as conn:
                conn.execute(text(f'ALTER TABLE {quoted_staging} SET UNLOGGED'))
                conn.commit()
        self.write_dataframe_batch_without_sqlalchemy(key_chunk, staging)
        sql = f"""
DELETE FROM {self.quote_identifier(table_name)} AS target
USING {quoted_staging} AS source
WHERE {join_condition}
"""
        with self.dst_engine.connect() as conn:
            conn.execute(text(sql))
            if own_staging:
                conn.execute(text(f'DROP TABLE IF EXISTS {quoted_staging}'))
            conn.commit()
    except Exception:
        if own_staging:
            try:
                with self.dst_engine.connect() as conn:
                    conn.execute(text(f'DROP TABLE IF EXISTS {quoted_staging}'))
                    conn.commit()
            except Exception as cleanup_error:
                # Never let a failed cleanup hide the original error from callers.
                self.logger.log_warning(
                    f"Не удалось удалить staging-таблицу {staging}: {cleanup_error}"
                )
        raise
def upsert_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    primary_key: List[str],
    staging_table: Optional[str] = None,
):
    """Insert-or-update *chunk* into *table_name* by ``primary_key`` through a staging table.

    The rows are COPY-loaded into an UNLOGGED staging table and merged with
    ``INSERT ... SELECT ... ON CONFLICT (<primary_key>) DO UPDATE``, or
    ``DO NOTHING`` when every column is a key column. A given
    *staging_table* is reused (truncated if it exists) and kept. Otherwise a
    timestamp-suffixed staging table is created and dropped afterwards, also
    on failure.

    * DRY_RUN: only logged.
    * No ``primary_key``: plain append via write_dataframe_batch(); pandas
      creates the target table if it does not exist.
    * Empty chunk with a key: nothing to merge, so return immediately (as
      delete_dataframe_batch() does) instead of round-tripping an empty
      staging table.

    If dropping the staging table after an error fails as well, that second
    failure is only logged and the original exception is re-raised.

    Raises:
        ValueError: some ``primary_key`` columns are missing from *chunk*.
            This is now checked up front instead of failing inside
            PostgreSQL.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен upsert {len(chunk)} строк в {table_name}")
        return
    if not primary_key:
        self.write_dataframe_batch(chunk, table_name, if_exists='append')
        return
    if chunk.empty:
        return
    missing_columns = [column for column in primary_key if column not in chunk.columns]
    if missing_columns:
        raise ValueError(f"Для upsert в {table_name} не найдены ключевые поля: {missing_columns}")
    own_staging = staging_table is None
    staging = staging_table or f"_stg_{table_name}_{int(time.time() * 1000)}"
    quoted_staging = self.quote_identifier(staging)
    columns = list(chunk.columns)
    quoted_columns = ', '.join(self.quote_identifier(column) for column in columns)
    conflict_columns = ', '.join(self.quote_identifier(column) for column in primary_key)
    update_columns = [column for column in columns if column not in primary_key]
    if update_columns:
        update_set = ', '.join(
            f"{self.quote_identifier(column)} = EXCLUDED.{self.quote_identifier(column)}"
            for column in update_columns
        )
        conflict_action = f"DO UPDATE SET {update_set}"
    else:
        conflict_action = "DO NOTHING"
    try:
        if self.table_exists(staging):
            with self.dst_engine.connect() as conn:
                conn.execute(text(f'TRUNCATE TABLE {quoted_staging}'))
                conn.commit()
        else:
            # pandas creates the empty staging table with dtype-inferred types.
            chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
            with self.dst_engine.connect() as conn:
                conn.execute(text(f'ALTER TABLE {quoted_staging} SET UNLOGGED'))
                conn.commit()
        self.write_dataframe_batch_without_sqlalchemy(chunk, staging)
        sql = f"""
INSERT INTO {self.quote_identifier(table_name)} ({quoted_columns})
SELECT {quoted_columns}
FROM {quoted_staging}
ON CONFLICT ({conflict_columns}) {conflict_action}
"""
        with self.dst_engine.connect() as conn:
            conn.execute(text(sql))
            if own_staging:
                conn.execute(text(f'DROP TABLE IF EXISTS {quoted_staging}'))
            conn.commit()
    except Exception:
        if own_staging:
            try:
                with self.dst_engine.connect() as conn:
                    conn.execute(text(f'DROP TABLE IF EXISTS {quoted_staging}'))
                    conn.commit()
            except Exception as cleanup_error:
                # Never let a failed cleanup hide the original error from callers.
                self.logger.log_warning(
                    f"Не удалось удалить staging-таблицу {staging}: {cleanup_error}"
                )
        raise
def create_timescale_hypertable(self, table_config: TableMigrationConfig):
    """Turn the target table into a TimescaleDB hypertable, if enabled.

    Acts only when both ``ENABLE_TIMESCALE`` and ``table_config.timescale``
    are set. The time dimension is ``timescale_time_column``, falling back to
    ``datetime_column``. The method installs the extension if needed and
    calls ``create_hypertable(..., if_not_exists => TRUE, migrate_data =>
    TRUE)``. Errors are logged, not raised. In DRY_RUN mode only a log line
    is emitted.
    """
    if not (self.config.ENABLE_TIMESCALE and table_config.timescale):
        return
    table_name = table_config.pg_table
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание TimescaleDB hypertable для {table_name}")
        return
    hypertable_params = {
        'table_name': table_name,
        'time_column': table_config.timescale_time_column or table_config.datetime_column,
    }
    try:
        with self.dst_engine.connect() as conn:
            conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
            conn.execute(text("""
SELECT create_hypertable(
:table_name,
:time_column,
if_not_exists => TRUE,
migrate_data => TRUE
)
"""), hypertable_params)
            conn.commit()
        self.logger.log_info(f"Таблица {table_name} преобразована в TimescaleDB hypertable")
    except Exception as e:
        self.logger.log_error(f"Ошибка при создании TimescaleDB hypertable для {table_name}", e)
def can_create_primary_key(
    self,
    table_config: TableMigrationConfig,
    pk_columns: List[str],
) -> bool:
    """Decide whether a primary key over *pk_columns* may be created on the target.

    Returns False for an empty column list. When TimescaleDB is not in use
    for this table, any key is allowed. For a hypertable the key must
    contain the time column (``timescale_time_column`` or
    ``datetime_column``), because unique constraints on hypertables have to
    include it. If it does not, a warning is logged and False is returned.
    """
    if not pk_columns:
        return False
    timescale_active = self.config.ENABLE_TIMESCALE and table_config.timescale
    if not timescale_active:
        return True
    time_column = table_config.timescale_time_column or table_config.datetime_column
    if time_column not in pk_columns:
        self.logger.log_warning(
            f"Primary key для {table_config.pg_table} пропущен: TimescaleDB hypertable "
            f"требует включить time column {time_column} в уникальные ограничения"
        )
        return False
    return True
def get_mssql_indexes(self, table_name: str) -> List[Dict[str, Any]]:
    """Return the secondary indexes of MSSQL table *table_name*.

    Each entry is ``{'name': str, 'unique': bool, 'columns': [...]}``, with
    the key columns in key order. Primary keys, heaps and clustered indexes
    are left out. Results are cached per lower-cased table name; the cache
    is cleared on reconnect.

    Changes from the previous version:

    * INCLUDE columns are excluded (``ic.is_included_column = 0``). They have
      ``key_ordinal = 0``, so they used to be sorted *in front of* the key
      columns: ``IX (a, b) INCLUDE (c)`` became a PostgreSQL index on
      ``(c, a, b)``. That changes the column order and, for UNIQUE indexes,
      the uniqueness rule.
    * The table name is a bound parameter instead of being pasted into the
      SQL string, so a quote in the name no longer breaks the query.
    * On error the problem is logged and ``[]`` is returned as before, but
      the empty result is no longer cached, so a later call retries.

    NOTE(review): ``sys.tables`` is matched by name only, so same-named
    tables in different schemas are merged.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_indexes_cache:
        return self._mssql_indexes_cache[cache_key]
    query = text("""
SELECT
i.name as index_name,
i.is_unique,
i.type_desc as index_type,
c.name as column_name,
ic.key_ordinal as column_position,
ic.is_descending_key
FROM sys.indexes i
INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = :table_name
AND i.type_desc NOT IN ('HEAP', 'CLUSTERED')
AND i.is_primary_key = 0
AND ic.is_included_column = 0
ORDER BY i.name, ic.key_ordinal
""")
    try:
        index_df = pd.read_sql_query(query, self.src_engine, params={'table_name': table_name})
        indexes = []
        if not index_df.empty:
            for index_name in index_df['index_name'].unique():
                index_data = index_df[index_df['index_name'] == index_name]
                ordered = index_data.sort_values('column_position')
                indexes.append({
                    'name': index_name,
                    'unique': bool(index_data.iloc[0]['is_unique']),
                    'columns': [
                        column_name.replace('[', '').replace(']', '')
                        for column_name in ordered['column_name']
                    ],
                })
            self.logger.log_info(f"Найдено {len(indexes)} индексов для таблицы {table_name}")
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении индексов для таблицы {table_name}", e)
        return []
    self._mssql_indexes_cache[cache_key] = indexes
    return indexes
def create_pg_indexes(self, table_name: str, indexes: List[Dict[str, Any]]):
    """Create the given (MSSQL-derived) indexes on PostgreSQL table *table_name*.

    Index names are ``idx_<table>_<index name>`` with every character other
    than ``[a-z0-9_]`` replaced by ``_``. The table part is not lower-cased
    first, so its upper-case letters become ``_`` as well. Each index is
    created with ``IF NOT EXISTS`` in its own transaction. Failures are
    logged and do not stop the remaining indexes. In DRY_RUN mode only a log
    line is emitted.

    Column and table names are now escaped with quote_identifier(); they used
    to be wrapped in quotes without escaping. A warning is logged for index
    names longer than 63 characters: PostgreSQL truncates them, and two
    truncated names that collide make ``IF NOT EXISTS`` silently skip the
    second index.
    """
    if not indexes:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание {len(indexes)} индексов для {table_name}")
        return
    for index_info in indexes:
        index_name = re.sub(
            r'[^a-z0-9_]', '_', f"idx_{table_name}_{index_info['name'].lower()}"
        )
        if len(index_name) > 63:
            self.logger.log_warning(
                f"Имя индекса {index_name} длиннее 63 символов и будет усечено PostgreSQL; "
                "возможен пропуск индекса из-за совпадения имен"
            )
        columns = ', '.join(self.quote_identifier(col) for col in index_info['columns'])
        unique_str = 'UNIQUE ' if index_info['unique'] else ''
        sql = (
            f'CREATE {unique_str}INDEX IF NOT EXISTS {self.quote_identifier(index_name)} '
            f'ON {self.quote_identifier(table_name)} ({columns})'
        )
        try:
            with self.dst_engine.connect() as conn:
                conn.execute(text(sql))
                conn.commit()
            self.logger.log_info(f"Создан индекс: {index_name} на столбцах: {columns}")
        except Exception as e:
            self.logger.log_error(f"Ошибка при создании индекса {index_name}", e)
def get_mssql_primary_key(self, table_name: str) -> Optional[List[str]]:
    """Return the primary-key column names of MSSQL table *table_name*, in key order.

    Returns None when the table has no primary key or when the lookup failed
    (the error is logged). Only successful lookups are cached, per
    lower-cased table name. A failure is retried on the next call instead of
    being remembered as "no primary key". The table name is passed as a
    bound parameter rather than spliced into the SQL.

    NOTE(review): ``sys.tables`` is matched by name only, across all schemas.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_pk_cache:
        return self._mssql_pk_cache[cache_key]
    query = text("""
SELECT
c.name as column_name
FROM sys.indexes i
INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = :table_name
AND i.is_primary_key = 1
ORDER BY ic.key_ordinal
""")
    try:
        pk_df = pd.read_sql_query(query, self.src_engine, params={'table_name': table_name})
        pk_columns = pk_df['column_name'].tolist()
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении первичного ключа для таблицы {table_name}", e)
        return None
    if not pk_columns:
        self._mssql_pk_cache[cache_key] = None
        return None
    self.logger.log_info(f"Найден первичный ключ для таблицы {table_name}: {pk_columns}")
    self._mssql_pk_cache[cache_key] = pk_columns
    return pk_columns
def create_pg_primary_key(self, table_name: str, pk_columns: List[str]):
    """Add primary key ``pk_<table_name>`` over *pk_columns* to PostgreSQL table *table_name*.

    No-op for an empty column list; in DRY_RUN mode only a log line is
    emitted. Errors, such as duplicate key values or an existing primary
    key, are logged rather than raised. Table, constraint and column names
    are escaped with quote_identifier(); they used to be wrapped in quotes
    without escaping.
    """
    if not pk_columns:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание первичного ключа для {table_name}")
        return
    columns = ', '.join(self.quote_identifier(col) for col in pk_columns)
    pk_name = f"pk_{table_name}"
    sql = (
        f'ALTER TABLE {self.quote_identifier(table_name)} '
        f'ADD CONSTRAINT {self.quote_identifier(pk_name)} PRIMARY KEY ({columns})'
    )
    try:
        with self.dst_engine.connect() as conn:
            conn.execute(text(sql))
            conn.commit()
        self.logger.log_info(f"Создан первичный ключ: {pk_name} на столбцах: {columns}")
    except Exception as e:
        self.logger.log_error(f"Ошибка при создании первичного ключа {pk_name}", e)
def has_pg_primary_key(self, table_name: str) -> bool:
    """Return True if *table_name* in the current schema already has a PRIMARY KEY constraint.

    The check queries ``pg_constraint`` (``contype = 'p'``), joined to the
    table and its namespace. The table name is compared exactly.
    """
    pk_lookup = text("""
SELECT EXISTS (
SELECT 1
FROM pg_constraint c
JOIN pg_class t ON t.oid = c.conrelid
JOIN pg_namespace n ON n.oid = t.relnamespace
WHERE c.contype = 'p'
AND n.nspname = current_schema()
AND t.relname = :table_name
)
""")
    with self.dst_engine.connect() as conn:
        has_pk = conn.execute(pk_lookup, {'table_name': table_name}).scalar()
    return bool(has_pk)
def ensure_pg_primary_key(self, table_name: str, pk_columns: List[str]):
    """Create the primary key over *pk_columns* unless the table already has one.

    Does nothing for an empty column list. The existing-key check only runs
    when columns are given.
    """
    if pk_columns and not self.has_pg_primary_key(table_name):
        self.create_pg_primary_key(table_name, pk_columns)
def get_mssql_foreign_keys(self, table_name: str) -> Optional[List[Dict[str, Any]]]:
    """Return the foreign keys declared on MSSQL table *table_name*.

    One dict per constraint, with these keys:

    * ``name`` and ``referenced_table`` (lower-cased);
    * the first column pair as ``parent_column`` / ``referenced_column``,
      kept for existing callers;
    * new: the complete column lists ``parent_columns`` /
      ``referenced_columns``, in constraint-column order, so composite keys
      are no longer reduced to their first column.

    Returns None when there are no foreign keys or when the lookup failed
    (logged). Successful lookups are cached per lower-cased table name;
    failures are not, so the next call retries. The table name is a bound
    parameter instead of being spliced into the SQL.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_fk_cache:
        return self._mssql_fk_cache[cache_key]
    query = text("""
SELECT
fk.name as fk_name,
pc.name as parent_column,
rc.name as referenced_column,
rt.name as referenced_table
FROM sys.foreign_keys fk
INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id
INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id
INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id
INNER JOIN sys.tables pt ON fkc.parent_object_id = pt.object_id
WHERE pt.name = :table_name
ORDER BY fk.name, fkc.constraint_column_id
""")
    try:
        fk_df = pd.read_sql_query(query, self.src_engine, params={'table_name': table_name})
        fks = []
        for fk_name in fk_df['fk_name'].unique():
            fk_data = fk_df[fk_df['fk_name'] == fk_name]
            first_pair = fk_data.iloc[0]
            fks.append({
                'name': fk_name,
                'parent_column': first_pair['parent_column'],
                'referenced_table': first_pair['referenced_table'].lower(),
                'referenced_column': first_pair['referenced_column'],
                'parent_columns': fk_data['parent_column'].tolist(),
                'referenced_columns': fk_data['referenced_column'].tolist(),
            })
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении внешних ключей для таблицы {table_name}", e)
        return None
    if not fks:
        self._mssql_fk_cache[cache_key] = None
        return None
    self.logger.log_info(f"Найдено {len(fks)} внешних ключей для таблицы {table_name}")
    self._mssql_fk_cache[cache_key] = fks
    return fks
def create_pg_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, Any]]):
    """Create PostgreSQL foreign keys on ``table_name`` from MSSQL metadata.

    Each ``fk_info`` must contain ``referenced_table`` plus either the
    composite lists ``parent_columns`` / ``referenced_columns`` or the single
    ``parent_column`` / ``referenced_column`` keys (the lists win when present).

    Nothing is done when the list is empty, when ``CREATE_FOREIGN_KEYS`` is
    disabled, or in DRY_RUN. A constraint is skipped with a warning when its
    referenced table does not exist in PostgreSQL. Each constraint is created
    on its own connection; a failure is logged and the remaining ones are
    still attempted.
    """
    if not foreign_keys:
        return
    if not self.config.CREATE_FOREIGN_KEYS:
        self.logger.log_info(f"Создание внешних ключей отключено конфигом для {table_name}")
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание {len(foreign_keys)} внешних ключей для {table_name}")
        return

    def quote(identifier: Any) -> str:
        # Double embedded quotes so an unusual name cannot terminate the identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    for fk_info in foreign_keys:
        referenced_table = fk_info['referenced_table']
        if not self.table_exists(referenced_table):
            self.logger.log_warning(
                f"Внешний ключ для {table_name}.{fk_info['parent_column']} пропущен: "
                f"таблица {referenced_table} отсутствует в PostgreSQL"
            )
            continue
        # Composite keys must keep all column pairs; a single-column constraint
        # built from only the first pair would reference a non-unique column.
        parent_columns = list(fk_info.get('parent_columns') or [fk_info['parent_column']])
        referenced_columns = list(fk_info.get('referenced_columns') or [fk_info['referenced_column']])
        if len(parent_columns) != len(referenced_columns):
            self.logger.log_warning(
                f"Внешний ключ для {table_name} пропущен: число колонок не совпадает "
                f"({parent_columns} -> {referenced_columns})"
            )
            continue
        # For single-column keys this is the same name as before: fk_<table>_<column>.
        fk_name = f"fk_{table_name}_{'_'.join(str(column) for column in parent_columns)}"
        parent_list = ', '.join(quote(column) for column in parent_columns)
        referenced_list = ', '.join(quote(column) for column in referenced_columns)
        sql = (
            f"ALTER TABLE {quote(table_name)} "
            f"ADD CONSTRAINT {quote(fk_name)} "
            f"FOREIGN KEY ({parent_list}) "
            f"REFERENCES {quote(referenced_table)} ({referenced_list})"
        )
        try:
            with self.dst_engine.connect() as conn:
                conn.execute(text(sql))
                conn.commit()
            self.logger.log_info(f"Создан внешний ключ: {fk_name}")
        except Exception as e:
            self.logger.log_error(f"Ошибка при создании внешнего ключа {fk_name}", e)
def analyze_table(self, table_name: str):
    """Refresh PostgreSQL planner statistics for one table with ``ANALYZE``.

    Skipped (with a log line) in DRY_RUN mode. Database errors are logged via
    ``log_error`` and not propagated.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен ANALYZE для {table_name}")
        return
    try:
        statement = f'ANALYZE "{table_name}"'
        with self.dst_engine.connect() as connection:
            connection.execute(text(statement))
            connection.commit()
        self.logger.log_info(f"Выполнен ANALYZE для таблицы {table_name}")
    except Exception as exc:
        self.logger.log_error(f"Ошибка при выполнении ANALYZE для таблицы {table_name}", exc)
def vacuum_analyze_table(self, table_name: str):
    """Run ``VACUUM ANALYZE`` on one PostgreSQL table.

    PostgreSQL refuses to run VACUUM inside a transaction block, and a plain
    ``engine.connect()`` connection implicitly begins one on the first
    ``execute``. The statement is therefore executed on an AUTOCOMMIT
    connection; no explicit commit is needed.

    Skipped (with a log line) in DRY_RUN mode. Database errors are logged via
    ``log_error`` and not propagated.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен VACUUM ANALYZE для {table_name}")
        return
    try:
        autocommit_engine = self.dst_engine.execution_options(isolation_level="AUTOCOMMIT")
        with autocommit_engine.connect() as conn:
            conn.execute(text(f'VACUUM ANALYZE "{table_name}"'))
        self.logger.log_info(f"Выполнен VACUUM ANALYZE для таблицы {table_name}")
    except Exception as e:
        self.logger.log_error(f"Ошибка при выполнении VACUUM ANALYZE для таблицы {table_name}", e)
def update_table_statistics(self):
    """Run a database-wide ``ANALYZE`` on the PostgreSQL target.

    Skipped (with a log line) in DRY_RUN mode. Failures are logged via
    ``log_error`` and not propagated.
    """
    if self.config.DRY_RUN:
        self.logger.log_info("DRY RUN: пропущен ANALYZE для всей базы данных")
        return
    try:
        analyze_statement = text('ANALYZE')
        with self.dst_engine.connect() as connection:
            connection.execute(analyze_statement)
            connection.commit()
        self.logger.log_info("Выполнен ANALYZE для всей базы данных")
    except Exception as exc:
        self.logger.log_error("Ошибка при выполнении ANALYZE для всей базы данных", exc)
def truncate_table(self, table_name: str):
    """Remove the target table before a full reload.

    Despite the name, this issues ``DROP TABLE IF EXISTS ... CASCADE``, so
    objects that depend on the table are dropped as well. It does nothing
    except log in DRY_RUN mode or when the table does not exist. A failed
    drop is logged via ``log_error`` and not propagated.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено удаление таблицы {table_name}")
        return
    if not self.table_exists(table_name):
        self.logger.log_info(f"Таблица {table_name} не существует в целевой БД")
        return
    try:
        drop_sql = f'DROP TABLE IF EXISTS {self.quote_identifier(table_name)} CASCADE'
        with self.dst_engine.connect() as connection:
            connection.execute(text(drop_sql))
            connection.commit()
        self.logger.log_info(f"Таблица {table_name} удалена")
    except Exception as exc:
        self.logger.log_error(f"Ошибка при удалении таблицы {table_name}", exc)
def migrate_table(self, table: Any, force_full: bool = False) -> bool:
    """Migrate one table, retrying transient MSSQL failures.

    ``table`` is either a ``TableMigrationConfig`` or a name resolved through
    ``get_table_config``. ``migrate_table_once`` is attempted
    ``MSSQL_TABLE_RETRIES + 1`` times (at least once). After a retryable error
    (per ``is_retryable_mssql_error``) that is not on the last attempt, the
    method logs a warning, recreates the MSSQL engine and sleeps
    ``min(attempt * 5, 15)`` seconds. Non-retryable errors, and the error from
    the final attempt, propagate to the caller.
    """
    if isinstance(table, TableMigrationConfig):
        table_config = table
    else:
        table_config = self.get_table_config(table)
    attempt_limit = max(1, self.config.MSSQL_TABLE_RETRIES + 1)
    attempt = 1
    while attempt <= attempt_limit:
        try:
            return self.migrate_table_once(table_config, force_full=force_full)
        except Exception as error:
            retryable = self.is_retryable_mssql_error(error)
            if not retryable or attempt >= attempt_limit:
                raise
            self.logger.log_warning(
                f"Временная ошибка MSSQL при обработке {table_config.source_table}, "
                f"попытка {attempt} из {attempt_limit}: {error}"
            )
            self.reconnect_mssql_engine()
            time.sleep(min(attempt * 5, 15))
        attempt += 1
    # Not reachable in practice: each attempt either returns or raises on the last one.
    return False
def migrate_full_table(
self,
table_config: TableMigrationConfig,
read_limit: Optional[int] = None,
) -> bool:
"""Fully reload one table from MSSQL into PostgreSQL, recreating the target.

Reads table_config.source_table via read_full_chunks (optionally capped by
read_limit), recreates table_config.pg_table from the first chunk's columns
and bulk-loads every chunk with write_dataframe_batch_without_sqlalchemy.
When at least one row was loaded, it then syncs the schema and sets up the
TimescaleDB hypertable, primary key, indexes and foreign keys. Finally it
refreshes statistics: VACUUM ANALYZE above 1,000,000 rows, ANALYZE otherwise.

Returns True on success and False on a non-retryable failure, which has
already been reported via log_table_failure. Errors accepted by
is_retryable_mssql_error are re-raised so that migrate_table can reconnect
and retry.
"""
table_name = table_config.source_table
pg_table = table_config.pg_table
self.logger.log_table_start(table_name)
try:
# Fetch source metadata (indexes, primary key, foreign keys) before reading data.
indexes = self.get_mssql_indexes(table_name)
pk_columns = self.get_mssql_primary_key(table_name)
foreign_keys = self.get_mssql_foreign_keys(table_name)
# Read the data; read_full_chunks presumably yields DataFrames lazily — TODO confirm.
self.logger.log_info(f"Чтение данных из {table_name}")
chunks = self.read_full_chunks(table_name, read_limit=read_limit)
# Load the data chunk by chunk.
first_chunk = True
total_rows = 0
for chunk_num, chunk in enumerate(chunks, 1):
if first_chunk:
# Drop the target table only after the first chunk has been read
# successfully, so existing data is not destroyed when MSSQL is unavailable.
# NOTE(review): if the source yields no chunks at all, the old target table
# is left untouched while the run still reports success with 0 rows.
self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
# truncate_table actually issues DROP TABLE ... CASCADE and is skipped in DRY_RUN.
self.truncate_table(pg_table)
# Create an empty table whose columns/types pandas infers from the chunk.
# NOTE(review): this to_sql call is not gated by DRY_RUN — confirm that is intended.
chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
self.logger.log_info(f"Таблица {pg_table} создана")
first_chunk = False
self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table)
total_rows += len(chunk)
# Progress is logged once every BATCH_SIZE chunks (a chunk count, not a row count).
if chunk_num % self.config.BATCH_SIZE == 0:
self.logger.log_progress(table_name, chunk_num, total_rows)
self.logger.log_info(f"Всего загружено строк: {total_rows}")
# Schema sync, hypertable, primary key, indexes and foreign keys are applied only
# when rows were loaded.
if total_rows > 0:
self.sync_target_schema(table_name, pg_table)
self.create_timescale_hypertable(table_config)
if self.can_create_primary_key(table_config, pk_columns):
self.logger.log_info(f"Создание первичного ключа для {pg_table}")
self.create_pg_primary_key(pg_table, pk_columns)
if indexes:
self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
self.create_pg_indexes(pg_table, indexes)
if foreign_keys:
self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
self.create_pg_foreign_keys(pg_table, foreign_keys)
# Tables with more than 1,000,000 loaded rows get VACUUM ANALYZE; smaller ones get ANALYZE.
self.logger.log_info(f"Обновление статистики для {pg_table}")
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
else:
self.analyze_table(pg_table)
self.logger.log_table_success(table_name, total_rows)
return True
except Exception as e:
# Retryable MSSQL errors go back to migrate_table's retry loop; anything else
# marks the table as failed.
if self.is_retryable_mssql_error(e):
raise
self.logger.log_table_failure(table_name, str(e))
return False
def migrate_full_table_without_sqlalchemy(
self,
table_config: TableMigrationConfig,
read_limit: Optional[int] = None,
) -> bool:
"""Fully reload one table, reading through read_full_chunks_without_sqlalchemy.

Same flow as migrate_full_table, with two differences: the source is read by
read_full_chunks_without_sqlalchemy, and the empty target table is created
through write_dataframe_batch(..., if_exists='fail') rather than
DataFrame.to_sql. After loading, when at least one row was loaded, it syncs
the schema and sets up the hypertable, primary key, indexes and foreign keys,
then refreshes statistics.

Returns True on success and False on a non-retryable failure, which has
already been reported via log_table_failure. Errors accepted by
is_retryable_mssql_error are re-raised so that migrate_table can retry.
"""
table_name = table_config.source_table
pg_table = table_config.pg_table
self.logger.log_table_start(table_name)
try:
# Source metadata is collected before reading data.
indexes = self.get_mssql_indexes(table_name)
pk_columns = self.get_mssql_primary_key(table_name)
foreign_keys = self.get_mssql_foreign_keys(table_name)
self.logger.log_info(f"Чтение данных из {table_name} без SQLAlchemy")
chunks = self.read_full_chunks_without_sqlalchemy(table_name, read_limit=read_limit)
first_chunk = True
total_rows = 0
for chunk_num, chunk in enumerate(chunks, 1):
if first_chunk:
# Drop the target table only after the first chunk has been read
# successfully, so existing data is not destroyed when MSSQL is unavailable.
# NOTE(review): if no chunk is produced, the old target table is kept as is.
self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
# truncate_table actually issues DROP TABLE ... CASCADE and is skipped in DRY_RUN.
self.truncate_table(pg_table)
# Create the empty target table from the chunk's columns; 'fail' presumably makes this
# error out if the table still exists (e.g. the drop failed) — confirm in write_dataframe_batch.
self.write_dataframe_batch(chunk.iloc[0:0], pg_table, if_exists='fail')
self.logger.log_info(f"Таблица {pg_table} создана")
first_chunk = False
self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table)
total_rows += len(chunk)
# Progress is logged once every BATCH_SIZE chunks (a chunk count, not a row count).
if chunk_num % self.config.BATCH_SIZE == 0:
self.logger.log_progress(table_name, chunk_num, total_rows)
self.logger.log_info(f"Всего загружено строк: {total_rows}")
# Post-load setup (schema sync, hypertable, PK, indexes, FKs) only when rows were loaded.
if total_rows > 0:
self.sync_target_schema(table_name, pg_table)
self.create_timescale_hypertable(table_config)
if self.can_create_primary_key(table_config, pk_columns):
self.logger.log_info(f"Создание первичного ключа для {pg_table}")
self.create_pg_primary_key(pg_table, pk_columns)
if indexes:
self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
self.create_pg_indexes(pg_table, indexes)
if foreign_keys:
self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
self.create_pg_foreign_keys(pg_table, foreign_keys)
# More than 1,000,000 rows -> VACUUM ANALYZE, otherwise a plain ANALYZE.
self.logger.log_info(f"Обновление статистики для {pg_table}")
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
else:
self.analyze_table(pg_table)
self.logger.log_table_success(table_name, total_rows)
return True
except Exception as e:
# Retryable MSSQL errors propagate to migrate_table's retry loop.
if self.is_retryable_mssql_error(e):
raise
self.logger.log_table_failure(table_name, str(e))
return False
def migrate_incremental_table(self, table_config: TableMigrationConfig) -> bool:
"""Incrementally apply changes from the Life_ change table to the PostgreSQL target.

Changes are read from table_config.read_table between the stored watermark
(last_x_datetime / last_sequence_value, via get_last_watermark) and the
current upper bound (get_incremental_upper_bound). Each chunk is split into
delete and upsert parts. Deletes are applied first; upserts then go through a
staging table when a primary key is configured, and are plain appends
otherwise. On success, the watermark of the last processed row is saved with
status 'success'.

With initial_load_mode == 'full_then_incremental', a missing target and no
previous watermark, a full snapshot is taken via migrate_full_table instead.
The watermark is then set to the upper bound captured before the snapshot.

Returns True on success and False on a non-retryable failure. In the failure
case the unchanged watermark is re-saved with status 'failed' and the error
text. Errors accepted by is_retryable_mssql_error are re-raised for
migrate_table's retry loop. The staging tables are dropped in every case.
"""
table_name = table_config.source_table
pg_table = table_config.pg_table
self.logger.log_table_start(f"{table_name} ({table_config.read_table})")
# Per-table staging tables used by the upsert/delete helpers; dropped in the finally block.
upsert_staging = f"_stg_upsert_{pg_table}"
delete_staging = f"_stg_delete_{pg_table}"
try:
if not table_config.life_table:
raise ValueError(f"Для инкрементальной миграции {table_name} не задана life_table")
# Stored watermark, the newest available change, and whether the target already exists.
last_watermark = self.get_last_watermark(pg_table)
upper_bound = self.get_incremental_upper_bound(table_config)
target_exists = self.table_exists(pg_table)
# Change table is empty: keep the previous watermark and record a successful no-op run.
if upper_bound['last_x_datetime'] is None:
self.logger.log_info(f"В {table_config.read_table} нет данных для инкрементальной миграции")
self.save_watermark(pg_table, last_watermark['last_x_datetime'], last_watermark['last_sequence_value'], 0, 'success')
self.logger.log_table_success(table_name, 0)
return True
# First run in 'full_then_incremental' mode: full snapshot from the source table, then the
# watermark jumps to the upper bound that was read before the snapshot started.
if (
table_config.initial_load_mode == 'full_then_incremental'
and not target_exists
and last_watermark['last_x_datetime'] is None
):
self.logger.log_info(
f"Первичная загрузка {table_name}: full snapshot из {table_config.source_table}, "
f"затем watermark по {table_config.read_table}"
)
success = self.migrate_full_table(table_config, read_limit=self.config.READ_LIMIT)
if success:
self.save_watermark(
pg_table,
upper_bound['last_x_datetime'],
upper_bound['last_sequence_value'],
0,
'success',
)
return success
self.logger.log_info(
f"Инкрементальная миграция {table_config.read_table}: "
f"{table_config.datetime_column} > {last_watermark}, <= {upper_bound}"
)
# When the target does not exist yet, the first non-empty write chunk creates it.
first_chunk = not target_exists
# total_rows counts applied changes (written rows + effective deletes);
# total_events counts raw rows read from the change table.
total_rows = 0
total_events = 0
max_seen_watermark = dict(last_watermark)
chunks = self.read_incremental_chunks(
table_config,
last_watermark,
upper_bound,
read_limit=self.config.READ_LIMIT,
)
# Existing target: align its schema and make sure the configured primary key exists
# before any upserts run.
if target_exists:
self.sync_target_schema(table_name, pg_table)
if table_config.primary_key and target_exists and self.can_create_primary_key(table_config, table_config.primary_key):
self.ensure_pg_primary_key(pg_table, table_config.primary_key)
for chunk_num, chunk in enumerate(chunks, 1):
if chunk.empty:
continue
if table_config.datetime_column not in chunk.columns:
raise ValueError(
f"В таблице {table_config.read_table} не найдено поле {table_config.datetime_column}"
)
# Split into rows to upsert and rows to delete, then prepare and deduplicate the upserts
# by primary key. prepare_incremental_chunk appears to strip service columns, judging by
# the error message below.
upsert_chunk, delete_chunk = self.split_incremental_chunk(chunk, table_config)
write_chunk = self.prepare_incremental_chunk(upsert_chunk, table_config)
write_chunk = self.deduplicate_incremental_chunk(write_chunk, table_config.primary_key)
delete_count = self.get_effective_delete_count(delete_chunk, table_config.primary_key)
missing_pk_columns = [
column for column in table_config.primary_key
if column not in write_chunk.columns
]
if not write_chunk.empty and missing_pk_columns:
raise ValueError(
f"После исключения служебных полей отсутствуют ключевые поля: {missing_pk_columns}"
)
# Deletes are applied before writes within the same chunk.
# NOTE(review): if the target did not exist and this chunk contains only deletes, this runs
# against a missing table — confirm delete_dataframe_batch tolerates that.
if not delete_chunk.empty:
self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key, staging_table=delete_staging)
if write_chunk.empty:
pass
elif first_chunk:
# Create the target from this chunk's columns, load it, then add hypertable and PK.
write_chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
self.write_dataframe_batch_without_sqlalchemy(write_chunk, pg_table)
self.create_timescale_hypertable(table_config)
if self.can_create_primary_key(table_config, table_config.primary_key):
self.create_pg_primary_key(pg_table, table_config.primary_key)
first_chunk = False
elif table_config.primary_key:
# Keyed table: upsert through the staging table.
self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key, staging_table=upsert_staging)
else:
# No primary key: plain append.
self.write_dataframe_batch(write_chunk, pg_table, if_exists='append')
total_events += len(chunk)
total_rows += len(write_chunk) + delete_count
# Advance the watermark to the last row of this chunk.
# NOTE(review): assumes read_incremental_chunks returns rows ordered by datetime/sequence.
last_row = chunk.iloc[-1]
max_seen_watermark = {
'last_x_datetime': last_row[table_config.datetime_column],
'last_sequence_value': (
int(last_row[table_config.sequence_column])
if table_config.sequence_column and pd.notna(last_row[table_config.sequence_column])
else None
),
}
if chunk_num % self.config.BATCH_SIZE == 0:
self.logger.log_progress(table_name, chunk_num, total_rows)
# Nothing applied: move the watermark straight to the upper bound.
# NOTE(review): with READ_LIMIT set, events past the limit might be skipped if every event that
# was read turned out to be a no-op — confirm read_incremental_chunks semantics.
if total_rows == 0:
max_seen_watermark = upper_bound
self.save_watermark(
pg_table,
max_seen_watermark['last_x_datetime'],
max_seen_watermark['last_sequence_value'],
total_rows,
'success',
)
self.logger.log_info(f"Всего инкрементально загружено строк: {total_rows}")
if total_events != total_rows:
self.logger.log_info(
f"Получено событий из Life_: {total_events}, фактически применено изменений: {total_rows}"
)
# More than 1,000,000 changes -> VACUUM ANALYZE; any other non-zero count -> ANALYZE.
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
elif total_rows > 0:
self.analyze_table(pg_table)
self.logger.log_table_success(table_name, total_rows)
return True
except Exception as e:
# Retryable MSSQL errors propagate to migrate_table's retry loop.
if self.is_retryable_mssql_error(e):
raise
# Non-retryable failure: re-save the stored (unchanged) watermark with status 'failed'.
try:
failed_watermark = self.get_last_watermark(pg_table)
self.save_watermark(
pg_table,
failed_watermark['last_x_datetime'],
failed_watermark['last_sequence_value'],
0,
'failed',
str(e),
)
except Exception as state_error:
self.logger.log_error(f"Ошибка при сохранении состояния миграции {pg_table}", state_error)
self.logger.log_table_failure(table_name, str(e))
return False
finally:
# Always drop the staging tables; a cleanup failure is only logged as a warning.
try:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(upsert_staging)}'))
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(delete_staging)}'))
conn.commit()
except Exception as cleanup_error:
self.logger.log_warning(f"Не удалось очистить staging-таблицы для {pg_table}: {cleanup_error}")
def run_migration(
    self,
    table_names: Optional[List[str]] = None,
    send_email: bool = False,
    dry_run: Optional[bool] = None,
    read_limit: Optional[int] = None,
    force_full: bool = False,
):
    """Migrate all configured tables (or a subset) and return the run report.

    Args:
        table_names: Optional subset to migrate. Each name is matched
            case-insensitively against the source table name or the PostgreSQL
            table name. Names that match no configured table are reported with
            a warning.
        send_email: When True, e-mail the summary via ``send_notification``.
        dry_run: Overrides ``config.DRY_RUN`` for this call only.
        read_limit: Overrides ``config.READ_LIMIT`` for this call only.
        force_full: Passed through to ``migrate_table``.

    Returns:
        The report produced by ``logger.generate_report()``.

    Both overrides are restored when the call finishes, matching
    ``backfill_watermarks``. Without this, a reused migrator instance would
    silently stay in dry-run mode, or keep a read limit, for later runs.
    """
    original_dry_run = self.config.DRY_RUN
    original_read_limit = self.config.READ_LIMIT
    if dry_run is not None:
        self.config.DRY_RUN = dry_run
    if read_limit is not None:
        self.config.READ_LIMIT = read_limit
    try:
        table_configs = self.table_configs
        if table_names:
            requested = {table_name.lower() for table_name in table_names}
            table_configs = [
                table_config for table_config in self.table_configs
                if table_config.source_table.lower() in requested
                or table_config.pg_table.lower() in requested
            ]
            matched = set()
            for table_config in table_configs:
                matched.add(table_config.source_table.lower())
                matched.add(table_config.pg_table.lower())
            unknown = sorted(requested - matched)
            if unknown:
                self.logger.log_warning(
                    f"Таблицы не найдены в конфигурации и будут пропущены: {', '.join(unknown)}"
                )
        self.logger.stats['total_tables'] = len(table_configs)
        self.logger.log_info("="*60)
        self.logger.log_info("НАЧАЛО МИГРАЦИИ ДАННЫХ")
        self.logger.log_info(f"Время начала: {self.logger.start_time}")
        self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}")
        self.logger.log_info(f"Force full reload: {force_full}")
        self.logger.log_info("="*60)
        # Migrate each table; a failed table is logged and the run continues.
        for table_config in table_configs:
            success = self.migrate_table(table_config, force_full=force_full)
            if not success:
                self.logger.log_warning(f"Таблица {table_config.source_table} пропущена из-за ошибки")
        # Final database-wide statistics refresh.
        self.logger.log_info("Обновление статистики для всей базы данных...")
        self.update_table_statistics()
        report = self.logger.generate_report()
        summary = report['summary']
        self.logger.log_info("="*60)
        self.logger.log_info("ИТОГОВАЯ СТАТИСТИКА")
        self.logger.log_info("="*60)
        self.logger.log_info(f"Успешно скопировано таблиц: {summary['successful_tables']}")
        self.logger.log_info(f"Не удалось скопировать таблиц: {summary['failed_tables']}")
        self.logger.log_info(f"Всего строк: {summary['total_rows']}")
        self.logger.log_info(f"Процент успеха: {summary['success_rate']:.2f}%")
        self.logger.log_info(f"Продолжительность: {summary['duration']}")
        self.logger.log_info("="*60)
        if send_email:
            self.send_notification(report)
        else:
            self.logger.log_info("Email отключен для этого запуска")
        return report
    finally:
        self.config.DRY_RUN = original_dry_run
        self.config.READ_LIMIT = original_read_limit
def backfill_watermarks(
    self,
    table_names: Optional[List[str]] = None,
    overwrite: bool = False,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Seed migration_state with the current Life_ upper bounds without reloading data.

    Args:
        table_names: Optional subset, matched case-insensitively against the
            source table name or the PostgreSQL table name.
        overwrite: Replace watermarks that already exist when True.
        dry_run: Value of ``config.DRY_RUN`` during this call; the previous
            value is restored afterwards.

    Returns:
        A summary dict with ``processed`` / ``updated`` / ``skipped`` /
        ``failed`` counters and a per-table ``tables`` list.

    Every per-table step, including reading the existing watermark, runs
    inside the per-table error handler. One failing table is therefore
    recorded as ``failed`` and does not abort the remaining tables.
    """
    original_dry_run = self.config.DRY_RUN
    self.config.DRY_RUN = dry_run
    try:
        table_configs = self.table_configs
        if table_names:
            requested = {table_name.lower() for table_name in table_names}
            table_configs = [
                table_config for table_config in self.table_configs
                if table_config.source_table.lower() in requested
                or table_config.pg_table.lower() in requested
            ]
        summary = {
            'processed': 0,
            'updated': 0,
            'skipped': 0,
            'failed': 0,
            'tables': [],
        }
        self.logger.log_info("=" * 60)
        self.logger.log_info("НАЧАЛО BACKFILL WATERMARKS")
        self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}")
        self.logger.log_info(f"Overwrite existing: {overwrite}")
        self.logger.log_info(f"Dry run: {dry_run}")
        self.logger.log_info("=" * 60)
        for table_config in table_configs:
            summary['processed'] += 1
            table_name = table_config.source_table
            pg_table = table_config.pg_table
            if table_config.mode != 'incremental' or not table_config.life_table:
                self.logger.log_info(
                    f"Пропуск {table_name}: таблица не настроена на incremental через Life_"
                )
                summary['skipped'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'skipped',
                    'reason': 'not_incremental',
                })
                continue
            try:
                existing_watermark = self.get_last_watermark(pg_table)
                if existing_watermark['last_x_datetime'] is not None and not overwrite:
                    self.logger.log_info(
                        f"Пропуск {table_name}: watermark уже существует для {pg_table}"
                    )
                    summary['skipped'] += 1
                    summary['tables'].append({
                        'table': table_name,
                        'status': 'skipped',
                        'reason': 'watermark_exists',
                        'watermark': existing_watermark,
                    })
                    continue
                upper_bound = self.get_incremental_upper_bound(table_config)
                if upper_bound['last_x_datetime'] is None:
                    self.logger.log_warning(
                        f"Пропуск {table_name}: в {table_config.life_table} нет данных"
                    )
                    summary['skipped'] += 1
                    summary['tables'].append({
                        'table': table_name,
                        'status': 'skipped',
                        'reason': 'life_empty',
                    })
                    continue
                self.save_watermark(
                    pg_table,
                    upper_bound['last_x_datetime'],
                    upper_bound['last_sequence_value'],
                    0,
                    'success',
                )
                self.logger.log_info(
                    f"Watermark сохранен для {pg_table}: {upper_bound}"
                )
                summary['updated'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'updated',
                    'watermark': upper_bound,
                })
            except Exception as e:
                self.logger.log_error(
                    f"Ошибка backfill watermark для {table_name}",
                    e,
                )
                summary['failed'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'failed',
                    'error': str(e),
                })
        self.logger.log_info("=" * 60)
        self.logger.log_info("ИТОГ BACKFILL WATERMARKS")
        self.logger.log_info(
            f"Обработано: {summary['processed']}, обновлено: {summary['updated']}, "
            f"пропущено: {summary['skipped']}, ошибок: {summary['failed']}"
        )
        self.logger.log_info("=" * 60)
        return summary
    finally:
        self.config.DRY_RUN = original_dry_run
def send_notification(self, report: Dict[str, Any]):
    """E-mail the migration summary built from ``report``.

    The body lists timing and counters, every successful and failed table,
    and at most the first five error messages, followed by "..." when there
    are more. The run's log file and the JSON report
    ``LOG_DIR/migration_report_<timestamp>.json`` are attached.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')
    subject = f"{self.config.EMAIL_SUBJECT} - {stamp}"
    summary = report['summary']
    succeeded = report['successful_tables']
    failed = report['failed_tables']
    errors = report['errors']
    succeeded_lines = "\n".join(f"- {t['name']}: {t['rows']} строк" for t in succeeded)
    failed_lines = "\n".join(f"- {t['name']}: {t['error']}" for t in failed)
    error_lines = "\n".join(f"- {e['message']}" for e in errors[:5])
    overflow_marker = "..." if len(errors) > 5 else ""
    body = f"""
Результат миграции данных MSSQL → PostgreSQL
ОБЩАЯ ИНФОРМАЦИЯ:
------------------
Время начала: {summary['start_time']}
Время окончания: {summary['end_time']}
Продолжительность: {summary['duration']}
СТАТИСТИКА:
-----------
Всего таблиц: {summary['total_tables']}
Успешно скопировано: {summary['successful_tables']}
Не удалось скопировать: {summary['failed_tables']}
Процент успеха: {summary['success_rate']:.2f}%
Всего строк: {summary['total_rows']}
УСПЕШНЫЕ ТАБЛИЦЫ ({len(succeeded)}):
-------------------
{succeeded_lines}
ПРОВАЛЕННЫЕ ТАБЛИЦЫ ({len(failed)}):
---------------------
{failed_lines}
ОШИБКИ ({len(errors)}):
--------
{error_lines}
{overflow_marker}
Лог-файл: {self.logger.log_file}
Отчет в формате JSON также прикреплен к письму.
"""
    report_json_path = os.path.join(self.config.LOG_DIR, f"migration_report_{self.logger.timestamp}.json")
    self.email_sender.send_email(subject, body, [self.logger.log_file, report_json_path])
def send_failure_notification(
    self,
    error: str,
    table_names: Optional[List[str]] = None,
    job_id: Optional[str] = None,
):
    """E-mail a failure notice for a job that crashed before producing a report.

    The body names the job (``n/a`` when ``job_id`` is empty), the requested
    tables (or "all configured tables") and the error text. Only the run's
    log file is attached.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')
    subject = f"{self.config.EMAIL_SUBJECT} - FAILED - {stamp}"
    if table_names:
        tables_text = ', '.join(table_names)
    else:
        tables_text = 'all configured tables'
    job_label = job_id or 'n/a'
    log_file = self.logger.log_file
    body = f"""
Результат миграции данных MSSQL → PostgreSQL
СТАТУС:
-------
Выполнение job завершилось ошибкой до формирования итогового отчета.
ДЕТАЛИ:
-------
Job ID: {job_label}
Таблицы: {tables_text}
Ошибка: {error}
Лог-файл: {log_file}
"""
    self.email_sender.send_email(subject, body, [log_file])
def cleanup_old_logs(self, days_to_keep: int = 7):
    """Delete ``*.log`` / ``*.json`` files in ``LOG_DIR`` older than ``days_to_keep`` days.

    Age is judged by modification time. The cleanup is best-effort:
      - if the directory cannot be listed, the error is logged and nothing is deleted;
      - a failure on a single file is logged and the remaining files are still processed;
      - sub-directories are never touched, even when their name ends in ``.log`` / ``.json``;
      - a file that disappears concurrently is skipped silently.
    """
    cutoff_date = datetime.now().timestamp() - (days_to_keep * 24 * 3600)
    log_dir = self.config.LOG_DIR
    try:
        filenames = os.listdir(log_dir)
    except Exception as e:
        self.logger.log_error("Ошибка при очистке старых логов", e)
        return
    for filename in filenames:
        # Only .log / .json files are candidates for removal.
        if not filename.endswith(('.log', '.json')):
            continue
        filepath = os.path.join(log_dir, filename)
        try:
            if not os.path.isfile(filepath) or os.path.getmtime(filepath) >= cutoff_date:
                continue
            os.remove(filepath)
        except FileNotFoundError:
            # Removed by someone else in the meantime: nothing left to clean up.
            continue
        except OSError as e:
            self.logger.log_error(f"Ошибка при удалении старого файла {filename}", e)
            continue
        self.logger.log_info(f"Удален старый файл: {filename}")