# Post-patch form of DatabaseMigrator._get_watermark_for_pg_data (app/migrator.py).
# The same patch drops the find_min argument at the auto-detection call site:
#     detected = self._get_watermark_for_pg_data(table_config)
def _get_watermark_for_pg_data(self, table_config: "TableMigrationConfig") -> Dict[str, Any]:
    """Derive a watermark from the Life_ insert event of PostgreSQL's highest PK.

    Runs one query against PostgreSQL for ``MAX(pk)`` of ``table_config.pg_table``
    and one query against the MSSQL ``table_config.life_table`` for the most
    recent row (ordered by ``datetime_column``, then ``sequence_column`` when
    configured, both descending) whose key equals that value. When
    ``operation_column`` is configured, only rows whose lower-cased operation
    equals ``'i'`` are considered.

    The lookup is intended for a single integer primary key; note that the key
    type is not checked here, so ``MAX`` follows whatever ordering PostgreSQL
    applies to the column.

    Args:
        table_config: Migration settings of the table. Reads ``primary_key``,
            ``life_table``, ``pg_table``, ``datetime_column``,
            ``sequence_column`` and ``operation_column``.

    Returns:
        A dict with the keys ``'last_x_datetime'`` and ``'last_sequence_value'``.
        * No primary key, no ``life_table`` or a composite primary key: the
          result of ``self.get_incremental_upper_bound(table_config)``.
        * PostgreSQL table without rows (``MAX`` is NULL) or no matching
          Life_ row: both values are ``None``.
        * Otherwise: datetime and sequence of the matching Life_ row
          (``last_sequence_value`` is ``None`` without a ``sequence_column``).

    Raises:
        Exceptions raised by either database call are not caught here and
        propagate to the caller.
    """
    empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None}
    pk_cols = table_config.primary_key

    if not pk_cols or not table_config.life_table:
        return self.get_incremental_upper_bound(table_config)

    if len(pk_cols) != 1:
        # MAX(pk) is only meaningful for a single key column.
        self.logger.log_info(
            f"Составной PK для {table_config.pg_table}, watermark берётся из верхней границы Life_"
        )
        return self.get_incremental_upper_bound(table_config)

    pk_col = pk_cols[0]

    # 1. Highest PK currently present in PostgreSQL.
    with self.dst_engine.connect() as conn:
        max_pk = conn.execute(
            text(f"SELECT MAX({self.quote_identifier(pk_col)}) FROM {self.quote_identifier(table_config.pg_table)}")
        ).scalar()

    if max_pk is None:
        # MAX() over an empty table is NULL: there is nothing to anchor a watermark to.
        return empty

    self.logger.log_info(
        f"MAX {pk_col} в {table_config.pg_table}: {max_pk}, "
        f"поиск insert-события в {table_config.life_table}"
    )

    # 2. Insert event for that PK in the MSSQL Life_ table.
    select_parts = [
        f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt",
    ]
    if table_config.sequence_column:
        select_parts.append(
            f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq"
        )
    else:
        # Keep the result shape stable so row['max_seq'] always exists.
        select_parts.append("CAST(NULL AS bigint) AS max_seq")
    select_clause = ', '.join(select_parts)

    # Newest event first; TOP 1 below then picks the latest matching row.
    order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"]
    if table_config.sequence_column:
        order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC")
    order_clause = ', '.join(order_parts)

    where = f"{self.quote_mssql_identifier(pk_col)} = {self._format_mssql_literal(max_pk)}"
    if table_config.operation_column:
        where = f"({where}) AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'"

    sql = text(f"""
        SELECT TOP 1 {select_clause}
        FROM {self.quote_mssql_identifier(table_config.life_table)} WITH (NOLOCK)
        WHERE {where}
        ORDER BY {order_clause}
    """)

    with self.src_engine.connect() as src_conn:
        row = src_conn.execute(sql).mappings().first()

    if not row or row['max_dt'] is None:
        self.logger.log_warning(
            f"Insert-событие для {pk_col}={max_pk} не найдено в {table_config.life_table}"
        )
        return empty

    result = {
        'last_x_datetime': row['max_dt'],
        'last_sequence_value': row['max_seq'],
    }
    self.logger.log_info(f"Watermark по MAX PK: {result}")
    return result