Небольшие изменения

This commit is contained in:
brusnitsyn
2026-05-29 06:47:29 +09:00
parent 7a13ff3b74
commit ff810e59bc
4 changed files with 284 additions and 88 deletions

View File

@@ -21,6 +21,8 @@ ENABLE_TIMESCALE=false
DRY_RUN=false
READ_LIMIT=0
CHUNK_SIZE=5000
INCREMENTAL_CHUNK_SIZE=5000
FULL_LOAD_CHUNK_SIZE=5000
WRITE_CHUNK_SIZE=5000
CREATE_FOREIGN_KEYS=true
QUEUE_POLL_SECONDS=1

View File

@@ -100,6 +100,8 @@ class Config:
# Настройки миграции
CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', '5000'))
WRITE_CHUNK_SIZE = int(os.getenv('WRITE_CHUNK_SIZE', str(CHUNK_SIZE)))
INCREMENTAL_CHUNK_SIZE = int(os.getenv('INCREMENTAL_CHUNK_SIZE', str(CHUNK_SIZE * 2)))
FULL_LOAD_CHUNK_SIZE = int(os.getenv('FULL_LOAD_CHUNK_SIZE', str(CHUNK_SIZE * 4)))
BATCH_SIZE = 10 # Через сколько чанков выводить прогресс
REPLICATOR_SCHEMA = os.getenv('REPLICATOR_SCHEMA', 'replicator')
STATE_TABLE = 'migration_state'

View File

@@ -37,15 +37,16 @@ class MigrationLogger:
def setup_logging(self):
"""Настройка системы логирования"""
logging.basicConfig(
level=self.config.LOG_LEVEL,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(self.log_file, encoding='utf-8'),
logging.StreamHandler() # Вывод в консоль
]
)
self.logger = logging.getLogger(__name__)
fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(f"migration.{self.timestamp}")
self.logger.setLevel(self.config.LOG_LEVEL)
self.logger.propagate = False
file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
file_handler.setFormatter(fmt)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(fmt)
self.logger.addHandler(file_handler)
self.logger.addHandler(stream_handler)
def log_info(self, message: str):
"""Логирование информационного сообщения"""

View File

@@ -1,12 +1,15 @@
import os
import re
import time
import csv
import io
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import sqltypes
from .config import Config, TableMigrationConfig
@@ -50,12 +53,8 @@ class DatabaseMigrator:
connect_args={
'charset': self.config.MSSQL_CHARSET,
'login_timeout': self.config.MSSQL_CONNECT_TIMEOUT,
'timeout': self.config.MSSQL_CONNECT_TIMEOUT,
},
pool_pre_ping=True,
pool_recycle=self.config.MSSQL_POOL_RECYCLE,
pool_size=self.config.MSSQL_POOL_SIZE,
max_overflow=self.config.MSSQL_MAX_OVERFLOW,
poolclass=NullPool,
)
def reconnect_mssql_engine(self):
@@ -72,28 +71,46 @@ class DatabaseMigrator:
def is_retryable_mssql_error(self, exception: Exception) -> bool:
"""Определение временной ошибки MSSQL/pymssql, при которой есть смысл повторить таблицу."""
if isinstance(exception, (OperationalError, DBAPIError)):
message = str(exception).lower()
retry_markers = (
'dbprocess is dead',
'adaptive server connection failed',
'server closed the connection unexpectedly',
'connection reset',
'connection refused',
'communication link failure',
'lost connection',
'closed connection',
'not enabled',
'08s01',
)
return any(marker in message for marker in retry_markers)
return False
message = str(exception).lower()
retry_markers = (
'dbprocess is dead',
'unexpected eof',
'adaptive server connection failed',
'server closed the connection unexpectedly',
'connection reset',
'connection refused',
'communication link failure',
'lost connection',
'closed connection',
'not enabled',
'08s01',
'20002',
'20017',
)
if any(marker in message for marker in retry_markers):
return True
return isinstance(exception, (OperationalError, DBAPIError))
def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool:
"""Один проход миграции таблицы без retry-обертки."""
if force_full:
self.logger.log_info(f"Force full reload для таблицы {table_config.source_table}")
success = self.migrate_full_table(table_config)
is_initial_force_full = (
table_config.mode == 'incremental'
and table_config.initial_load_mode == 'full_then_incremental'
and not self.table_exists(table_config.pg_table)
and self.get_last_watermark(table_config.pg_table).get('last_x_datetime') is None
)
if is_initial_force_full:
self.logger.log_info(
f"Первый force_full для {table_config.source_table}: "
"загрузка выполняется без SQLAlchemy"
)
success = self.migrate_full_table_without_sqlalchemy(table_config)
else:
success = self.migrate_full_table(table_config)
if success and table_config.mode == 'incremental' and table_config.life_table:
upper_bound = self.get_incremental_upper_bound(table_config)
self.save_watermark(
@@ -453,7 +470,7 @@ class DatabaseMigrator:
sql,
self.src_engine,
params=params,
chunksize=self.config.CHUNK_SIZE,
chunksize=self.config.INCREMENTAL_CHUNK_SIZE,
)
def read_full_chunks(
@@ -464,9 +481,9 @@ class DatabaseMigrator:
"""Чтение полной таблицы чанками с опциональным лимитом для проверки."""
if read_limit:
sql = text(f"SELECT TOP {int(read_limit)} * FROM {self.quote_mssql_identifier(table_name)}")
return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.CHUNK_SIZE)
return pd.read_sql_query(sql, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE)
return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.CHUNK_SIZE)
return pd.read_sql_table(table_name, self.src_engine, chunksize=self.config.FULL_LOAD_CHUNK_SIZE)
def write_dataframe_batch(
self,
@@ -488,6 +505,80 @@ class DatabaseMigrator:
method='multi',
)
def read_full_chunks_without_sqlalchemy(
self,
table_name: str,
read_limit: Optional[int] = None,
):
"""Чтение полной таблицы чанками через DBAPI-курсор (без pandas.read_sql_*)."""
src_connection = self.src_engine.raw_connection()
cursor = src_connection.cursor()
top_clause = f"TOP {int(read_limit)} " if read_limit else ""
sql = f"SELECT {top_clause}* FROM {self.quote_mssql_identifier(table_name)}"
try:
cursor.execute(sql)
columns = [column[0] for column in cursor.description]
while True:
rows = cursor.fetchmany(self.config.FULL_LOAD_CHUNK_SIZE)
if not rows:
break
yield pd.DataFrame.from_records(rows, columns=columns)
finally:
try:
cursor.close()
finally:
src_connection.close()
def write_dataframe_batch_without_sqlalchemy(
self,
chunk: pd.DataFrame,
table_name: str,
):
"""Batch-запись DataFrame в PostgreSQL через COPY (без pandas.to_sql)."""
if chunk.empty:
return
if self.config.DRY_RUN:
self.logger.log_info(f"DRY RUN: пропущена запись {len(chunk)} строк в {table_name}")
return
buffer = io.StringIO()
chunk.to_csv(
buffer,
index=False,
header=False,
sep='\t',
na_rep='\\N',
date_format='%Y-%m-%d %H:%M:%S.%f',
quoting=csv.QUOTE_MINIMAL,
escapechar='\\',
)
buffer.seek(0)
quoted_columns = ', '.join(
self.quote_identifier(column_name)
for column_name in chunk.columns
)
copy_sql = (
f"COPY {self.qualify_table_name(table_name)} ({quoted_columns}) "
"FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N', ESCAPE '\\', QUOTE '\"')"
)
dst_connection = self.dst_engine.raw_connection()
cursor = dst_connection.cursor()
try:
cursor.copy_expert(copy_sql, buffer)
dst_connection.commit()
except Exception:
dst_connection.rollback()
raise
finally:
try:
cursor.close()
finally:
dst_connection.close()
def prepare_incremental_chunk(
self,
chunk: pd.DataFrame,
@@ -571,6 +662,7 @@ class DatabaseMigrator:
chunk: pd.DataFrame,
table_name: str,
primary_key: List[str],
staging_table: Optional[str] = None,
):
"""Batch delete в PostgreSQL через staging-таблицу с ключами."""
if chunk.empty:
@@ -588,7 +680,8 @@ class DatabaseMigrator:
if missing_columns:
raise ValueError(f"Для удаления из {table_name} не найдены ключевые поля: {missing_columns}")
staging_table = f"_stg_delete_{table_name}_{int(time.time() * 1000)}"
own_staging = staging_table is None
staging = staging_table or f"_stg_delete_{table_name}_{int(time.time() * 1000)}"
key_chunk = chunk[primary_key].drop_duplicates()
join_condition = ' AND '.join([
f"target.{self.quote_identifier(column)} = source.{self.quote_identifier(column)}"
@@ -596,20 +689,31 @@ class DatabaseMigrator:
])
try:
self.write_dataframe_batch(key_chunk, staging_table, if_exists='replace')
if self.table_exists(staging):
with self.dst_engine.connect() as conn:
conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}'))
conn.commit()
else:
key_chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
with self.dst_engine.connect() as conn:
conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED'))
conn.commit()
self.write_dataframe_batch_without_sqlalchemy(key_chunk, staging)
sql = f"""
DELETE FROM {self.quote_identifier(table_name)} AS target
USING {self.quote_identifier(staging_table)} AS source
USING {self.quote_identifier(staging)} AS source
WHERE {join_condition}
"""
with self.dst_engine.connect() as conn:
conn.execute(text(sql))
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}'))
if own_staging:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}'))
conn.commit()
except Exception:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}'))
conn.commit()
if own_staging:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}'))
conn.commit()
raise
def upsert_dataframe_batch(
@@ -617,6 +721,7 @@ class DatabaseMigrator:
chunk: pd.DataFrame,
table_name: str,
primary_key: List[str],
staging_table: Optional[str] = None,
):
"""Batch upsert через staging-таблицу."""
if self.config.DRY_RUN:
@@ -627,8 +732,8 @@ class DatabaseMigrator:
self.write_dataframe_batch(chunk, table_name, if_exists='append')
return
chunk = self.deduplicate_incremental_chunk(chunk, primary_key)
staging_table = f"_stg_{table_name}_{int(time.time() * 1000)}"
own_staging = staging_table is None
staging = staging_table or f"_stg_{table_name}_{int(time.time() * 1000)}"
columns = list(chunk.columns)
quoted_columns = ', '.join([self.quote_identifier(column) for column in columns])
conflict_columns = ', '.join([self.quote_identifier(column) for column in primary_key])
@@ -644,21 +749,32 @@ class DatabaseMigrator:
conflict_action = "DO NOTHING"
try:
self.write_dataframe_batch(chunk, staging_table, if_exists='replace')
if self.table_exists(staging):
with self.dst_engine.connect() as conn:
conn.execute(text(f'TRUNCATE TABLE {self.quote_identifier(staging)}'))
conn.commit()
else:
chunk.iloc[0:0].to_sql(staging, self.dst_engine, if_exists='replace', index=False)
with self.dst_engine.connect() as conn:
conn.execute(text(f'ALTER TABLE {self.quote_identifier(staging)} SET UNLOGGED'))
conn.commit()
self.write_dataframe_batch_without_sqlalchemy(chunk, staging)
sql = f"""
INSERT INTO {self.quote_identifier(table_name)} ({quoted_columns})
SELECT {quoted_columns}
FROM {self.quote_identifier(staging_table)}
FROM {self.quote_identifier(staging)}
ON CONFLICT ({conflict_columns}) {conflict_action}
"""
with self.dst_engine.connect() as conn:
conn.execute(text(sql))
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}'))
if own_staging:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}'))
conn.commit()
except Exception:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging_table)}'))
conn.commit()
if own_staging:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}'))
conn.commit()
raise
def create_timescale_hypertable(self, table_config: TableMigrationConfig):
@@ -1040,26 +1156,25 @@ class DatabaseMigrator:
pk_columns = self.get_mssql_primary_key(table_name)
foreign_keys = self.get_mssql_foreign_keys(table_name)
# Очищаем целевую таблицу
self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
self.truncate_table(pg_table)
# Читаем данные
self.logger.log_info(f"Чтение данных из {table_name}")
chunks = self.read_full_chunks(table_name, read_limit=read_limit)
# Загружаем данные
first_chunk = True
total_rows = 0
for chunk_num, chunk in enumerate(chunks, 1):
if first_chunk:
self.write_dataframe_batch(chunk, pg_table, if_exists='fail')
first_chunk = False
# Дропаем целевую таблицу только после успешного чтения первого чанка,
# чтобы не уничтожить данные при недоступном MSSQL
self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
self.truncate_table(pg_table)
chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
self.logger.log_info(f"Таблица {pg_table} создана")
else:
self.write_dataframe_batch(chunk, pg_table, if_exists='append')
first_chunk = False
self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table)
total_rows += len(chunk)
if chunk_num % self.config.BATCH_SIZE == 0:
self.logger.log_progress(table_name, chunk_num, total_rows)
@@ -1068,31 +1183,25 @@ class DatabaseMigrator:
if total_rows > 0:
self.sync_target_schema(table_name, pg_table)
if total_rows > 0:
self.create_timescale_hypertable(table_config)
# Создаем первичный ключ
if self.can_create_primary_key(table_config, pk_columns):
self.logger.log_info(f"Создание первичного ключа для {pg_table}")
self.create_pg_primary_key(pg_table, pk_columns)
# Создаем индексы
if indexes:
self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
self.create_pg_indexes(pg_table, indexes)
# Создаем внешние ключи
if foreign_keys:
self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
self.create_pg_foreign_keys(pg_table, foreign_keys)
# Обновляем статистику
self.logger.log_info(f"Обновление статистики для {pg_table}")
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
else:
self.analyze_table(pg_table)
if self.can_create_primary_key(table_config, pk_columns):
self.logger.log_info(f"Создание первичного ключа для {pg_table}")
self.create_pg_primary_key(pg_table, pk_columns)
if indexes:
self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
self.create_pg_indexes(pg_table, indexes)
if foreign_keys:
self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
self.create_pg_foreign_keys(pg_table, foreign_keys)
self.logger.log_info(f"Обновление статистики для {pg_table}")
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
else:
self.analyze_table(pg_table)
self.logger.log_table_success(table_name, total_rows)
return True
@@ -1103,11 +1212,83 @@ class DatabaseMigrator:
self.logger.log_table_failure(table_name, str(e))
return False
def migrate_full_table_without_sqlalchemy(
self,
table_config: TableMigrationConfig,
read_limit: Optional[int] = None,
) -> bool:
"""Полная миграция одной таблицы без SQLAlchemy в этапе чтения/заливки данных."""
table_name = table_config.source_table
pg_table = table_config.pg_table
self.logger.log_table_start(table_name)
try:
indexes = self.get_mssql_indexes(table_name)
pk_columns = self.get_mssql_primary_key(table_name)
foreign_keys = self.get_mssql_foreign_keys(table_name)
self.logger.log_info(f"Чтение данных из {table_name} без SQLAlchemy")
chunks = self.read_full_chunks_without_sqlalchemy(table_name, read_limit=read_limit)
first_chunk = True
total_rows = 0
for chunk_num, chunk in enumerate(chunks, 1):
if first_chunk:
# Дропаем целевую таблицу только после успешного чтения первого чанка,
# чтобы не уничтожить данные при недоступном MSSQL
self.logger.log_info(f"Очистка целевой таблицы {pg_table}")
self.truncate_table(pg_table)
self.write_dataframe_batch(chunk.iloc[0:0], pg_table, if_exists='fail')
self.logger.log_info(f"Таблица {pg_table} создана")
first_chunk = False
self.write_dataframe_batch_without_sqlalchemy(chunk, pg_table)
total_rows += len(chunk)
if chunk_num % self.config.BATCH_SIZE == 0:
self.logger.log_progress(table_name, chunk_num, total_rows)
self.logger.log_info(f"Всего загружено строк: {total_rows}")
if total_rows > 0:
self.sync_target_schema(table_name, pg_table)
self.create_timescale_hypertable(table_config)
if self.can_create_primary_key(table_config, pk_columns):
self.logger.log_info(f"Создание первичного ключа для {pg_table}")
self.create_pg_primary_key(pg_table, pk_columns)
if indexes:
self.logger.log_info(f"Создание {len(indexes)} индексов для {pg_table}")
self.create_pg_indexes(pg_table, indexes)
if foreign_keys:
self.logger.log_info(f"Создание {len(foreign_keys)} внешних ключей для {pg_table}")
self.create_pg_foreign_keys(pg_table, foreign_keys)
self.logger.log_info(f"Обновление статистики для {pg_table}")
if total_rows > 1000000:
self.vacuum_analyze_table(pg_table)
else:
self.analyze_table(pg_table)
self.logger.log_table_success(table_name, total_rows)
return True
except Exception as e:
if self.is_retryable_mssql_error(e):
raise
self.logger.log_table_failure(table_name, str(e))
return False
def migrate_incremental_table(self, table_config: TableMigrationConfig) -> bool:
"""Инкрементальная миграция Life_ таблицы по x_DateTime."""
table_name = table_config.source_table
pg_table = table_config.pg_table
self.logger.log_table_start(f"{table_name} ({table_config.read_table})")
upsert_staging = f"_stg_upsert_{pg_table}"
delete_staging = f"_stg_delete_{pg_table}"
try:
if not table_config.life_table:
@@ -1188,18 +1369,19 @@ class DatabaseMigrator:
)
if not delete_chunk.empty:
self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key)
self.delete_dataframe_batch(delete_chunk, pg_table, table_config.primary_key, staging_table=delete_staging)
if write_chunk.empty:
pass
elif first_chunk:
self.write_dataframe_batch(write_chunk, pg_table, if_exists='append')
write_chunk.iloc[0:0].to_sql(pg_table, self.dst_engine, if_exists='replace', index=False)
self.write_dataframe_batch_without_sqlalchemy(write_chunk, pg_table)
self.create_timescale_hypertable(table_config)
if self.can_create_primary_key(table_config, table_config.primary_key):
self.create_pg_primary_key(pg_table, table_config.primary_key)
first_chunk = False
elif table_config.primary_key:
self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key)
self.upsert_dataframe_batch(write_chunk, pg_table, table_config.primary_key, staging_table=upsert_staging)
else:
self.write_dataframe_batch(write_chunk, pg_table, if_exists='append')
@@ -1259,7 +1441,16 @@ class DatabaseMigrator:
self.logger.log_error(f"Ошибка при сохранении состояния миграции {pg_table}", state_error)
self.logger.log_table_failure(table_name, str(e))
return False
finally:
try:
with self.dst_engine.connect() as conn:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(upsert_staging)}'))
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(delete_staging)}'))
conn.commit()
except Exception as cleanup_error:
self.logger.log_warning(f"Не удалось очистить staging-таблицы для {pg_table}: {cleanup_error}")
def run_migration(
self,
table_names: Optional[List[str]] = None,