1730 lines
76 KiB
Python
1730 lines
76 KiB
Python
import os
|
||
import re
|
||
import time
|
||
import csv
|
||
import io
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import pandas as pd
|
||
from sqlalchemy import create_engine, inspect, text
|
||
from sqlalchemy.exc import DBAPIError, OperationalError
|
||
from sqlalchemy.pool import NullPool
|
||
from sqlalchemy.sql import sqltypes
|
||
|
||
from .config import Config, TableMigrationConfig
|
||
from .email_sender import EmailSender
|
||
from .logging_utils import MigrationLogger
|
||
from .table_config_repository import TableConfigRepository
|
||
|
||
|
||
class DatabaseMigrator:
|
||
"""Основной класс для миграции данных"""
|
||
|
||
def __init__(self, config: Config):
    """Set up engines, table configuration and helper objects for a run.

    Args:
        config: Application settings. Its ``TABLE_MIGRATIONS`` and
            ``TABLES_TO_COPY`` attributes are overwritten here with the
            table configuration returned by the repository.
    """
    self.config = config

    # Caches for metadata read from the MSSQL source.
    self._mssql_fk_cache: Dict[str, Optional[List[Dict[str, str]]]] = {}
    self._mssql_pk_cache: Dict[str, Optional[List[str]]] = {}
    self._mssql_indexes_cache: Dict[str, List[Dict[str, Any]]] = {}
    self._source_columns_cache: Dict[str, List[Dict[str, Any]]] = {}

    # Connect to PostgreSQL and load the table configuration from the database.
    self.dst_engine = create_engine(config.POSTGRES_CONNECTION_STRING)
    repository = TableConfigRepository(config, self.dst_engine)
    self.table_config_repository = repository
    self.table_configs = repository.load_configs(seed_defaults=True)
    self.config.TABLE_MIGRATIONS = self.table_configs
    self.config.TABLES_TO_COPY = [entry.source_table for entry in self.table_configs]

    self.logger = MigrationLogger(config)
    self.email_sender = EmailSender(config)

    # Source database connection.
    self.logger.log_info(
        f"Подключение к БД МИС (MSSQL), charset={self.config.MSSQL_CHARSET}"
    )
    self.src_engine = self.create_mssql_engine()

    self.logger.log_info("Подключение к PostgreSQL")
|
||
|
||
def create_mssql_engine(self):
    """Build the SQLAlchemy engine for MSSQL with an explicit pymssql charset.

    Returns:
        A new engine created with ``NullPool`` and the charset / login
        timeout taken from the configuration.
    """
    settings = self.config
    driver_options = {
        'charset': settings.MSSQL_CHARSET,
        'login_timeout': settings.MSSQL_CONNECT_TIMEOUT,
    }
    return create_engine(
        settings.MSSQL_CONNECTION_STRING,
        connect_args=driver_options,
        poolclass=NullPool,
    )
|
||
|
||
def reconnect_mssql_engine(self):
    """Recreate the MSSQL engine after a network or driver failure.

    The current engine is disposed on a best-effort basis, every cached piece
    of source metadata is dropped, and a fresh engine from
    ``create_mssql_engine`` is stored in ``self.src_engine``.
    """
    try:
        self.src_engine.dispose()
    except Exception as dispose_error:
        # Disposal stays best-effort (the engine is replaced anyway), but the
        # failure is logged instead of being swallowed silently.
        self.logger.log_warning(
            f"Не удалось корректно закрыть MSSQL engine перед переподключением: {dispose_error}"
        )

    for cache in (
        self._source_columns_cache,
        self._mssql_indexes_cache,
        self._mssql_pk_cache,
        self._mssql_fk_cache,
    ):
        cache.clear()

    self.src_engine = self.create_mssql_engine()
|
||
|
||
def is_retryable_mssql_error(self, exception: Exception) -> bool:
    """Tell whether an MSSQL/pymssql error is temporary and worth a table retry.

    Args:
        exception: The error raised while working with the source database.

    Returns:
        True when the lower-cased error text contains one of the known
        fragments, or when the exception is a SQLAlchemy ``OperationalError``
        or ``DBAPIError``; False otherwise.
    """
    transient_fragments = (
        'dbprocess is dead',
        'unexpected eof',
        'adaptive server connection failed',
        'server closed the connection unexpectedly',
        'connection reset',
        'connection refused',
        'communication link failure',
        'lost connection',
        'closed connection',
        'not enabled',
        '08s01',
        '20002',
        '20017',
    )
    error_text = str(exception).lower()

    for fragment in transient_fragments:
        if fragment in error_text:
            return True

    return isinstance(exception, (OperationalError, DBAPIError))
|
||
|
||
def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool:
    """Run one migration pass for a table, without any retry wrapper.

    Args:
        table_config: Settings of the table to migrate.
        force_full: When True the table is fully reloaded regardless of its
            configured mode.

    Returns:
        The success flag returned by the migration routine that was used.
    """
    if not force_full:
        if table_config.mode == 'incremental':
            return self.migrate_incremental_table(table_config)
        return self.migrate_full_table(table_config)

    self.logger.log_info(f"Force full reload для таблицы {table_config.source_table}")

    is_initial_force_full = (
        table_config.mode == 'incremental'
        and table_config.initial_load_mode == 'full_then_incremental'
        and not self.table_exists(table_config.pg_table)
        and self.get_last_watermark(table_config.pg_table).get('last_x_datetime') is None
    )

    # The watermark is captured BEFORE the full load. Reading it afterwards
    # would move it past change events logged while the load was running, and
    # those events would never be replayed by later incremental runs.
    # NOTE(review): events logged during the load are now replayed once more;
    # that is harmless for keyed upserts/deletes, but tables without a primary
    # key are appended to, so confirm this is acceptable for such tables.
    track_watermark = table_config.mode == 'incremental' and bool(table_config.life_table)
    upper_bound = self.get_incremental_upper_bound(table_config) if track_watermark else None

    if is_initial_force_full:
        self.logger.log_info(
            f"Первый force_full для {table_config.source_table}: "
            "загрузка выполняется без SQLAlchemy"
        )
        success = self.migrate_full_table_without_sqlalchemy(table_config)
    else:
        success = self.migrate_full_table(table_config)

    if success and track_watermark:
        self.save_watermark(
            table_config.pg_table,
            upper_bound['last_x_datetime'],
            upper_bound['last_sequence_value'],
            0,
            'success',
        )
        self.logger.log_info(
            f"После force full сохранен watermark для {table_config.pg_table}: "
            f"{upper_bound}"
        )
    return success
|
||
|
||
@staticmethod
|
||
def quote_identifier(identifier: str) -> str:
|
||
"""Экранирование SQL identifier для PostgreSQL/MSSQL."""
|
||
return '"' + identifier.replace('"', '""') + '"'
|
||
|
||
@staticmethod
|
||
def quote_mssql_identifier(identifier: str) -> str:
|
||
"""Экранирование SQL Server identifier."""
|
||
return '[' + identifier.replace(']', ']]') + ']'
|
||
|
||
def qualify_table_name(self, table_name: str, schema: Optional[str] = None) -> str:
    """Return the quoted PostgreSQL table name, prefixed with the schema if given.

    Args:
        table_name: Unquoted table name.
        schema: Optional unquoted schema name; a falsy value yields the bare
            quoted table name.
    """
    quoted_table = self.quote_identifier(table_name)
    if not schema:
        return quoted_table
    return f'{self.quote_identifier(schema)}.{quoted_table}'
|
||
|
||
def compile_pg_type(self, source_type) -> str:
    """Translate a SQLAlchemy/MSSQL column type into a PostgreSQL type name.

    Args:
        source_type: Type object of a source column; it is matched by class
            name and by ``isinstance`` against the generic SQLAlchemy types.

    Returns:
        A PostgreSQL type name; ``'text'`` is the fallback for any type that
        is not recognised.
    """
    type_name = source_type.__class__.__name__.lower()

    if 'uniqueidentifier' in type_name or 'uuid' in type_name:
        return 'uuid'
    if 'bit' in type_name or isinstance(source_type, sqltypes.Boolean):
        return 'boolean'
    # BigInteger and SmallInteger are tested before their base class Integer.
    if isinstance(source_type, sqltypes.BigInteger):
        return 'bigint'
    if isinstance(source_type, sqltypes.SmallInteger):
        return 'smallint'
    if isinstance(source_type, sqltypes.Integer):
        return 'integer'
    # Float must be tested before Numeric: in SQLAlchemy up to 2.0 Float is a
    # Numeric subclass, so the reverse order turns FLOAT(53) into numeric(53)
    # (scale 0) and silently drops the fractional part.
    if isinstance(source_type, (sqltypes.Float, sqltypes.REAL)):
        return 'double precision'
    if isinstance(source_type, sqltypes.Numeric):
        precision = getattr(source_type, 'precision', None)
        scale = getattr(source_type, 'scale', None)
        if precision is not None and scale is not None:
            return f'numeric({precision}, {scale})'
        if precision is not None:
            return f'numeric({precision})'
        return 'numeric'
    if isinstance(source_type, sqltypes.DateTime):
        return 'timestamp'
    if isinstance(source_type, sqltypes.Date):
        return 'date'
    if isinstance(source_type, sqltypes.Time):
        return 'time'
    if isinstance(source_type, sqltypes.CHAR):
        length = getattr(source_type, 'length', None)
        return f'char({length})' if length else 'char'
    if isinstance(source_type, sqltypes.String):
        length = getattr(source_type, 'length', None)
        return f'varchar({length})' if length else 'text'
    if isinstance(source_type, (sqltypes.Text, sqltypes.UnicodeText)):
        return 'text'
    if isinstance(source_type, sqltypes.LargeBinary):
        return 'bytea'

    return 'text'
|
||
|
||
def get_source_table_columns(self, table_name: str) -> List[Dict[str, Any]]:
    """Return the columns of a source MSSQL table, cached per table.

    The cache key is the lower-cased table name; on a miss the columns are
    read with the SQLAlchemy inspector of the source engine.
    """
    key = table_name.lower()
    cache = self._source_columns_cache
    if key in cache:
        return cache[key]

    columns = inspect(self.src_engine).get_columns(table_name)
    cache[key] = columns
    return columns
|
||
|
||
def get_target_table_columns(self, table_name: str) -> List[Dict[str, Any]]:
    """Return the columns of a target PostgreSQL table (not cached)."""
    inspector = inspect(self.dst_engine)
    return inspector.get_columns(table_name)
|
||
|
||
def sync_target_schema(self, source_table: str, target_table: str):
    """Add columns that exist only in the MSSQL table to PostgreSQL as nullable.

    Nothing happens when the target table does not exist. Columns present on
    both sides (matched case-insensitively) are never altered; a warning is
    logged when their types differ. In dry-run mode the ALTER is only logged.

    Args:
        source_table: Name of the MSSQL table.
        target_table: Name of the PostgreSQL table.
    """
    if not self.table_exists(target_table):
        return

    source_columns = self.get_source_table_columns(source_table)
    existing_by_name = {}
    for target_column in self.get_target_table_columns(target_table):
        existing_by_name[target_column['name'].lower()] = target_column

    for source_column in source_columns:
        column_name = source_column['name']
        pg_type = self.compile_pg_type(source_column['type'])
        existing = existing_by_name.get(column_name.lower())

        if existing is not None:
            target_type = str(existing['type']).lower()
            source_type = pg_type.lower()
            if target_type != source_type:
                self.logger.log_warning(
                    f"Тип колонки {target_table}.{column_name} отличается: "
                    f"source={source_type}, target={target_type}. Автоизменение типа не выполняется"
                )
            continue

        self.logger.log_info(
            f"Обнаружена новая колонка {source_table}.{column_name}; "
            f"добавление в {target_table} как nullable ({pg_type})"
        )

        if self.config.DRY_RUN:
            self.logger.log_info(
                f"DRY RUN: пропущено добавление колонки {target_table}.{column_name}"
            )
            continue

        add_column = text(
            f'ALTER TABLE {self.qualify_table_name(target_table)} '
            f'ADD COLUMN IF NOT EXISTS {self.quote_identifier(column_name)} {pg_type} NULL'
        )
        with self.dst_engine.connect() as conn:
            conn.execute(add_column)
            conn.commit()
|
||
|
||
def get_table_config(self, table_name: str) -> TableMigrationConfig:
    """Return the settings of a table looked up by source or target name.

    When the repository has no (truthy) entry, a ``TableMigrationConfig``
    built only from ``source_table=table_name`` is returned instead.
    """
    stored = self.table_config_repository.get_config(table_name)
    if stored:
        return stored
    return TableMigrationConfig(source_table=table_name)
|
||
|
||
def table_exists(self, table_name: str) -> bool:
    """Check whether a table exists in the target database.

    Delegates to ``table_exists_in_schema`` with no explicit schema.
    """
    no_schema = None
    return self.table_exists_in_schema(table_name, no_schema)
|
||
|
||
def table_exists_in_schema(self, table_name: str, schema: Optional[str]) -> bool:
    """Check whether a table exists in the given PostgreSQL schema.

    Args:
        table_name: Table name as stored in ``information_schema.tables``.
        schema: Schema name; ``None`` makes the query use ``current_schema()``.
    """
    lookup = text("""
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = COALESCE(:schema, current_schema())
              AND table_name = :table_name
        )
    """)
    bind_values = {'schema': schema, 'table_name': table_name}
    with self.dst_engine.connect() as conn:
        found = conn.execute(lookup, bind_values).scalar()
        return bool(found)
|
||
|
||
def ensure_state_table(self):
    """Create the incremental-migration state table if it is missing.

    On one connection, committed once at the end, this creates the replicator
    schema and the state table, adds the ``last_sequence_value`` column when
    absent, deletes duplicate ``table_name`` rows (the row with the highest
    ``ctid`` is kept) and creates a unique index on ``table_name``.
    """
    settings = self.config
    state_table = self.qualify_table_name(
        settings.STATE_TABLE,
        settings.REPLICATOR_SCHEMA,
    )
    create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {state_table} (
            table_name text PRIMARY KEY,
            last_x_datetime timestamp NULL,
            last_sequence_value bigint NULL,
            last_run_at timestamp NOT NULL DEFAULT now(),
            rows_copied bigint NOT NULL DEFAULT 0,
            status text NOT NULL DEFAULT 'pending',
            error text NULL
        )
    """
    with self.dst_engine.connect() as conn:
        statements = (
            f'CREATE SCHEMA IF NOT EXISTS {self.quote_identifier(self.config.REPLICATOR_SCHEMA)}',
            create_table_sql,
            f"""
                ALTER TABLE {state_table}
                ADD COLUMN IF NOT EXISTS last_sequence_value bigint NULL
            """,
            f"""
                DELETE FROM {state_table} AS duplicate_rows
                USING {state_table} AS preserved_rows
                WHERE duplicate_rows.table_name = preserved_rows.table_name
                  AND duplicate_rows.ctid < preserved_rows.ctid
            """,
            f"""
                CREATE UNIQUE INDEX IF NOT EXISTS idx_{self.config.STATE_TABLE}_table_name
                ON {state_table} (table_name)
            """,
        )
        for statement in statements:
            conn.execute(text(statement))
        conn.commit()
|
||
|
||
def get_last_watermark(self, table_name: str) -> Dict[str, Any]:
    """Read the last stored ``x_DateTime`` / sequence watermark of a table.

    Returns:
        A dict with keys ``last_x_datetime`` and ``last_sequence_value``.
        Both are ``None`` when no state row exists, or when running in
        dry-run mode and the state table has not been created yet.
    """
    settings = self.config

    # In dry-run mode a missing state table is reported as "no watermark"
    # instead of being created.
    if settings.DRY_RUN:
        state_table_present = self.table_exists_in_schema(
            settings.STATE_TABLE,
            settings.REPLICATOR_SCHEMA,
        )
        if not state_table_present:
            return {'last_x_datetime': None, 'last_sequence_value': None}

    self.ensure_state_table()
    state_table = self.qualify_table_name(
        settings.STATE_TABLE,
        settings.REPLICATOR_SCHEMA,
    )
    select_state = text(f"""
        SELECT last_x_datetime, last_sequence_value
        FROM {state_table}
        WHERE table_name = :table_name
    """)
    with self.dst_engine.connect() as conn:
        row = conn.execute(select_state, {'table_name': table_name}).mappings().first()

    if not row:
        return {'last_x_datetime': None, 'last_sequence_value': None}

    return {
        key: row[key]
        for key in ('last_x_datetime', 'last_sequence_value')
    }
|
||
|
||
def save_watermark(
    self,
    table_name: str,
    last_x_datetime: Optional[datetime],
    last_sequence_value: Optional[int],
    rows_copied: int,
    status: str,
    error: Optional[str] = None,
):
    """Store the incremental-migration state of a table (insert or update).

    In dry-run mode nothing is written; a log line is emitted instead.

    Args:
        table_name: Key of the state row.
        last_x_datetime: Datetime watermark to store.
        last_sequence_value: Sequence watermark to store.
        rows_copied: Row count recorded for the run.
        status: Status text recorded for the run.
        error: Optional error text.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: состояние миграции {table_name} не обновляется")
        return

    self.ensure_state_table()
    state_table = self.qualify_table_name(
        self.config.STATE_TABLE,
        self.config.REPLICATOR_SCHEMA,
    )
    upsert_state = text(f"""
        INSERT INTO {state_table}
            (table_name, last_x_datetime, last_sequence_value, last_run_at, rows_copied, status, error)
        VALUES
            (:table_name, :last_x_datetime, :last_sequence_value, now(), :rows_copied, :status, :error)
        ON CONFLICT (table_name) DO UPDATE SET
            last_x_datetime = EXCLUDED.last_x_datetime,
            last_sequence_value = EXCLUDED.last_sequence_value,
            last_run_at = EXCLUDED.last_run_at,
            rows_copied = EXCLUDED.rows_copied,
            status = EXCLUDED.status,
            error = EXCLUDED.error
    """)
    state_values = dict(
        table_name=table_name,
        last_x_datetime=last_x_datetime,
        last_sequence_value=last_sequence_value,
        rows_copied=rows_copied,
        status=status,
        error=error,
    )
    with self.dst_engine.connect() as conn:
        conn.execute(upsert_state, state_values)
        conn.commit()
|
||
|
||
def get_incremental_upper_bound(self, table_config: TableMigrationConfig) -> Dict[str, Any]:
    """Fix the upper bound of the Life_ table at the moment of the call.

    Selects the first row of ``table_config.read_table`` ordered by the
    incremental order columns, descending.

    Returns:
        A dict with ``last_x_datetime`` and ``last_sequence_value``; both are
        ``None`` when the table returns no row. The sequence value is NULL
        when no sequence column is configured.
    """
    quote = self.quote_mssql_identifier

    if table_config.sequence_column:
        sequence_expr = f"{quote(table_config.sequence_column)} AS max_sequence_value"
    else:
        sequence_expr = "CAST(NULL AS bigint) AS max_sequence_value"
    projection = ', '.join([
        f"{quote(table_config.datetime_column)} AS max_datetime",
        sequence_expr,
    ])
    ordering = ', '.join(
        f"{quote(column)} DESC" for column in table_config.incremental_order_columns
    )

    top_row_sql = text(f"""
        SELECT TOP 1 {projection}
        FROM {quote(table_config.read_table)}
        ORDER BY {ordering}
    """)
    with self.src_engine.connect() as conn:
        row = conn.execute(top_row_sql).mappings().first()

    if row:
        return {
            'last_x_datetime': row['max_datetime'],
            'last_sequence_value': row['max_sequence_value'],
        }
    return {'last_x_datetime': None, 'last_sequence_value': None}
|
||
|
||
def read_incremental_chunks(
    self,
    table_config: TableMigrationConfig,
    last_watermark: Dict[str, Any],
    upper_bound: Dict[str, Any],
    read_limit: Optional[int] = None,
):
    """Read the Life_ table in chunks, bounded by watermark and upper bound.

    Rows must lie at or below ``upper_bound`` and, when a previous watermark
    exists, strictly above ``last_watermark``. When a sequence column is
    configured and the corresponding sequence value is present, ties on the
    datetime column are resolved by the sequence column.

    Args:
        table_config: Settings naming the read table and its columns.
        last_watermark: Dict with ``last_x_datetime`` / ``last_sequence_value``.
        upper_bound: Dict with ``last_x_datetime`` / ``last_sequence_value``.
        read_limit: Optional ``TOP`` limit for the query.

    Returns:
        The chunk iterator produced by ``pandas.read_sql_query``.
    """
    quote = self.quote_mssql_identifier
    datetime_col = quote(table_config.datetime_column)

    params = {'upper_bound_datetime': upper_bound['last_x_datetime']}
    conditions = [f"{datetime_col} <= :upper_bound_datetime"]

    if table_config.sequence_column and upper_bound.get('last_sequence_value') is not None:
        sequence_col = quote(table_config.sequence_column)
        conditions.append(f"""
            (
                {datetime_col} < :upper_bound_datetime
                OR (
                    {datetime_col} = :upper_bound_datetime
                    AND {sequence_col} <= :upper_bound_sequence
                )
            )
        """)
        params['upper_bound_sequence'] = upper_bound['last_sequence_value']

    if last_watermark.get('last_x_datetime') is not None:
        params['last_watermark_datetime'] = last_watermark['last_x_datetime']
        if table_config.sequence_column and last_watermark.get('last_sequence_value') is not None:
            sequence_col = quote(table_config.sequence_column)
            conditions.append(f"""
                (
                    {datetime_col} > :last_watermark_datetime
                    OR (
                        {datetime_col} = :last_watermark_datetime
                        AND {sequence_col} > :last_watermark_sequence
                    )
                )
            """)
            params['last_watermark_sequence'] = last_watermark['last_sequence_value']
        else:
            conditions.append(f"{datetime_col} > :last_watermark_datetime")

    ordering = ', '.join(quote(column) for column in table_config.incremental_order_columns)
    top_clause = f"TOP {int(read_limit)} " if read_limit else ""

    select_sql = text(f"""
        SELECT {top_clause}*
        FROM {quote(table_config.read_table)}
        WHERE {' AND '.join(conditions)}
        ORDER BY {ordering}
    """)

    return pd.read_sql_query(
        select_sql,
        self.src_engine,
        params=params,
        chunksize=self.config.INCREMENTAL_CHUNK_SIZE,
    )
|
||
|
||
def read_full_chunks(
    self,
    table_name: str,
    read_limit: Optional[int] = None,
):
    """Read a whole source table in chunks, optionally limited for checks.

    Args:
        table_name: Source table name.
        read_limit: When truthy, only ``TOP read_limit`` rows are selected.

    Returns:
        A pandas chunk iterator with ``FULL_LOAD_CHUNK_SIZE`` rows per chunk.
    """
    chunk_size = self.config.FULL_LOAD_CHUNK_SIZE
    if not read_limit:
        return pd.read_sql_table(table_name, self.src_engine, chunksize=chunk_size)

    limited_sql = text(f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}")
    return pd.read_sql_query(limited_sql, self.src_engine, chunksize=chunk_size)
|
||
|
||
def write_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    if_exists: str = 'append',
):
    """Write a DataFrame to PostgreSQL in batches via ``DataFrame.to_sql``.

    In dry-run mode nothing is written; the skipped row count is logged.

    Args:
        chunk: Rows to write.
        table_name: Target table name.
        if_exists: Passed through to ``to_sql``.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}")
        return

    to_sql_options = {
        'if_exists': if_exists,
        'index': False,
        'chunksize': self.config.WRITE_CHUNK_SIZE,
        'method': 'multi',
    }
    chunk.to_sql(table_name, self.dst_engine, **to_sql_options)
|
||
|
||
def read_full_chunks_without_sqlalchemy(
    self,
    table_name: str,
    read_limit: Optional[int] = None,
):
    """Read a whole table in chunks through a DBAPI cursor (no pandas.read_sql_*).

    This is a generator: nothing is executed until the first chunk is
    requested, and the cursor and connection are closed when the generator
    finishes, fails or is closed.

    Args:
        table_name: Source table name.
        read_limit: When truthy, only ``TOP read_limit`` rows are selected.

    Yields:
        DataFrames of at most ``FULL_LOAD_CHUNK_SIZE`` rows, with column names
        taken from the cursor description.
    """
    # The statement is built before a connection is taken, so a bad
    # read_limit cannot leave a connection open.
    top_clause = f"TOP {int(read_limit)} " if read_limit else ""
    sql = f"SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_name)}"

    src_connection = self.src_engine.raw_connection()
    try:
        # The cursor is created inside the guarded region: if cursor() fails,
        # the raw connection is still closed instead of leaking.
        cursor = src_connection.cursor()
        try:
            cursor.execute(sql)
            columns = [column[0] for column in cursor.description]
            while True:
                rows = cursor.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE)
                if not rows:
                    break
                yield pd.DataFrame.from_records(rows, columns=columns)
        finally:
            cursor.close()
    finally:
        src_connection.close()
|
||
|
||
def write_dataframe_batch_without_sqlalchemy(
    self,
    chunk: pd.DataFrame,
    table_name: str,
):
    """Write a DataFrame to PostgreSQL with COPY (no pandas.to_sql).

    The chunk is serialised to tab-separated CSV in memory and streamed with
    the cursor's ``copy_expert``. An empty chunk is a no-op; in dry-run mode
    the skipped row count is logged. On failure the transaction is rolled
    back and the error re-raised.

    Args:
        chunk: Rows to write; its column names become the COPY column list.
        table_name: Target table name.
    """
    if chunk.empty:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}")
        return

    # These options must stay in step with the COPY options below.
    csv_options = {
        'index': False,
        'header': False,
        'sep': '\t',
        'na_rep': '\\N',
        'date_format': '%Y-%m-%d %H:%M:%S.%f',
        'quoting': csv.QUOTE_MINIMAL,
        'escapechar': '\\',
    }
    payload = io.StringIO()
    chunk.to_csv(payload, **csv_options)
    payload.seek(0)

    column_list = ', '.join(
        self.quote_identifier(column_name)
        for column_name in chunk.columns
    )
    copy_sql = (
        f"COPY {self.qualify_table_name(table_name)} ({column_list}) "
        "FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N', ESCAPE '\\', QUOTE '\"')"
    )

    dst_connection = self.dst_engine.raw_connection()
    cursor = dst_connection.cursor()
    try:
        cursor.copy_expert(copy_sql, payload)
        dst_connection.commit()
    except Exception:
        dst_connection.rollback()
        raise
    finally:
        try:
            cursor.close()
        finally:
            dst_connection.close()
|
||
|
||
def prepare_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    table_config: TableMigrationConfig,
) -> pd.DataFrame:
    """Drop the service Life_ columns before writing to the target table.

    Args:
        chunk: Rows read from the Life_ table.
        table_config: Settings whose ``exclude_columns`` lists columns to drop.

    Returns:
        The same object when none of the excluded columns is present,
        otherwise a new DataFrame without them.
    """
    present = set(chunk.columns)
    to_drop = [name for name in table_config.exclude_columns if name in present]
    if to_drop:
        return chunk.drop(columns=to_drop)
    return chunk
|
||
|
||
def split_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    table_config: TableMigrationConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split a Life_ chunk into rows to upsert and rows to delete.

    Operation values are compared as lower-cased, stripped strings. Rows whose
    operation is in neither configured set appear in neither result.

    Args:
        chunk: Rows read from the Life_ table.
        table_config: Settings naming the operation column and the
            upsert / delete operation values.

    Returns:
        ``(upsert_rows, delete_rows)``. Without an operation column the whole
        chunk is returned as upserts together with an empty copy.

    Raises:
        ValueError: The configured operation column is not in the chunk.
    """
    operation_column = table_config.operation_column
    if not operation_column:
        return chunk, chunk.iloc[0:0].copy()

    if operation_column not in chunk.columns:
        raise ValueError(
            f"В таблице {table_config.read_table} не найдено поле {table_config.operation_column}"
        )

    normalized = chunk[operation_column].astype(str).str.lower().str.strip()
    upsert_values = set(map(str.lower, table_config.upsert_operations))
    delete_values = set(map(str.lower, table_config.delete_operations))

    is_upsert = normalized.isin(upsert_values)
    is_delete = normalized.isin(delete_values)
    return chunk[is_upsert], chunk[is_delete]
|
||
|
||
def deduplicate_incremental_chunk(
    self,
    chunk: pd.DataFrame,
    primary_key: List[str],
) -> pd.DataFrame:
    """Collapse repeated events per key within one chunk, keeping the last row.

    Args:
        chunk: Rows to deduplicate.
        primary_key: Key columns; an empty list disables deduplication.

    Returns:
        The chunk itself when it is empty or no key is given, otherwise a
        DataFrame with one (the last) row per key.

    Raises:
        ValueError: A key column is missing from the chunk.
    """
    if chunk.empty or not primary_key:
        return chunk

    missing_columns = [name for name in primary_key if name not in chunk.columns]
    if missing_columns:
        raise ValueError(
            f"Для дедупликации не найдены ключевые поля: {missing_columns}"
        )

    deduplicated = chunk.drop_duplicates(subset=primary_key, keep='last')
    dropped_rows = len(chunk) - len(deduplicated)
    if dropped_rows > 0:
        self.logger.log_info(
            f"Схлопнуто {dropped_rows} повторных событий внутри чанка по ключу {primary_key}"
        )
    return deduplicated
|
||
|
||
def get_effective_delete_count(
    self,
    chunk: pd.DataFrame,
    primary_key: List[str],
) -> int:
    """Count the delete operations left after collapsing duplicate keys.

    Args:
        chunk: Rows marked for deletion.
        primary_key: Key columns; when empty every row counts.

    Returns:
        0 for an empty chunk, the row count when no key is given, otherwise
        the number of distinct key combinations.

    Raises:
        ValueError: A key column is missing from the chunk.
    """
    if chunk.empty:
        return 0
    if not primary_key:
        return len(chunk)

    missing_columns = [name for name in primary_key if name not in chunk.columns]
    if missing_columns:
        raise ValueError(
            f"Для расчета delete count не найдены ключевые поля: {missing_columns}"
        )

    distinct_keys = chunk[primary_key].drop_duplicates()
    return len(distinct_keys)
|
||
|
||
def delete_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    primary_key: List[str],
    staging_table: Optional[str] = None,
):
    """Delete rows from PostgreSQL in one batch through a staging table of keys.

    The distinct key values are loaded into a staging table and the target
    rows are removed with ``DELETE ... USING``. A staging table generated here
    is dropped afterwards (also on failure); one supplied by the caller is
    truncated and reused.

    Args:
        chunk: Rows whose keys identify what to delete.
        table_name: Target table name.
        primary_key: Key columns used for matching.
        staging_table: Optional name of a reusable staging table.

    Raises:
        ValueError: No primary key was given or a key column is missing.
    """
    if chunk.empty:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено удаление {len(chunk)} строк из {table_name}")
        return
    if not self.table_exists(table_name):
        self.logger.log_warning(f"Удаление пропущено: таблица {table_name} еще не существует")
        return
    if not primary_key:
        raise ValueError(f"Для удаления из {table_name} не задан primary_key")

    missing_columns = [name for name in primary_key if name not in chunk.columns]
    if missing_columns:
        raise ValueError(f"Для удаления из {table_name} не найдены ключевые поля: {missing_columns}")

    quote = self.quote_identifier
    own_staging = staging_table is None
    staging = staging_table or f"_stg_delete_{table_name}_{int(time.time() * 1000)}"
    keys = chunk[primary_key].drop_duplicates()
    join_condition = ' AND '.join(
        f"target.{quote(column)} = source.{quote(column)}"
        for column in primary_key
    )
    drop_staging_sql = f'DROP TABLE IF EXISTS {quote(staging)}'

    def run_and_commit(*statements):
        # Execute the statements on one connection and commit them together.
        with self.dst_engine.connect() as conn:
            for statement in statements:
                conn.execute(text(statement))
            conn.commit()

    try:
        if self.table_exists(staging):
            run_and_commit(f'TRUNCATE TABLE {quote(staging)}')
        else:
            # An empty frame creates the staging table with the key columns.
            keys.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
            run_and_commit(f'ALTER TABLE {quote(staging)} SET UNLOGGED')
        self.write_dataframe_batch_without_sqlalchemy(keys, staging)

        delete_sql = f"""
            DELETE FROM {quote(table_name)} AS target
            USING {quote(staging)} AS source
            WHERE {join_condition}
        """
        if own_staging:
            run_and_commit(delete_sql, drop_staging_sql)
        else:
            run_and_commit(delete_sql)
    except Exception:
        if own_staging:
            run_and_commit(drop_staging_sql)
        raise
|
||
|
||
def upsert_dataframe_batch(
    self,
    chunk: pd.DataFrame,
    table_name: str,
    primary_key: List[str],
    staging_table: Optional[str] = None,
):
    """Upsert a DataFrame into PostgreSQL in one batch through a staging table.

    Rows are loaded into a staging table and merged with
    ``INSERT ... ON CONFLICT``. Without a primary key the rows are simply
    appended. A staging table generated here is dropped afterwards (also on
    failure); one supplied by the caller is truncated and reused.

    Args:
        chunk: Rows to upsert.
        table_name: Target table name.
        primary_key: Conflict columns; empty means plain append.
        staging_table: Optional name of a reusable staging table.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен upsert {len(chunk)} строк в {table_name}")
        return

    if not primary_key:
        self.write_dataframe_batch(chunk, table_name, if_exists='append')
        return

    quote = self.quote_identifier
    own_staging = staging_table is None
    staging = staging_table or f"_stg_{table_name}_{int(time.time() * 1000)}"

    columns = list(chunk.columns)
    quoted_columns = ', '.join(quote(column) for column in columns)
    conflict_columns = ', '.join(quote(column) for column in primary_key)
    assignments = [
        f"{quote(column)} = EXCLUDED.{quote(column)}"
        for column in columns
        if column not in primary_key
    ]
    # With nothing but key columns there is nothing to update on conflict.
    conflict_action = f"DO UPDATE SET {', '.join(assignments)}" if assignments else "DO NOTHING"
    drop_staging_sql = f'DROP TABLE IF EXISTS {quote(staging)}'

    def run_and_commit(*statements):
        # Execute the statements on one connection and commit them together.
        with self.dst_engine.connect() as conn:
            for statement in statements:
                conn.execute(text(statement))
            conn.commit()

    try:
        if self.table_exists(staging):
            run_and_commit(f'TRUNCATE TABLE {quote(staging)}')
        else:
            # An empty frame creates the staging table with the chunk's columns.
            chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
            run_and_commit(f'ALTER TABLE {quote(staging)} SET UNLOGGED')
        self.write_dataframe_batch_without_sqlalchemy(chunk, staging)

        merge_sql = f"""
            INSERT INTO {quote(table_name)} ({quoted_columns})
            SELECT {quoted_columns}
            FROM {quote(staging)}
            ON CONFLICT ({conflict_columns}) {conflict_action}
        """
        if own_staging:
            run_and_commit(merge_sql, drop_staging_sql)
        else:
            run_and_commit(merge_sql)
    except Exception:
        if own_staging:
            run_and_commit(drop_staging_sql)
        raise
|
||
|
||
def create_timescale_hypertable(self, table_config: TableMigrationConfig):
    """Convert a PostgreSQL table into a TimescaleDB hypertable.

    Does nothing when TimescaleDB is disabled globally or for the table. In
    dry-run mode the step is only logged. Errors are logged and not raised.

    Args:
        table_config: Settings naming the target table and its time column.
    """
    timescale_enabled = self.config.ENABLE_TIMESCALE and table_config.timescale
    if not timescale_enabled:
        return
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание TimescaleDB hypertable для {table_config.pg_table}")
        return

    time_column = table_config.timescale_time_column or table_config.datetime_column
    table_name = table_config.pg_table
    try:
        create_call = text("""
            SELECT create_hypertable(
                :table_name,
                :time_column,
                if_not_exists => TRUE,
                migrate_data => TRUE
            )
        """)
        hypertable_args = {
            'table_name': table_name,
            'time_column': time_column,
        }
        with self.dst_engine.connect() as conn:
            conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
            conn.execute(create_call, hypertable_args)
            conn.commit()
        self.logger.log_info(f"Таблица {table_name} преобразована в TimescaleDB hypertable")
    except Exception as e:
        self.logger.log_error(f"Ошибка при создании TimescaleDB hypertable для {table_name}", e)
|
||
|
||
def can_create_primary_key(
    self,
    table_config: TableMigrationConfig,
    pk_columns: List[str],
) -> bool:
    """Check whether a primary key is compatible with TimescaleDB.

    Args:
        table_config: Settings of the table.
        pk_columns: Candidate primary-key columns.

    Returns:
        False when no columns are given. True when TimescaleDB is not used for
        the table or the time column is part of the key. Otherwise False,
        after logging a warning.
    """
    if not pk_columns:
        return False

    uses_timescale = self.config.ENABLE_TIMESCALE and table_config.timescale
    if not uses_timescale:
        return True

    time_column = table_config.timescale_time_column or table_config.datetime_column
    if time_column not in pk_columns:
        self.logger.log_warning(
            f"Primary key для {table_config.pg_table} пропущен: TimescaleDB hypertable "
            f"требует включить time column {time_column} в уникальные ограничения"
        )
        return False
    return True
|
||
|
||
def get_mssql_indexes(self, table_name: str) -> List[Dict[str, Any]]:
    """Read the secondary indexes of an MSSQL table, cached per table.

    Primary-key, heap and clustered entries are excluded, and only key
    columns are reported (INCLUDE columns are left out).

    Args:
        table_name: Source table name; the cache key is its lower-cased form.

    Returns:
        A list of dicts with keys ``name``, ``unique`` and ``columns`` (key
        columns in key order). An empty list is returned when the lookup
        fails; that failure is logged and not cached.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_indexes_cache:
        return self._mssql_indexes_cache[cache_key]

    indexes: List[Dict[str, Any]] = []
    try:
        # The table name is passed as a bind parameter, not interpolated.
        # INCLUDE columns have key_ordinal = 0 and would otherwise sort in
        # front of the real key columns, so they are filtered out.
        query = text("""
            SELECT
                i.name as index_name,
                i.is_unique,
                i.type_desc as index_type,
                c.name as column_name,
                ic.key_ordinal as column_position,
                ic.is_descending_key
            FROM sys.indexes i
            INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
            INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
            INNER JOIN sys.tables t ON i.object_id = t.object_id
            WHERE t.name = :table_name
              AND i.type_desc NOT IN ('HEAP', 'CLUSTERED')
              AND i.is_primary_key = 0
              AND ic.is_included_column = 0
            ORDER BY i.name, ic.key_ordinal
        """)
        index_df = pd.read_sql_query(query, self.src_engine, params={'table_name': table_name})

        # sort=False keeps the order in which the query returned the indexes.
        for index_name, index_rows in index_df.groupby('index_name', sort=False):
            ordered_rows = index_rows.sort_values('column_position')
            indexes.append({
                'name': index_name,
                'unique': bool(ordered_rows.iloc[0]['is_unique']),
                'columns': [
                    column_name.replace('[', '').replace(']', '')
                    for column_name in ordered_rows['column_name']
                ],
            })

        self.logger.log_info(f"Найдено {len(indexes)} индексов для таблицы {table_name}")
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении индексов для таблицы {table_name}", e)
        # A failed lookup is not cached, so a later call can try again
        # instead of treating the table as having no indexes.
        return []

    self._mssql_indexes_cache[cache_key] = indexes
    return indexes
|
||
|
||
def create_pg_indexes(self, table_name: str, indexes: List[Dict[str, Any]]):
    """Create the given indexes on a PostgreSQL table.

    Does nothing for an empty index list and only logs in dry-run mode.
    Each index is created in its own connection; a failure is logged and
    the remaining indexes are still attempted.

    Args:
        table_name: Target PostgreSQL table.
        indexes: Index descriptions with the keys ``name``, ``unique``
            and ``columns``.
    """
    if not indexes:
        return

    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание {len(indexes)} индексов для {table_name}")
        return

    for spec in indexes:
        # Build a lower-case name and replace anything outside [a-z0-9_].
        raw_name = f"idx_{table_name}_{spec['name'].lower()}"
        index_name = re.sub(r'[^a-z0-9_]', '_', raw_name)

        columns = ', '.join(f'"{col}"' for col in spec['columns'])
        if spec['unique']:
            unique_str = 'UNIQUE '
        else:
            unique_str = ''

        statement = f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ({columns})'

        try:
            with self.dst_engine.connect() as pg_conn:
                pg_conn.execute(text(statement))
                pg_conn.commit()
            self.logger.log_info(f"Создан индекс: {index_name} на столбцах: {columns}")
        except Exception as exc:
            self.logger.log_error(f"Ошибка при создании индекса {index_name}", exc)
|
||
|
||
def get_mssql_primary_key(self, table_name: str) -> Optional[List[str]]:
    """Return the primary-key column names of an MSSQL table.

    Successful lookups, including "no primary key", are cached per
    lower-cased table name.

    Args:
        table_name: Source table name as stored in ``sys.tables``.

    Returns:
        The key columns ordered by ``key_ordinal``, or ``None`` when the
        table has no primary key or the lookup failed.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_pk_cache:
        return self._mssql_pk_cache[cache_key]

    # NOTE(review): table_name is interpolated into the SQL text; it is
    # assumed to come from trusted migration configuration.
    query = f"""
        SELECT
            c.name as column_name
        FROM sys.indexes i
        INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
        INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
        INNER JOIN sys.tables t ON i.object_id = t.object_id
        WHERE t.name = '{table_name}'
        AND i.is_primary_key = 1
        ORDER BY ic.key_ordinal
    """

    try:
        pk_df = pd.read_sql_query(query, self.src_engine)
        pk_columns = pk_df['column_name'].tolist() if not pk_df.empty else None
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении первичного ключа для таблицы {table_name}", e)
        # Do not cache a failed lookup: a retry after reconnecting must not
        # mistake a transient error for "the table has no primary key".
        return None

    if pk_columns:
        self.logger.log_info(f"Найден первичный ключ для таблицы {table_name}: {pk_columns}")

    self._mssql_pk_cache[cache_key] = pk_columns
    return pk_columns
|
||
|
||
def create_pg_primary_key(self, table_name: str, pk_columns: List[str]):
    """Add a primary-key constraint named ``pk_<table>`` in PostgreSQL.

    Does nothing for an empty column list and only logs in dry-run mode.
    A failure while creating the constraint is logged, not raised.

    Args:
        table_name: Target PostgreSQL table.
        pk_columns: Columns that make up the key, in order.
    """
    if not pk_columns:
        return

    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание первичного ключа для {table_name}")
        return

    quoted = [f'"{col}"' for col in pk_columns]
    columns = ', '.join(quoted)
    pk_name = f"pk_{table_name}"
    statement = f'ALTER TABLE "{table_name}" ADD CONSTRAINT "{pk_name}" PRIMARY KEY ({columns})'

    try:
        with self.dst_engine.connect() as pg_conn:
            pg_conn.execute(text(statement))
            pg_conn.commit()
        self.logger.log_info(f"Создан первичный ключ: {pk_name} на столбцах: {columns}")
    except Exception as exc:
        self.logger.log_error(f"Ошибка при создании первичного ключа {pk_name}", exc)
|
||
|
||
def has_pg_primary_key(self, table_name: str) -> bool:
    """Report whether a table in the current schema has a primary key.

    Args:
        table_name: Table name, matched against ``pg_class.relname``.

    Returns:
        ``True`` when ``pg_constraint`` holds a ``'p'`` constraint for it.
    """
    pk_lookup = text("""
        SELECT EXISTS (
            SELECT 1
            FROM pg_constraint c
            JOIN pg_class t ON t.oid = c.conrelid
            JOIN pg_namespace n ON n.oid = t.relnamespace
            WHERE c.contype = 'p'
            AND n.nspname = current_schema()
            AND t.relname = :table_name
        )
    """)
    bind_values = {'table_name': table_name}
    with self.dst_engine.connect() as pg_conn:
        found = pg_conn.execute(pk_lookup, bind_values).scalar()
        return bool(found)
|
||
|
||
def ensure_pg_primary_key(self, table_name: str, pk_columns: List[str]):
    """Create the primary key unless the table already has one.

    Args:
        table_name: Target PostgreSQL table.
        pk_columns: Key columns; an empty list makes this a no-op.
    """
    # Short-circuit keeps the catalog lookup from running without columns.
    if pk_columns and not self.has_pg_primary_key(table_name):
        self.create_pg_primary_key(table_name, pk_columns)
|
||
|
||
def get_mssql_foreign_keys(self, table_name: str) -> Optional[List[Dict[str, str]]]:
    """Return the foreign keys declared on an MSSQL table.

    Successful lookups, including "no foreign keys", are cached per
    lower-cased table name.

    Args:
        table_name: Source (parent) table name as stored in ``sys.tables``.

    Returns:
        A list of dicts with the keys ``name``, ``parent_column``,
        ``referenced_table`` (lower-cased) and ``referenced_column``, or
        ``None`` when the table has no foreign keys or the lookup failed.
    """
    cache_key = table_name.lower()
    if cache_key in self._mssql_fk_cache:
        return self._mssql_fk_cache[cache_key]

    # NOTE(review): table_name is interpolated into the SQL text; it is
    # assumed to come from trusted migration configuration.
    query = f"""
        SELECT
            fk.name as fk_name,
            pc.name as parent_column,
            rc.name as referenced_column,
            rt.name as referenced_table
        FROM sys.foreign_keys fk
        INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
        INNER JOIN sys.columns pc ON fkc.parent_object_id = pc.object_id AND fkc.parent_column_id = pc.column_id
        INNER JOIN sys.columns rc ON fkc.referenced_object_id = rc.object_id AND fkc.referenced_column_id = rc.column_id
        INNER JOIN sys.tables rt ON fkc.referenced_object_id = rt.object_id
        INNER JOIN sys.tables pt ON fkc.parent_object_id = pt.object_id
        WHERE pt.name = '{table_name}'
        ORDER BY fk.name, fkc.constraint_column_id
    """

    try:
        fk_df = pd.read_sql_query(query, self.src_engine)

        fks = None
        if not fk_df.empty:
            fks = []
            for fk_name, fk_data in fk_df.groupby('fk_name', sort=False):
                # NOTE(review): only the first column pair of each constraint
                # is kept, so a composite foreign key is reduced to its
                # first column. Confirm whether composite keys occur.
                first = fk_data.iloc[0]
                fks.append({
                    'name': fk_name,
                    'parent_column': first['parent_column'],
                    'referenced_table': first['referenced_table'].lower(),
                    'referenced_column': first['referenced_column'],
                })
    except Exception as e:
        self.logger.log_error(f"Ошибка при получении внешних ключей для таблицы {table_name}", e)
        # Do not cache a failed lookup: a retry after reconnecting must not
        # mistake a transient error for "the table has no foreign keys".
        return None

    if fks is not None:
        self.logger.log_info(f"Найдено {len(fks)} внешних ключей для таблицы {table_name}")

    self._mssql_fk_cache[cache_key] = fks
    return fks
|
||
|
||
def create_pg_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]):
    """Create single-column foreign keys on a PostgreSQL table.

    Nothing is created for an empty list, when ``CREATE_FOREIGN_KEYS`` is
    off, or in dry-run mode. A key whose referenced table does not exist in
    PostgreSQL is skipped with a warning. Failures are logged per key.

    Args:
        table_name: Target PostgreSQL table that owns the constraints.
        foreign_keys: Dicts with the keys ``parent_column``,
            ``referenced_table`` and ``referenced_column``.
    """
    if not foreign_keys:
        return

    if not self.config.CREATE_FOREIGN_KEYS:
        self.logger.log_info(f"Создание внешних ключей отключено конфигом для {table_name}")
        return

    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено создание {len(foreign_keys)} внешних ключей для {table_name}")
        return

    for fk in foreign_keys:
        target_table = fk['referenced_table']
        if not self.table_exists(target_table):
            self.logger.log_warning(
                f"Внешний ключ для {table_name}.{fk['parent_column']} пропущен: "
                f"таблица {target_table} отсутствует в PostgreSQL"
            )
            continue

        source_column = fk['parent_column']
        fk_name = f"fk_{table_name}_{source_column}"
        target_column = fk['referenced_column']

        statement = f"""
        ALTER TABLE "{table_name}"
        ADD CONSTRAINT "{fk_name}"
        FOREIGN KEY ("{source_column}")
        REFERENCES "{target_table}" ("{target_column}")
        """

        try:
            with self.dst_engine.connect() as pg_conn:
                pg_conn.execute(text(statement))
                pg_conn.commit()
            self.logger.log_info(f"Создан внешний ключ: {fk_name}")
        except Exception as exc:
            self.logger.log_error(f"Ошибка при создании внешнего ключа {fk_name}", exc)
|
||
|
||
def analyze_table(self, table_name: str):
    """Run ``ANALYZE`` on one PostgreSQL table.

    Only logs in dry-run mode; a failure is logged, not raised.

    Args:
        table_name: Target PostgreSQL table.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен ANALYZE для {table_name}")
        return

    try:
        with self.dst_engine.connect() as pg_conn:
            statement = text(f'ANALYZE "{table_name}"')
            pg_conn.execute(statement)
            pg_conn.commit()
        self.logger.log_info(f"Выполнен ANALYZE для таблицы {table_name}")
    except Exception as exc:
        self.logger.log_error(f"Ошибка при выполнении ANALYZE для таблицы {table_name}", exc)
|
||
|
||
def vacuum_analyze_table(self, table_name: str):
    """Run ``VACUUM ANALYZE`` on one PostgreSQL table.

    Only logs in dry-run mode; a failure is logged, not raised.

    Args:
        table_name: Target PostgreSQL table.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущен VACUUM ANALYZE для {table_name}")
        return
    try:
        # PostgreSQL rejects VACUUM inside a transaction block, and a plain
        # connection begins a transaction on its first execute. AUTOCOMMIT
        # runs the statement outside any transaction, so no commit() is needed.
        autocommit_conn = self.dst_engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        )
        with autocommit_conn as conn:
            conn.execute(text(f'VACUUM ANALYZE "{table_name}"'))
        self.logger.log_info(f"Выполнен VACUUM ANALYZE для таблицы {table_name}")
    except Exception as e:
        self.logger.log_error(f"Ошибка при выполнении VACUUM ANALYZE для таблицы {table_name}", e)
|
||
|
||
def update_table_statistics(self):
    """Run a database-wide ``ANALYZE`` on the PostgreSQL target.

    Only logs in dry-run mode; a failure is logged, not raised.
    """
    if self.config.DRY_RUN:
        self.logger.log_info("DRY RUN: пропущен ANALYZE для всей базы данных")
        return

    try:
        with self.dst_engine.connect() as pg_conn:
            statement = text('ANALYZE')
            pg_conn.execute(statement)
            pg_conn.commit()
        self.logger.log_info("Выполнен ANALYZE для всей базы данных")
    except Exception as exc:
        self.logger.log_error("Ошибка при выполнении ANALYZE для всей базы данных", exc)
|
||
|
||
def truncate_table(self, table_name: str):
    """Remove the target table from PostgreSQL.

    Despite its name, this issues ``DROP TABLE ... CASCADE`` rather than
    ``TRUNCATE``. Only logs in dry-run mode or when the table does not
    exist; a failure while dropping is logged, not raised.

    Args:
        table_name: Target PostgreSQL table to drop.
    """
    if self.config.DRY_RUN:
        self.logger.log_info(f"DRY RUN: пропущено удаление таблицы {table_name}")
        return

    if not self.table_exists(table_name):
        self.logger.log_info(f"Таблица {table_name} не существует в целевой БД")
        return

    try:
        drop_statement = text(f'DROP TABLE IF EXISTS {self.quote_identifier(table_name)} CASCADE')
        with self.dst_engine.connect() as pg_conn:
            pg_conn.execute(drop_statement)
            pg_conn.commit()
        self.logger.log_info(f"Таблица {table_name} удалена")
    except Exception as exc:
        self.logger.log_error(f"Ошибка при удалении таблицы {table_name}", exc)
|
||
|
||
def migrate_table(self, table: Any, force_full: bool = False) -> bool:
    """Migrate one table, retrying on retryable MSSQL errors.

    Args:
        table: A ``TableMigrationConfig``, or a value that
            ``get_table_config`` resolves to one.
        force_full: Passed through to ``migrate_table_once``.

    Returns:
        The result of ``migrate_table_once``.

    Raises:
        Exception: Re-raises the error from ``migrate_table_once`` when it
            is not retryable or the attempts are exhausted.
    """
    if isinstance(table, TableMigrationConfig):
        table_config = table
    else:
        table_config = self.get_table_config(table)

    # Total attempts = first try + configured retries, never fewer than one.
    max_attempts = max(1, self.config.MSSQL_TABLE_RETRIES + 1)

    attempt = 0
    while True:
        attempt += 1
        try:
            return self.migrate_table_once(table_config, force_full=force_full)
        except Exception as exc:
            if not self.is_retryable_mssql_error(exc) or attempt >= max_attempts:
                raise

            self.logger.log_warning(
                f"Временная ошибка MSSQL при обработке {table_config.source_table}, "
                f"попытка {attempt} из {max_attempts}: {exc}"
            )
            self.reconnect_mssql_engine()
            # Linear backoff of 5 s per attempt, capped at 15 s.
            time.sleep(min(attempt * 5, 15))
|
||
|
||
def migrate_full_table(
    self,
    table_config: TableMigrationConfig,
    read_limit: Optional[int] = None,
) -> bool:
    """Fully reload one table, recreating the target table.

    Source metadata is read first, then the data is streamed in chunks.
    The target table is dropped and recreated only after the first chunk
    has been read. Once rows are loaded, schema sync, hypertable, primary
    key, indexes, foreign keys and statistics follow.

    Args:
        table_config: Migration settings; ``source_table`` and ``pg_table``
            are used here.
        read_limit: Passed through to ``read_full_chunks``.

    Returns:
        ``True`` on success, ``False`` when a non-retryable error occurred
        (it is reported through ``log_table_failure``).

    Raises:
        Exception: Re-raises errors that ``is_retryable_mssql_error``
            classifies as retryable, so the caller can retry.
    """
    table_name = table_config.source_table
    pg_table = table_config.pg_table
    self.logger.log_table_start(table_name)

    try:
        # Collect source metadata before any data is moved.
        indexes = self.get_mssql_indexes(table_name)
        pk_columns = self.get_mssql_primary_key(table_name)
        foreign_keys = self.get_mssql_foreign_keys(table_name)

        self.logger.log_info(f"Чтение данных из {table_name}")
        chunks = self.read_full_chunks(table_name, read_limit=read_limit)

        first_chunk = True
        total_rows = 0

        for chunk_num, chunk in enumerate(chunks, 1):
            if first_chunk:
                # Drop the target only after the first chunk was read, so
                # existing data survives when MSSQL is unavailable.
                self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
                self.truncate_table(pg_table)
                if self.config.DRY_RUN:
                    # A direct to_sql(if_exists='replace') would drop and
                    # recreate the target even in a dry run.
                    self.logger.log_info(f"DRY RUN: пропущено создание таблицы {pg_table}")
                else:
                    # An empty slice creates the table structure without rows.
                    chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
                    self.logger.log_info(f"Таблица {pg_table} создана")
                first_chunk = False
            self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table)

            total_rows += len(chunk)
            if chunk_num % self.config.BATCH_SIZE == 0:
                self.logger.log_progress(table_name, chunk_num, total_rows)

        self.logger.log_info(f"Всего загружено строк: {total_rows}")

        if total_rows > 0:
            self.sync_target_schema(table_name, pg_table)
            self.create_timescale_hypertable(table_config)

            if self.can_create_primary_key(table_config, pk_columns):
                self.logger.log_info(f"Создание первичного ключа для {pg_table}")
                self.create_pg_primary_key(pg_table, pk_columns)

            if indexes:
                self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
                self.create_pg_indexes(pg_table, indexes)

            if foreign_keys:
                self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
                self.create_pg_foreign_keys(pg_table, foreign_keys)

            self.logger.log_info(f"Обновление статистики для {pg_table}")
            # Large loads get VACUUM ANALYZE, smaller ones plain ANALYZE.
            if total_rows > 1000000:
                self.vacuum_analyze_table(pg_table)
            else:
                self.analyze_table(pg_table)

        self.logger.log_table_success(table_name, total_rows)
        return True

    except Exception as e:
        if self.is_retryable_mssql_error(e):
            raise
        self.logger.log_table_failure(table_name, str(e))
        return False
|
||
|
||
def migrate_full_table_without_sqlalchemy(
    self,
    table_config: TableMigrationConfig,
    read_limit: Optional[int] = None,
) -> bool:
    """Fully reload one table, reading via ``read_full_chunks_without_sqlalchemy``.

    The target table is dropped and recreated only after the first chunk
    has been read. Once rows are loaded, schema sync, hypertable, primary
    key, indexes, foreign keys and statistics follow.

    Args:
        table_config: Migration settings; ``source_table`` and ``pg_table``
            are used here.
        read_limit: Passed through to the chunk reader.

    Returns:
        ``True`` on success, ``False`` when a non-retryable error occurred.

    Raises:
        Exception: Re-raises errors that ``is_retryable_mssql_error``
            classifies as retryable.
    """
    table_name = table_config.source_table
    pg_table = table_config.pg_table
    self.logger.log_table_start(table_name)

    try:
        source_indexes = self.get_mssql_indexes(table_name)
        source_pk = self.get_mssql_primary_key(table_name)
        source_fks = self.get_mssql_foreign_keys(table_name)

        self.logger.log_info(f"Чтение данных из {table_name} без SQLAlchemy")
        chunk_iter = self.read_full_chunks_without_sqlalchemy(table_name, read_limit=read_limit)

        target_prepared = False
        loaded = 0
        chunk_num = 0

        for frame in chunk_iter:
            chunk_num += 1
            if not target_prepared:
                # Drop the target only after the first chunk was read, so
                # existing data survives when MSSQL is unavailable.
                self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
                self.truncate_table(pg_table)
                # An empty slice creates the table structure without rows.
                self.write_dataframe_batch(frame.iloc[0:0], pg_table, if_exists='fail')
                self.logger.log_info(f"Таблица {pg_table} создана")
                target_prepared = True

            self.write_dataframe_batch_without_sqlalchemy(frame, pg_table)

            loaded += len(frame)
            if chunk_num % self.config.BATCH_SIZE == 0:
                self.logger.log_progress(table_name, chunk_num, loaded)

        self.logger.log_info(f"Всего загружено строк: {loaded}")

        if loaded > 0:
            self.sync_target_schema(table_name, pg_table)
            self.create_timescale_hypertable(table_config)

            if self.can_create_primary_key(table_config, source_pk):
                self.logger.log_info(f"Создание первичного ключа для {pg_table}")
                self.create_pg_primary_key(pg_table, source_pk)

            if source_indexes:
                self.logger.log_info(f"Создание {len(source_indexes)} индексов для {pg_table}")
                self.create_pg_indexes(pg_table, source_indexes)

            if source_fks:
                self.logger.log_info(f"Создание {len(source_fks)} внешних ключей для {pg_table}")
                self.create_pg_foreign_keys(pg_table, source_fks)

            self.logger.log_info(f"Обновление статистики для {pg_table}")
            # Large loads get VACUUM ANALYZE, smaller ones plain ANALYZE.
            if loaded > 1000000:
                self.vacuum_analyze_table(pg_table)
            else:
                self.analyze_table(pg_table)

        self.logger.log_table_success(table_name, loaded)
        return True

    except Exception as exc:
        if self.is_retryable_mssql_error(exc):
            raise
        self.logger.log_table_failure(table_name, str(exc))
        return False
|
||
|
||
def migrate_incremental_table(self, table_config: TableMigrationConfig) -> bool:
    """Incrementally migrate a table from its Life_ change log.

    Reads events between the stored watermark and the current upper bound,
    applies deletes and upserts to the target table, then stores the new
    watermark. When the target table and watermark are both missing and
    ``initial_load_mode`` is ``'full_then_incremental'``, a full load runs
    instead and the upper bound is stored as the watermark.

    Args:
        table_config: Migration settings for the table (source/target
            names, life table, datetime/sequence columns, primary key).

    Returns:
        ``True`` on success, ``False`` when a non-retryable error occurred
        (a ``'failed'`` state is saved with the previous watermark).

    Raises:
        Exception: Re-raises errors that ``is_retryable_mssql_error``
            classifies as retryable.
    """
    table_name = table_config.source_table
    pg_table = table_config.pg_table
    self.logger.log_table_start(f"{table_name} ({table_config.read_table})")
    # Staging table names passed to the upsert/delete helpers; both are
    # dropped in the finally block below.
    upsert_staging = f"_stg_upsert_{pg_table}"
    delete_staging = f"_stg_delete_{pg_table}"

    try:
        if not table_config.life_table:
            raise ValueError(f"Для инкрементальной миграции {table_name} не задана life_table")

        last_watermark = self.get_last_watermark(pg_table)
        upper_bound = self.get_incremental_upper_bound(table_config)
        target_exists = self.table_exists(pg_table)

        # No upper bound means there is nothing to read: re-save the current
        # watermark with zero rows and report success.
        if upper_bound['last_x_datetime'] is None:
            self.logger.log_info(f"В {table_config.read_table} нет данных для инкрементальной миграции")
            self.save_watermark(pg_table, last_watermark['last_x_datetime'], last_watermark['last_sequence_value'], 0, 'success')
            self.logger.log_table_success(table_name, 0)
            return True

        # First run for this table: take a full snapshot, then store the
        # upper bound captured above as the starting watermark.
        if (
            table_config.initial_load_mode == 'full_then_incremental'
            and not target_exists
            and last_watermark['last_x_datetime'] is None
        ):
            self.logger.log_info(
                f"Первичная загрузка {table_name}: full snapshot из {table_config.source_table}, "
                f"затем watermark по {table_config.read_table}"
            )
            success = self.migrate_full_table(table_config, read_limit=self.config.READ_LIMIT)
            if success:
                self.save_watermark(
                    pg_table,
                    upper_bound['last_x_datetime'],
                    upper_bound['last_sequence_value'],
                    0,
                    'success',
                )
            return success

        self.logger.log_info(
            f"Инкрементальная миграция {table_config.read_table}: "
            f"{table_config.datetime_column} > {last_watermark}, <= {upper_bound}"
        )

        # When the target does not exist yet, the first non-empty write
        # chunk creates it (see the first_chunk branch below).
        first_chunk = not target_exists
        total_rows = 0
        total_events = 0
        max_seen_watermark = dict(last_watermark)
        chunks = self.read_incremental_chunks(
            table_config,
            last_watermark,
            upper_bound,
            read_limit=self.config.READ_LIMIT,
        )

        if target_exists:
            self.sync_target_schema(table_name, pg_table)

        if table_config.primary_key and target_exists and self.can_create_primary_key(table_config, table_config.primary_key):
            self.ensure_pg_primary_key(pg_table, table_config.primary_key)

        for chunk_num, chunk in enumerate(chunks, 1):
            if chunk.empty:
                continue

            if table_config.datetime_column not in chunk.columns:
                raise ValueError(
                    f"В таблице {table_config.read_table} не найдено поле {table_config.datetime_column}"
                )

            # Split the events into rows to write and rows to delete, then
            # prepare and de-duplicate the rows to write.
            upsert_chunk, delete_chunk = self.split_incremental_chunk(chunk, table_config)
            write_chunk = self.prepare_incremental_chunk(upsert_chunk, table_config)
            write_chunk = self.deduplicate_incremental_chunk(write_chunk, table_config.primary_key)
            delete_count = self.get_effective_delete_count(delete_chunk, table_config.primary_key)
            missing_pk_columns = [
                column for column in table_config.primary_key
                if column not in write_chunk.columns
            ]
            if not write_chunk.empty and missing_pk_columns:
                raise ValueError(
                    f"После исключения служебных полей отсутствуют ключевые поля: {missing_pk_columns}"
                )

            # Deletes are applied before the writes of the same chunk.
            if not delete_chunk.empty:
                self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key, staging_table=delete_staging)

            if write_chunk.empty:
                pass
            elif first_chunk:
                # Create the target from an empty slice, load the rows, then
                # add the hypertable and primary key.
                write_chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
                self.write_dataframe_batch_without_sqlalchemy(write_chunk, pg_table)
                self.create_timescale_hypertable(table_config)
                if self.can_create_primary_key(table_config, table_config.primary_key):
                    self.create_pg_primary_key(pg_table, table_config.primary_key)
                first_chunk = False
            elif table_config.primary_key:
                self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key, staging_table=upsert_staging)
            else:
                # Without a primary key the rows can only be appended.
                self.write_dataframe_batch(write_chunk, pg_table, if_exists='append')

            total_events += len(chunk)
            total_rows += len(write_chunk) + delete_count
            # The watermark follows the last row of the raw chunk.
            # NOTE(review): this assumes the reader returns rows ordered by
            # the datetime/sequence columns - confirm in read_incremental_chunks.
            last_row = chunk.iloc[-1]
            max_seen_watermark = {
                'last_x_datetime': last_row[table_config.datetime_column],
                'last_sequence_value': (
                    int(last_row[table_config.sequence_column])
                    if table_config.sequence_column and pd.notna(last_row[table_config.sequence_column])
                    else None
                ),
            }

            if chunk_num % self.config.BATCH_SIZE == 0:
                self.logger.log_progress(table_name, chunk_num, total_rows)

        # Nothing was applied: move the watermark to the upper bound.
        if total_rows == 0:
            max_seen_watermark = upper_bound

        self.save_watermark(
            pg_table,
            max_seen_watermark['last_x_datetime'],
            max_seen_watermark['last_sequence_value'],
            total_rows,
            'success',
        )

        self.logger.log_info(f"Всего инкрементально загружено строк: {total_rows}")
        if total_events != total_rows:
            self.logger.log_info(
                f"Получено событий из Life_: {total_events}, фактически применено изменений: {total_rows}"
            )
        # Large loads get VACUUM ANALYZE, smaller non-empty ones plain ANALYZE.
        if total_rows > 1000000:
            self.vacuum_analyze_table(pg_table)
        elif total_rows > 0:
            self.analyze_table(pg_table)

        self.logger.log_table_success(table_name, total_rows)
        return True

    except Exception as e:
        # Retryable MSSQL errors go back to the caller.
        if self.is_retryable_mssql_error(e):
            raise
        # Record the failure against the previously stored watermark; an
        # error while saving that state is only logged.
        try:
            failed_watermark = self.get_last_watermark(pg_table)
            self.save_watermark(
                pg_table,
                failed_watermark['last_x_datetime'],
                failed_watermark['last_sequence_value'],
                0,
                'failed',
                str(e),
            )
        except Exception as state_error:
            self.logger.log_error(f"Ошибка при сохранении состояния миграции {pg_table}", state_error)
        self.logger.log_table_failure(table_name, str(e))
        return False

    finally:
        # Best-effort removal of both staging tables on every exit path.
        try:
            with self.dst_engine.connect() as conn:
                conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(upsert_staging)}'))
                conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(delete_staging)}'))
                conn.commit()
        except Exception as cleanup_error:
            self.logger.log_warning(f"Не удалось очистить staging-таблицы для {pg_table}: {cleanup_error}")
|
||
|
||
def run_migration(
    self,
    table_names: Optional[List[str]] = None,
    send_email: bool = False,
    dry_run: Optional[bool] = None,
    read_limit: Optional[int] = None,
    force_full: bool = False,
):
    """Run the migration for all configured tables or a chosen subset.

    Args:
        table_names: Optional filter. A table is selected when its source
            name or its PostgreSQL name matches, ignoring case. Names that
            match nothing are reported with a warning.
        send_email: Send the report by email when ``True``.
        dry_run: When given, overrides ``config.DRY_RUN`` (the override is
            not reset afterwards).
        read_limit: When given, overrides ``config.READ_LIMIT``.
        force_full: Passed through to ``migrate_table``.

    Returns:
        The report dict produced by ``logger.generate_report()``.
    """
    if dry_run is not None:
        self.config.DRY_RUN = dry_run
    if read_limit is not None:
        self.config.READ_LIMIT = read_limit

    table_configs = self.table_configs
    if table_names:
        requested = {table_name.lower() for table_name in table_names}
        # Both names are compared lower-cased, because the requested names
        # were lower-cased above.
        table_configs = [
            table_config for table_config in self.table_configs
            if table_config.source_table.lower() in requested
            or (table_config.pg_table or '').lower() in requested
        ]
        matched = set()
        for table_config in table_configs:
            matched.add(table_config.source_table.lower())
            matched.add((table_config.pg_table or '').lower())
        unknown = sorted(requested - matched)
        if unknown:
            self.logger.log_warning(
                f"Запрошенные таблицы не найдены в конфигурации: {', '.join(unknown)}"
            )
    self.logger.stats['total_tables'] = len(table_configs)

    self.logger.log_info("="*60)
    self.logger.log_info("НАЧАЛО МИГРАЦИИ ДАННЫХ")
    self.logger.log_info(f"Время начала: {self.logger.start_time}")
    self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}")
    self.logger.log_info(f"Force full reload: {force_full}")
    self.logger.log_info("="*60)

    # Migrate the tables one by one; a failed table does not stop the run.
    for table_config in table_configs:
        success = self.migrate_table(table_config, force_full=force_full)
        if not success:
            self.logger.log_warning(f"Таблица {table_config.source_table} пропущена из-за ошибки")

    # Refresh planner statistics for the whole database.
    self.logger.log_info("Обновление статистики для всей базы данных...")
    self.update_table_statistics()

    report = self.logger.generate_report()
    summary = report['summary']

    self.logger.log_info("="*60)
    self.logger.log_info("ИТОГОВАЯ СТАТИСТИКА")
    self.logger.log_info("="*60)
    self.logger.log_info(f"Успешно скопировано таблиц: {summary['successful_tables']}")
    self.logger.log_info(f"Не удалось скопировать таблиц: {summary['failed_tables']}")
    self.logger.log_info(f"Всего строк: {summary['total_rows']}")
    self.logger.log_info(f"Процент успеха: {summary['success_rate']:.2f}%")
    self.logger.log_info(f"Продолжительность: {summary['duration']}")
    self.logger.log_info("="*60)

    if send_email:
        self.send_notification(report)
    else:
        self.logger.log_info("Email отключен для этого запуска")

    return report
|
||
|
||
def backfill_watermarks(
    self,
    table_names: Optional[List[str]] = None,
    overwrite: bool = False,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Store the current Life_ upper bound as watermark without reloading data.

    Args:
        table_names: Optional filter. A table is selected when its source
            name or its PostgreSQL name matches, ignoring case.
        overwrite: Replace a watermark that already exists.
        dry_run: Value of ``config.DRY_RUN`` for the duration of this call;
            the previous value is restored afterwards.

    Returns:
        A summary dict with the counters ``processed``, ``updated``,
        ``skipped`` and ``failed``, plus a per-table list under ``tables``.
    """
    original_dry_run = self.config.DRY_RUN
    self.config.DRY_RUN = dry_run

    try:
        table_configs = self.table_configs
        if table_names:
            requested = {table_name.lower() for table_name in table_names}
            # Both names are compared lower-cased, because the requested
            # names were lower-cased above.
            table_configs = [
                table_config for table_config in self.table_configs
                if table_config.source_table.lower() in requested
                or (table_config.pg_table or '').lower() in requested
            ]

        summary = {
            'processed': 0,
            'updated': 0,
            'skipped': 0,
            'failed': 0,
            'tables': [],
        }

        self.logger.log_info("=" * 60)
        self.logger.log_info("НАЧАЛО BACKFILL WATERMARKS")
        self.logger.log_info(f"Количество таблиц для обработки: {len(table_configs)}")
        self.logger.log_info(f"Overwrite existing: {overwrite}")
        self.logger.log_info(f"Dry run: {dry_run}")
        self.logger.log_info("=" * 60)

        for table_config in table_configs:
            summary['processed'] += 1
            table_name = table_config.source_table
            pg_table = table_config.pg_table

            # Only tables configured for incremental mode via a Life_ table
            # have a watermark.
            if table_config.mode != 'incremental' or not table_config.life_table:
                self.logger.log_info(
                    f"Пропуск {table_name}: таблица не настроена на incremental через Life_"
                )
                summary['skipped'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'skipped',
                    'reason': 'not_incremental',
                })
                continue

            existing_watermark = self.get_last_watermark(pg_table)
            if existing_watermark['last_x_datetime'] is not None and not overwrite:
                self.logger.log_info(
                    f"Пропуск {table_name}: watermark уже существует для {pg_table}"
                )
                summary['skipped'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'skipped',
                    'reason': 'watermark_exists',
                    'watermark': existing_watermark,
                })
                continue

            try:
                upper_bound = self.get_incremental_upper_bound(table_config)
                if upper_bound['last_x_datetime'] is None:
                    self.logger.log_warning(
                        f"Пропуск {table_name}: в {table_config.life_table} нет данных"
                    )
                    summary['skipped'] += 1
                    summary['tables'].append({
                        'table': table_name,
                        'status': 'skipped',
                        'reason': 'life_empty',
                    })
                    continue

                self.save_watermark(
                    pg_table,
                    upper_bound['last_x_datetime'],
                    upper_bound['last_sequence_value'],
                    0,
                    'success',
                )
                self.logger.log_info(
                    f"Watermark сохранен для {pg_table}: {upper_bound}"
                )
                summary['updated'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'updated',
                    'watermark': upper_bound,
                })
            except Exception as e:
                # One failing table must not stop the remaining ones.
                self.logger.log_error(
                    f"Ошибка backfill watermark для {table_name}",
                    e,
                )
                summary['failed'] += 1
                summary['tables'].append({
                    'table': table_name,
                    'status': 'failed',
                    'error': str(e),
                })

        self.logger.log_info("=" * 60)
        self.logger.log_info("ИТОГ BACKFILL WATERMARKS")
        self.logger.log_info(
            f"Обработано: {summary['processed']}, обновлено: {summary['updated']}, "
            f"пропущено: {summary['skipped']}, ошибок: {summary['failed']}"
        )
        self.logger.log_info("=" * 60)
        return summary
    finally:
        # Restore the caller's dry-run setting on every exit path.
        self.config.DRY_RUN = original_dry_run
|
||
|
||
def send_notification(self, report: Dict[str, Any]):
    """Email the migration report.

    The message body summarises the run; the log file and the JSON report
    file path are passed as attachments.

    Args:
        report: Report dict with the keys ``summary``,
            ``successful_tables``, ``failed_tables`` and ``errors``.
    """
    sent_at = datetime.now().strftime('%Y-%m-%d %H:%M')
    subject = f"{self.config.EMAIL_SUBJECT} - {sent_at}"

    summary = report['summary']
    succeeded = report['successful_tables']
    failed = report['failed_tables']
    errors = report['errors']

    succeeded_lines = "\n".join(f"- {t['name']}: {t['rows']} строк" for t in succeeded)
    failed_lines = "\n".join(f"- {t['name']}: {t['error']}" for t in failed)
    # Only the first five errors are listed; an ellipsis marks the rest.
    error_lines = "\n".join(f"- {e['message']}" for e in errors[:5])
    more_errors = "..." if len(errors) > 5 else ""

    body = f"""
Результат миграции данных MSSQL → PostgreSQL

ОБЩАЯ ИНФОРМАЦИЯ:
------------------
Время начала: {summary['start_time']}
Время окончания: {summary['end_time']}
Продолжительность: {summary['duration']}

СТАТИСТИКА:
-----------
Всего таблиц: {summary['total_tables']}
Успешно скопировано: {summary['successful_tables']}
Не удалось скопировать: {summary['failed_tables']}
Процент успеха: {summary['success_rate']:.2f}%
Всего строк: {summary['total_rows']}

УСПЕШНЫЕ ТАБЛИЦЫ ({len(succeeded)}):
-------------------
{succeeded_lines}

ПРОВАЛЕННЫЕ ТАБЛИЦЫ ({len(failed)}):
---------------------
{failed_lines}

ОШИБКИ ({len(errors)}):
--------
{error_lines}
{more_errors}

Лог-файл: {self.logger.log_file}
Отчет в формате JSON также прикреплен к письму.
"""

    report_path = os.path.join(self.config.LOG_DIR, f"migration_report_{self.logger.timestamp}.json")
    attachments = [self.logger.log_file, report_path]

    self.email_sender.send_email(subject, body, attachments)
|
||
|
||
def send_failure_notification(
    self,
    error: str,
    table_names: Optional[List[str]] = None,
    job_id: Optional[str] = None,
):
    """Email a failure notice for a job that ended before a report existed.

    Args:
        error: Error text to include in the message.
        table_names: Tables the job was asked to process; a generic label
            is used when omitted.
        job_id: Job identifier; ``'n/a'`` is shown when omitted.
    """
    sent_at = datetime.now().strftime('%Y-%m-%d %H:%M')
    subject = f"{self.config.EMAIL_SUBJECT} - FAILED - {sent_at}"

    if table_names:
        tables_text = ', '.join(table_names)
    else:
        tables_text = 'all configured tables'
    job_label = job_id or 'n/a'

    body = f"""
Результат миграции данных MSSQL → PostgreSQL

СТАТУС:
-------
Выполнение job завершилось ошибкой до формирования итогового отчета.

ДЕТАЛИ:
-------
Job ID: {job_label}
Таблицы: {tables_text}
Ошибка: {error}

Лог-файл: {self.logger.log_file}
"""

    self.email_sender.send_email(subject, body, [self.logger.log_file])
|
||
|
||
def cleanup_old_logs(self, days_to_keep: int = 7):
    """Delete ``.log`` and ``.json`` files in the log directory that are too old.

    A file is removed when its modification time is older than
    ``days_to_keep`` days. Directories are skipped, and a failure on one
    file is logged without stopping the cleanup of the others.

    Args:
        days_to_keep: Age threshold in days.
    """
    try:
        cutoff_date = datetime.now().timestamp() - (days_to_keep * 24 * 3600)
        filenames = os.listdir(self.config.LOG_DIR)
    except Exception as e:
        self.logger.log_error("Ошибка при очистке старых логов", e)
        return

    for filename in filenames:
        if not filename.endswith(('.log', '.json')):
            continue
        filepath = os.path.join(self.config.LOG_DIR, filename)
        try:
            # os.remove() fails on directories, so only regular files count.
            if not os.path.isfile(filepath):
                continue
            if os.path.getmtime(filepath) < cutoff_date:
                os.remove(filepath)
                self.logger.log_info(f"Удален старый файл: {filename}")
        except OSError as e:
            # E.g. the file vanished or is not writable; keep going.
            self.logger.log_error(f"Ошибка при удалении старого файла {filename}", e)
|