diff --git a/app/migrator.py b/app/migrator.py index 5ae3ab9..a3ac82b 100644 --- a/app/migrator.py +++ b/app/migrator.py @@ -9,7 +9,7 @@ from typing import Any, Dict, List, Optional, Tuple import pandas as pd from sqlalchemy import create_engine, inspect, text -from sqlalchemy.exc import DBAPIError, OperationalError +from sqlalchemy.exc import DBAPIError, IntegrityError, OperationalError from sqlalchemy.pool import NullPool from sqlalchemy.sql import sqltypes @@ -188,6 +188,11 @@ class DatabaseMigrator: if any(marker in message for marker in retry_markers): return True + # IntegrityError (UniqueViolation, ForeignKeyViolation и т.д.) — детерминированная + # ошибка данных, retry не поможет + if isinstance(exception, IntegrityError): + return False + return isinstance(exception, (OperationalError, DBAPIError)) def migrate_table_once(self, table_config: TableMigrationConfig, force_full: bool = False) -> bool: @@ -722,6 +727,28 @@ class DatabaseMigrator: upsert_chunk = chunk[operations.isin(upsert_operations)] delete_chunk = chunk[operations.isin(delete_operations)] + + # Если для одного PK в батче есть и 'i'/'u' и 'd', и последнее событие — 'd', + # то строку не надо переинсертить после удаления — убираем её из upsert. + pk = table_config.primary_key + if pk and not delete_chunk.empty and not upsert_chunk.empty: + op_col = table_config.operation_column + last_op_per_pk = ( + chunk.assign(_op_lower=operations) + .groupby(pk, sort=False)['_op_lower'] + .last() + ) + pks_deleted_last = last_op_per_pk[last_op_per_pk.isin(delete_operations)] + if not pks_deleted_last.empty: + if len(pk) == 1: + upsert_chunk = upsert_chunk[ + ~upsert_chunk[pk[0]].isin(pks_deleted_last.index) + ] + else: + deleted_tuples = set(pks_deleted_last.index.tolist()) + upsert_mask = upsert_chunk[pk].apply(tuple, axis=1).isin(deleted_tuples) + upsert_chunk = upsert_chunk[~upsert_mask] + return upsert_chunk, delete_chunk def deduplicate_incremental_chunk( @@ -824,6 +851,68 @@ class DatabaseMigrator: conn.commit() raise + def _delete_secondary_conflicts( + self, + conn, + table_name: str, + staging: str, + primary_key: List[str], + ) -> None: + """Удаление строк в target, конфликтующих со staging по вторичным уникальным ключам. + + Нужно для случаев когда строка была удалена в источнике (событие 'd' пропущено из-за + watermark), а на её место вставлена новая строка с тем же уникальным ключом. + """ + # pg_constraint (contype='u') видит только UNIQUE CONSTRAINT, а не CREATE UNIQUE INDEX. + # Используем pg_index чтобы поймать оба варианта. + constraint_sql = text(""" + SELECT i.relname AS conname, + array_agg(a.attname ORDER BY array_position(ix.indkey::int[], a.attnum::int)) AS columns + FROM pg_index ix + JOIN pg_class t ON t.oid = ix.indrelid + JOIN pg_class i ON i.oid = ix.indexrelid + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) + WHERE t.relname = :table_name + AND ix.indisunique = true + AND ix.indisprimary = false + GROUP BY i.relname + """) + try: + constraints = conn.execute(constraint_sql, {'table_name': table_name}).mappings().all() + except Exception as e: + self.logger.log_warning(f"Не удалось получить уникальные ограничения для {table_name}: {e}") + return + + pk_set = {c.lower() for c in primary_key} + t = self.quote_identifier(table_name) + s = self.quote_identifier(staging) + + for row in constraints: + cols = row['columns'] + if {c.lower() for c in cols} == pk_set: + continue + + join_parts = ' AND '.join( + f't.{self.quote_identifier(c)} IS NOT DISTINCT FROM s.{self.quote_identifier(c)}' + for c in cols + ) + pk_neq_parts = ' OR '.join( + f't.{self.quote_identifier(pk)} != s.{self.quote_identifier(pk)}' + for pk in primary_key + ) + delete_sql = text(f""" + DELETE FROM {t} AS t + USING {s} AS s + WHERE {join_parts} + AND ({pk_neq_parts}) + """) + result = conn.execute(delete_sql) + if result.rowcount > 0: + self.logger.log_info( + f"Вытеснено {result.rowcount} строк из {table_name} " + f"по ограничению {row['conname']} (пропущенный delete в источнике)" + ) + def upsert_dataframe_batch( self, chunk: pd.DataFrame, @@ -888,6 +977,7 @@ class DatabaseMigrator: ON CONFLICT ({conflict_columns}) {conflict_action} """ with self.dst_engine.connect() as conn: + self._delete_secondary_conflicts(conn, table_name, staging, primary_key) conn.execute(text(sql)) if own_staging: conn.execute(text(f'DROP TABLE IF EXISTS {self.quote_identifier(staging)}')) @@ -1684,10 +1774,16 @@ class DatabaseMigrator: return "'" + str(value).replace("'", "''") + "'" def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig) -> Dict[str, Any]: - """Поиск watermark в MSSQL Life_ по максимальному PK из PostgreSQL. + """Поиск точной границы снимка через Life_-таблицу. - Берёт MAX(pk) из PG (1 запрос), затем ищет соответствующее insert-событие - в Life_-таблице MSSQL (1 запрос). Работает только для одиночного целочисленного PK. + Алгоритм (3 запроса): + 1. MAX(pk) из PG — последний PK, существовавший на момент full load. + 2. MIN(LifeID) в Life_ для insert-событий с pk > max_pg_pk — первое событие, + которое ещё не отражено в PG (новая строка после снимка). + 3. Событие в Life_ прямо перед этим LifeID — точная граница снимка. + + Такой подход гарантирует что все delete/update для строк существовавших + на момент снимка попадут в инкрементальное окно. Для составного PK возвращает верхнюю границу Life_. """ empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None} @@ -1703,6 +1799,10 @@ class DatabaseMigrator: return self.get_incremental_upper_bound(table_config) pk_col = pk_cols[0] + life_table = self.quote_mssql_identifier(table_config.life_table) + life_pk = self.quote_mssql_identifier(pk_col) + life_seq = self.quote_mssql_identifier(table_config.sequence_column) if table_config.sequence_column else None + life_dt = self.quote_mssql_identifier(table_config.datetime_column) # 1. MAX PK из PG with self.dst_engine.connect() as conn: @@ -1713,53 +1813,61 @@ class DatabaseMigrator: if max_pk is None: return empty + self.logger.log_info(f"MAX {pk_col} в {table_config.pg_table}: {max_pk}") + + # 2. MIN(LifeID) первого insert-события для PK, которых нет в PG + op_filter = "" + if table_config.operation_column: + op_filter = f"AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'" + + sql_first_new = text(f""" + SELECT MIN({life_seq}) AS first_new_lifeid + FROM {life_table} WITH (NOLOCK) + WHERE {life_pk} > {self._format_mssql_literal(max_pk)} + {op_filter} + """) if life_seq else None + + first_new_lifeid = None + if life_seq is not None: + with self.src_engine.connect() as src_conn: + first_new_lifeid = src_conn.execute(sql_first_new).scalar() + + if first_new_lifeid is None: + # Новых строк после MAX PK нет — все текущие события уже в PG, + # берём верхнюю границу Life_ как watermark + self.logger.log_info( + f"Новых строк после {pk_col}={max_pk} нет, watermark = верхняя граница Life_" + ) + return self.get_incremental_upper_bound(table_config) + self.logger.log_info( - f"MAX {pk_col} в {table_config.pg_table}: {max_pk}, " - f"поиск insert-события в {table_config.life_table}" + f"Первое новое событие в {table_config.life_table}: {table_config.sequence_column}={first_new_lifeid}" ) - # 2. insert-событие для этого PK в Life_ MSSQL - select_parts = [ - f"{self.quote_mssql_identifier(table_config.datetime_column)} AS max_dt", - ] - if table_config.sequence_column: - select_parts.append( - f"{self.quote_mssql_identifier(table_config.sequence_column)} AS max_seq" - ) - else: - select_parts.append("CAST(NULL AS bigint) AS max_seq") - select_clause = ', '.join(select_parts) - - order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"] - if table_config.sequence_column: - order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC") - order_clause = ', '.join(order_parts) - - where = f"{self.quote_mssql_identifier(pk_col)} = {self._format_mssql_literal(max_pk)}" - if table_config.operation_column: - where = f"({where}) AND LOWER({self.quote_mssql_identifier(table_config.operation_column)}) = 'i'" - - sql = text(f""" - SELECT TOP 1 {select_clause} - FROM {self.quote_mssql_identifier(table_config.life_table)} WITH (NOLOCK) - WHERE {where} - ORDER BY {order_clause} + # 3. Событие прямо перед первым новым — граница снимка + seq_select = f"{life_seq} AS max_seq" if life_seq else "CAST(NULL AS bigint) AS max_seq" + sql_boundary = text(f""" + SELECT TOP 1 {life_dt} AS max_dt, {seq_select} + FROM {life_table} WITH (NOLOCK) + WHERE {life_seq} < {self._format_mssql_literal(first_new_lifeid)} + ORDER BY {life_seq} DESC """) with self.src_engine.connect() as src_conn: - row = src_conn.execute(sql).mappings().first() + row = src_conn.execute(sql_boundary).mappings().first() if not row or row['max_dt'] is None: self.logger.log_warning( - f"Insert-событие для {pk_col}={max_pk} не найдено в {table_config.life_table}" + f"Не найдено событий до {table_config.sequence_column}={first_new_lifeid} " + f"в {table_config.life_table}, watermark = верхняя граница" ) - return empty + return self.get_incremental_upper_bound(table_config) result = { 'last_x_datetime': row['max_dt'], 'last_sequence_value': row['max_seq'], } - self.logger.log_info(f"Watermark по MAX PK: {result}") + self.logger.log_info(f"Watermark по границе снимка: {result}") return result def backfill_state_from_pg(