Исправил определение границы для следующей репликации

This commit is contained in:
brusnitsyn
2026-06-16 14:31:10 +09:00
parent 3f08d263ad
commit 17f4d4307f

View File

@@ -9,7 +9,7 @@ from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.exc import DBAPIError, IntegrityError, OperationalError
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import sqltypes
@@ -188,6 +188,11 @@ class DatabaseMigrator:
if any(marker in message for marker in retry_markers):
return True
# IntegrityError (UniqueViolation, ForeignKeyViolation и т.д.) — детерминированная
# ошибка данных, retry не поможет
if isinstance(exception, IntegrityError):
return False
return isinstance(exception, (OperationalError, DBAPIError))
def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool:
@@ -722,6 +727,28 @@ class DatabaseMigrator:
upsert_chunk = chunk[operations.isin(upsert_operations)]
delete_chunk = chunk[operations.isin(delete_operations)]
# Если для одного PK в батче есть и 'i'/'u' и 'd', и последнее событие — 'd',
# то строку не надо переинсертить после удаления — убираем её из upsert.
pk = table_config.primary_key
if pk and not delete_chunk.empty and not upsert_chunk.empty:
op_col = table_config.operation_column
last_op_per_pk = (
chunk.assign(_op_lower=operations)
.groupby(pk, sort=False)['_op_lower']
.last()
)
pks_deleted_last = last_op_per_pk[last_op_per_pk.isin(delete_operations)]
if not pks_deleted_last.empty:
if len(pk) == 1:
upsert_chunk = upsert_chunk[
~upsert_chunk[pk[0]].isin(pks_deleted_last.index)
]
else:
deleted_tuples = set(pks_deleted_last.index.tolist())
upsert_mask = upsert_chunk[pk].apply(tuple, axis=1).isin(deleted_tuples)
upsert_chunk = upsert_chunk[~upsert_mask]
return upsert_chunk, delete_chunk
def deduplicate_incremental_chunk(
@@ -824,6 +851,68 @@ class DatabaseMigrator:
conn.commit()
raise
def _delete_secondary_conflicts(
self,
conn,
table_name: str,
staging: str,
primary_key: List[str],
) -> None:
"""Удаление строк в target, конфликтующих со staging по вторичным уникальным ключам.
Нужно для случаев когда строка была удалена в источнике (событие 'd' пропущено из-за
watermark), а на её место вставлена новая строка с тем же уникальным ключом.
"""
# pg_constraint (contype='u') видит только UNIQUE CONSTRAINT, а не CREATE UNIQUE INDEX.
# Используем pg_index чтобы поймать оба варианта.
constraint_sql = text("""
SELECT i.relname AS conname,
array_agg(a.attname ORDER BY array_position(ix.indkey::int[], a.attnum::int)) AS columns
FROM pg_index ix
JOIN pg_class t ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = :table_name
AND ix.indisunique = true
AND ix.indisprimary = false
GROUP BY i.relname
""")
try:
constraints = conn.execute(constraint_sql, {'table_name': table_name}).mappings().all()
except Exception as e:
self.logger.log_warning(f"Не удалось получить уникальные ограничения для {table_name}: {e}")
return
pk_set = {c.lower() for c in primary_key}
t = self.quote_identifier(table_name)
s = self.quote_identifier(staging)
for row in constraints:
cols = row['columns']
if {c.lower() for c in cols} == pk_set:
continue
join_parts = ' AND '.join(
f't.{self.quote_identifier(c)} IS NOT DISTINCT FROM s.{self.quote_identifier(c)}'
for c in cols
)
pk_neq_parts = ' OR '.join(
f't.{self.quote_identifier(pk)} != s.{self.quote_identifier(pk)}'
for pk in primary_key
)
delete_sql = text(f"""
DELETE FROM {t} AS t
USING {s} AS s
WHERE {join_parts}
AND ({pk_neq_parts})
""")
result = conn.execute(delete_sql)
if result.rowcount > 0:
self.logger.log_info(
f"Вытеснено {result.rowcount} строк из {table_name} "
f"по ограничению {row['conname']} (пропущенный delete в источнике)"
)
def upsert_dataframe_batch(
self,
chunk: pd.DataFrame,
@@ -888,6 +977,7 @@ class DatabaseMigrator:
ON CONFLICT ({conflict_columns}) {conflict_action}
"""
with self.dst_engine.connect() as conn:
self._delete_secondary_conflicts(conn, table_name, staging, primary_key)
conn.execute(text(sql))
if own_staging:
conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}'))
@@ -1684,10 +1774,16 @@ class DatabaseMigrator:
return "'" + str(value).replace("'", "''") + "'"
def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig) -> Dict[str, Any]:
"""Поиск watermark в MSSQL Life_ по максимальному PK из PostgreSQL.
"""Поиск точной границы снимка через Life_-таблицу.
Берёт MAX(pk) из PG (1 запрос), затем ищет соответствующее insert-событие
в Life_-таблице MSSQL (1 запрос). Работает только для одиночного целочисленного PK.
Алгоритм (3 запроса):
1. MAX(pk) из PG — последний PK, существовавший на момент full load.
2. MIN(LifeID) в Life_ для insert-событий с pk > max_pg_pk — первое событие,
которое ещё не отражено в PG (новая строка после снимка).
3. Событие в Life_ прямо перед этим LifeID — точная граница снимка.
Такой подход гарантирует что все delete/update для строк существовавших
на момент снимка попадут в инкрементальное окно.
Для составного PK возвращает верхнюю границу Life_.
"""
empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None}
@@ -1703,6 +1799,10 @@ class DatabaseMigrator:
return self.get_incremental_upper_bound(table_config)
pk_col = pk_cols[0]
life_table = self.quote_mssql_identifier(table_config.life_table)
life_pk = self.quote_mssql_identifier(pk_col)
life_seq = self.quote_mssql_identifier(table_config.sequence_column) if table_config.sequence_column else None
life_dt = self.quote_mssql_identifier(table_config.datetime_column)
# 1. MAX PK из PG
with self.dst_engine.connect() as conn:
@@ -1713,53 +1813,61 @@ class DatabaseMigrator:
if max_pk is None:
return empty
self.logger.log_info(
f"MAX {pk_col} в {table_config.pg_table}: {max_pk}, "
f"поиск insert-события в {table_config.life_table}"
)
self.logger.log_info(f"MAX {pk_col} в {table_config.pg_table}: {max_pk}")
# 2. insert-событие для этого PK в Life_ MSSQL
select_parts = [
f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt",
]
if table_config.sequence_column:
select_parts.append(
f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq"
)
else:
select_parts.append("CAST(NULL AS bigint) AS max_seq")
select_clause = ', '.join(select_parts)
order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"]
if table_config.sequence_column:
order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC")
order_clause = ', '.join(order_parts)
where = f"{self.quote_mssql_identifier(pk_col)} = {self._format_mssql_literal(max_pk)}"
# 2. MIN(LifeID) первого insert-события для PK, которых нет в PG
op_filter = ""
if table_config.operation_column:
where = f"({where}) AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'"
op_filter = f"AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'"
sql = text(f"""
SELECT TOP 1 {select_clause}
FROM {self.quote_mssql_identifier(table_config.life_table)} WITH (NOLOCK)
WHERE {where}
ORDER BY {order_clause}
sql_first_new = text(f"""
SELECT MIN({life_seq}) AS first_new_lifeid
FROM {life_table} WITH (NOLOCK)
WHERE {life_pk} > {self._format_mssql_literal(max_pk)}
{op_filter}
""") if life_seq else None
first_new_lifeid = None
if life_seq is not None:
with self.src_engine.connect() as src_conn:
first_new_lifeid = src_conn.execute(sql_first_new).scalar()
if first_new_lifeid is None:
# Новых строк после MAX PK нет — все текущие события уже в PG,
# берём верхнюю границу Life_ как watermark
self.logger.log_info(
f"Новых строк после {pk_col}={max_pk} нет, watermark = верхняя граница Life_"
)
return self.get_incremental_upper_bound(table_config)
self.logger.log_info(
f"Первое новое событие в {table_config.life_table}: {table_config.sequence_column}={first_new_lifeid}"
)
# 3. Событие прямо перед первым новым — граница снимка
seq_select = f"{life_seq} AS max_seq" if life_seq else "CAST(NULL AS bigint) AS max_seq"
sql_boundary = text(f"""
SELECT TOP 1 {life_dt} AS max_dt, {seq_select}
FROM {life_table} WITH (NOLOCK)
WHERE {life_seq} < {self._format_mssql_literal(first_new_lifeid)}
ORDER BY {life_seq} DESC
""")
with self.src_engine.connect() as src_conn:
row = src_conn.execute(sql).mappings().first()
row = src_conn.execute(sql_boundary).mappings().first()
if not row or row['max_dt'] is None:
self.logger.log_warning(
f"Insert-событие для {pk_col}={max_pk} не найдено в {table_config.life_table}"
f"Не найдено событий до {table_config.sequence_column}={first_new_lifeid} "
f"в {table_config.life_table}, watermark = верхняя граница"
)
return empty
return self.get_incremental_upper_bound(table_config)
result = {
'last_x_datetime': row['max_dt'],
'last_sequence_value': row['max_seq'],
}
self.logger.log_info(f"Watermark по MAX PK: {result}")
self.logger.log_info(f"Watermark по границе снимка: {result}")
return result
def backfill_state_from_pg(