Исправление индикатора верхней границы

This commit is contained in:
brusnitsyn
2026-06-16 09:11:19 +09:00
parent 3fb2053705
commit 3f08d263ad

View File

@@ -1429,7 +1429,7 @@ class DatabaseMigrator:
f"Таблица {pg_table} содержит {pg_count} строк без watermark — " f"Таблица {pg_table} содержит {pg_count} строк без watermark — "
f"автоопределение watermark из {table_config.life_table}" f"автоопределение watermark из {table_config.life_table}"
) )
detected = self._get_watermark_for_pg_data(table_config, find_min=True) detected = self._get_watermark_for_pg_data(table_config)
if detected['last_x_datetime'] is not None: if detected['last_x_datetime'] is not None:
self.save_watermark( self.save_watermark(
pg_table, pg_table,
@@ -1683,12 +1683,12 @@ class DatabaseMigrator:
return '0x' + value.hex() return '0x' + value.hex()
return "'" + str(value).replace("'", "''") + "'" return "'" + str(value).replace("'", "''") + "'"
def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig, find_min: bool = False) -> Dict[str, Any]: def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig) -> Dict[str, Any]:
"""Поиск watermark в MSSQL Life_ для строк, уже имеющихся в PostgreSQL. """Поиск watermark в MSSQL Life_ по максимальному PK из PostgreSQL.
Читает PK из PG потоком (без загрузки всей таблицы в память), батчами Берёт MAX(pk) из PG (1 запрос), затем ищет соответствующее insert-событие
запрашивает MSSQL Life_ и находит минимальную или максимальную (datetime, sequence) пару в Life_-таблице MSSQL (1 запрос). Работает только для одиночного целочисленного PK.
среди событий, относящихся к уже реплицированным строкам. Для составного PK возвращает верхнюю границу Life_.
""" """
empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None} empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None}
pk_cols = table_config.primary_key pk_cols = table_config.primary_key
@@ -1696,14 +1696,29 @@ class DatabaseMigrator:
if not pk_cols or not table_config.life_table: if not pk_cols or not table_config.life_table:
return self.get_incremental_upper_bound(table_config) return self.get_incremental_upper_bound(table_config)
LOOKUP_BATCH = 500 if len(pk_cols) != 1:
self.logger.log_info(
f"Составной PK для {table_config.pg_table}, watermark берётся из верхней границы Life_"
)
return self.get_incremental_upper_bound(table_config)
direction = 'ASC' if find_min else 'DESC' pk_col = pk_cols[0]
order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} {direction}"]
if table_config.sequence_column:
order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} {direction}")
order_clause = ', '.join(order_parts)
# 1. MAX PK из PG
with self.dst_engine.connect() as conn:
max_pk = conn.execute(
text(f"SELECT MAX({self.quote_identifier(pk_col)}) FROM {self.quote_identifier(table_config.pg_table)}")
).scalar()
if max_pk is None:
return empty
self.logger.log_info(
f"MAX {pk_col} в {table_config.pg_table}: {max_pk}, "
f"поиск insert-события в {table_config.life_table}"
)
# 2. insert-событие для этого PK в Life_ MSSQL
select_parts = [ select_parts = [
f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt", f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt",
] ]
@@ -1711,88 +1726,41 @@ class DatabaseMigrator:
select_parts.append( select_parts.append(
f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq" f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq"
) )
else:
select_parts.append("CAST(NULL AS bigint) AS max_seq")
select_clause = ', '.join(select_parts) select_clause = ', '.join(select_parts)
max_datetime = None order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"]
max_sequence = None if table_config.sequence_column:
total_rows = 0 order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC")
batch_num = 0 order_clause = ', '.join(order_parts)
quoted_pk = ', '.join(self.quote_identifier(c) for c in pk_cols) where = f"{self.quote_mssql_identifier(pk_col)} = {self._format_mssql_literal(max_pk)}"
pk_sql = text(f'SELECT {quoted_pk} FROM {self.quote_identifier(table_config.pg_table)}') if table_config.operation_column:
where = f"({where}) AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'"
self.logger.log_info(
f"Потоковое чтение PK из {table_config.pg_table}, "
f"поиск watermark в {table_config.life_table} батчами по {LOOKUP_BATCH}"
)
with self.dst_engine.connect().execution_options(
stream_results=True, yield_per=LOOKUP_BATCH
) as conn:
for batch in conn.execute(pk_sql).partitions(LOOKUP_BATCH):
if not batch:
continue
batch_num += 1
total_rows += len(batch)
if len(pk_cols) == 1:
col = pk_cols[0]
values_str = ', '.join(self._format_mssql_literal(row[0]) for row in batch)
where = f"{self.quote_mssql_identifier(col)} IN ({values_str})"
else:
conditions = []
for row in batch:
parts = [
f"{self.quote_mssql_identifier(col)} = {self._format_mssql_literal(row[i])}"
for i, col in enumerate(pk_cols)
]
conditions.append(f"({' AND '.join(parts)})")
where = ' OR '.join(conditions)
sql = text(f""" sql = text(f"""
SELECT TOP 1 {select_clause} SELECT TOP 1 {select_clause}
FROM {self.quote_mssql_identifier(table_config.life_table)} FROM {self.quote_mssql_identifier(table_config.life_table)} WITH (NOLOCK)
WHERE {where} WHERE {where}
ORDER BY {order_clause} ORDER BY {order_clause}
""") """)
try:
with self.src_engine.connect() as src_conn: with self.src_engine.connect() as src_conn:
row = src_conn.execute(sql).mappings().first() row = src_conn.execute(sql).mappings().first()
if row and row['max_dt'] is not None: if not row or row['max_dt'] is None:
batch_dt = row['max_dt']
batch_seq = row['max_seq'] if table_config.sequence_column else None
is_better = (
max_datetime is None
or (find_min and (
batch_dt < max_datetime
or (batch_dt == max_datetime and batch_seq is not None and (max_sequence is None or batch_seq < max_sequence))
))
or (not find_min and (
batch_dt > max_datetime
or (batch_dt == max_datetime and batch_seq is not None and (max_sequence is None or batch_seq > max_sequence))
))
)
if is_better:
max_datetime = batch_dt
max_sequence = batch_seq
except Exception as e:
self.logger.log_warning( self.logger.log_warning(
f"Ошибка при поиске watermark батч #{batch_num} " f"Insert-событие для {pk_col}={max_pk} не найдено в {table_config.life_table}"
f"(строки {total_rows - len(batch) + 1}{total_rows}): {e}"
) )
return empty
direction_label = 'минимальный' if find_min else 'максимальный' result = {
self.logger.log_info( 'last_x_datetime': row['max_dt'],
f"Обработано {total_rows} PK из {table_config.pg_table} " 'last_sequence_value': row['max_seq'],
f"в {batch_num} батчах, {direction_label} watermark: {max_datetime}" }
) self.logger.log_info(f"Watermark по MAX PK: {result}")
return result
return {'last_x_datetime': max_datetime, 'last_sequence_value': max_sequence}
def backfill_state_from_pg( def backfill_state_from_pg(
self, self,