Files
syncio/docs/replication.md
brusnitsyn 3fb2053705 v2026.06.3
2026-06-14 22:51:46 +09:00

7.1 KiB
Raw Permalink Blame History

Схемы репликации

Конфигурация таблиц в БД

Хранится в replicator.migration_tables. Заполняется автоматически из DEFAULT_TABLE_MIGRATIONS при первом запуске, если таблица пуста.

Поля таблицы replicator.migration_tables

Поле Тип Описание
source_table text PK Имя таблицы в MSSQL
target_table text Имя таблицы в PostgreSQL (если NULL — берётся source_table)
mode text Схема репликации: full или incremental
initial_load_mode text Режим первого запуска: full_then_incremental
life_table text Имя Life_-таблицы в MSSQL (только для incremental)
datetime_column text Колонка даты в Life_-таблице (обычно x_DateTime)
sequence_column text Колонка последовательности в Life_-таблице (например LPULifeID)
order_columns_json jsonb Порядок сортировки при инкрементальном чтении, например ["x_DateTime","LPULifeID"]
operation_column text Колонка типа операции в Life_-таблице (обычно x_Operation)
delete_operations_json jsonb Значения operation_column, означающие удаление (по умолчанию ["d"])
upsert_operations_json jsonb Значения operation_column, означающие вставку/обновление (по умолчанию ["i","u"])
primary_key_json jsonb Первичный ключ целевой таблицы, например ["LPUID"]
exclude_columns_json jsonb Колонки Life_-таблицы, которые не нужно реплицировать (например служебные x_DateTime, x_Operation, LPULifeID)
timescale boolean Использовать TimescaleDB hypertable
timescale_time_column text Колонка времени для TimescaleDB
enabled boolean Включена ли репликация данной таблицы

Состояние репликации replicator.migration_state

Поле Описание
table_name Имя таблицы PostgreSQL
last_x_datetime Последняя обработанная дата из Life_
last_sequence_value Последнее обработанное значение sequence
last_run_at Время последнего запуска
rows_copied Количество скопированных строк
status success / failed
error Текст ошибки если status = failed

Схема 1: mode = 'full'

Полная перезапись таблицы при каждом запуске. Нет watermark, нет состояния.

Когда использовать: небольшие справочники, которые меняются редко и не имеют Life_-таблицы.

Пример записи в БД:

INSERT INTO replicator.migration_tables (
    source_table, mode, initial_load_mode,
    datetime_column, enabled
) VALUES (
    'oms_LPU', 'full', 'full_then_incremental',
    'x_DateTime', true
);

Поведение каждого запуска:

  1. Читает всю таблицу из MSSQL
  2. Перезаписывает в PostgreSQL

Схема 2: mode = 'incremental'

Инкрементальная репликация через Life_-таблицу. Отслеживает watermark (last_x_datetime, last_sequence_value).

Когда использовать: большие таблицы с журналом изменений (Life_).

Пример записи в БД:

INSERT INTO replicator.migration_tables (
    source_table, target_table, mode, initial_load_mode,
    life_table, datetime_column, sequence_column,
    order_columns_json, operation_column,
    delete_operations_json, upsert_operations_json,
    primary_key_json, exclude_columns_json,
    enabled
) VALUES (
    'Oms_LPU', 'oms_lpu', 'incremental', 'full_then_incremental',
    'Life_oms_LPU', 'x_DateTime', 'LPULifeID',
    '["x_DateTime","LPULifeID"]', 'x_Operation',
    '["d"]', '["i","u"]',
    '["LPUID"]', '["LPULifeID","x_Operation","x_DateTime","x_Seance","x_User"]',
    true
);

Поведение каждого запуска

1. В Life_ нет новых данных (upper_bound = NULL)
   → Ничего не делать, watermark сохраняется как есть

2. Watermark NULL + таблица существует с данными
   → Автодетекция: ищет МИНИМАЛЬНЫЙ (x_DateTime, sequence)
     в Life_ для всех PK уже имеющихся в PostgreSQL
   → Сохраняет как watermark
   → Переходит к шагу 4

3. Watermark всё ещё NULL + initial_load_mode = 'full_then_incremental'
   → Полная загрузка из source_table (не из Life_)
   → После успеха сохраняет upper_bound как watermark

4. Watermark есть → инкрементальная миграция
   → Читает из Life_: x_DateTime > watermark AND x_DateTime <= upper_bound
   → Разбивает на upsert (операции из upsert_operations) и delete (из delete_operations)
   → Применяет через staging-таблицу в PostgreSQL
   → Обновляет watermark до максимального обработанного значения

Принудительная перезагрузка (force_full)

Запускается через API или вручную. Игнорирует watermark.

Условие Поведение
Первый force_full для full_then_incremental таблицы без watermark и без целевой таблицы Быстрая загрузка через COPY без SQLAlchemy
Все остальные случаи Обычная полная загрузка
После успеха на incremental таблице Сохраняет upper_bound как watermark

Сводная матрица поведения

Ситуация Поведение
mode = full Всегда полная перезапись
mode = incremental, нет данных в Life_ Пропуск
mode = incremental, нет watermark, таблица с данными Автодетекция min watermark → инкрементальная
mode = incremental, нет watermark, таблица пустая или отсутствует Полная загрузка → watermark = upper_bound
mode = incremental, watermark есть Инкрементальная от watermark до upper_bound
force_full Полная перезагрузка → watermark = upper_bound