137 lines
7.1 KiB
Markdown
137 lines
7.1 KiB
Markdown
# Схемы репликации
|
||
|
||
## Конфигурация таблиц в БД
|
||
|
||
Хранится в `replicator.migration_tables`. Заполняется автоматически из `DEFAULT_TABLE_MIGRATIONS` при первом запуске, если таблица пуста.
|
||
|
||
### Поля таблицы `replicator.migration_tables`
|
||
|
||
| Поле | Тип | Описание |
|
||
|---|---|---|
|
||
| `source_table` | text PK | Имя таблицы в MSSQL |
|
||
| `target_table` | text | Имя таблицы в PostgreSQL (если `NULL` — берётся `source_table`) |
|
||
| `mode` | text | Схема репликации: `full` или `incremental` |
|
||
| `initial_load_mode` | text | Режим первого запуска: `full_then_incremental` |
|
||
| `life_table` | text | Имя Life_-таблицы в MSSQL (только для `incremental`) |
|
||
| `datetime_column` | text | Колонка даты в Life_-таблице (обычно `x_DateTime`) |
|
||
| `sequence_column` | text | Колонка последовательности в Life_-таблице (например `LPULifeID`) |
|
||
| `order_columns_json` | jsonb | Порядок сортировки при инкрементальном чтении, например `["x_DateTime","LPULifeID"]` |
|
||
| `operation_column` | text | Колонка типа операции в Life_-таблице (обычно `x_Operation`) |
|
||
| `delete_operations_json` | jsonb | Значения operation_column, означающие удаление (по умолчанию `["d"]`) |
|
||
| `upsert_operations_json` | jsonb | Значения operation_column, означающие вставку/обновление (по умолчанию `["i","u"]`) |
|
||
| `primary_key_json` | jsonb | Первичный ключ целевой таблицы, например `["LPUID"]` |
|
||
| `exclude_columns_json` | jsonb | Колонки Life_-таблицы, которые не нужно реплицировать (например служебные `x_DateTime`, `x_Operation`, `LPULifeID`) |
|
||
| `timescale` | boolean | Использовать TimescaleDB hypertable |
|
||
| `timescale_time_column` | text | Колонка времени для TimescaleDB |
|
||
| `enabled` | boolean | Включена ли репликация данной таблицы |
|
||
|
||
### Состояние репликации `replicator.migration_state`
|
||
|
||
| Поле | Описание |
|
||
|---|---|
|
||
| `table_name` | Имя таблицы PostgreSQL |
|
||
| `last_x_datetime` | Последняя обработанная дата из Life_ |
|
||
| `last_sequence_value` | Последнее обработанное значение sequence |
|
||
| `last_run_at` | Время последнего запуска |
|
||
| `rows_copied` | Количество скопированных строк |
|
||
| `status` | `success` / `failed` |
|
||
| `error` | Текст ошибки если `status = failed` |
|
||
|
||
---
|
||
|
||
## Схема 1: `mode = 'full'`
|
||
|
||
Полная перезапись таблицы при каждом запуске. Нет watermark, нет состояния.
|
||
|
||
**Когда использовать:** небольшие справочники, которые меняются редко и не имеют Life_-таблицы.
|
||
|
||
**Пример записи в БД:**
|
||
```sql
|
||
INSERT INTO replicator.migration_tables (
|
||
source_table, mode, initial_load_mode,
|
||
datetime_column, enabled
|
||
) VALUES (
|
||
'oms_LPU', 'full', 'full_then_incremental',
|
||
'x_DateTime', true
|
||
);
|
||
```
|
||
|
||
**Поведение каждого запуска:**
|
||
1. Читает всю таблицу из MSSQL
|
||
2. Перезаписывает в PostgreSQL
|
||
|
||
---
|
||
|
||
## Схема 2: `mode = 'incremental'`
|
||
|
||
Инкрементальная репликация через Life_-таблицу. Отслеживает watermark `(last_x_datetime, last_sequence_value)`.
|
||
|
||
**Когда использовать:** большие таблицы с журналом изменений (Life_).
|
||
|
||
**Пример записи в БД:**
|
||
```sql
|
||
INSERT INTO replicator.migration_tables (
|
||
source_table, target_table, mode, initial_load_mode,
|
||
life_table, datetime_column, sequence_column,
|
||
order_columns_json, operation_column,
|
||
delete_operations_json, upsert_operations_json,
|
||
primary_key_json, exclude_columns_json,
|
||
enabled
|
||
) VALUES (
|
||
'Oms_LPU', 'oms_lpu', 'incremental', 'full_then_incremental',
|
||
'Life_oms_LPU', 'x_DateTime', 'LPULifeID',
|
||
'["x_DateTime","LPULifeID"]', 'x_Operation',
|
||
'["d"]', '["i","u"]',
|
||
'["LPUID"]', '["LPULifeID","x_Operation","x_DateTime","x_Seance","x_User"]',
|
||
true
|
||
);
|
||
```
|
||
|
||
### Поведение каждого запуска
|
||
|
||
```
|
||
1. В Life_ нет новых данных (upper_bound = NULL)
|
||
→ Ничего не делать, watermark сохраняется как есть
|
||
|
||
2. Watermark NULL + таблица существует с данными
|
||
→ Автодетекция: ищет МИНИМАЛЬНЫЙ (x_DateTime, sequence)
|
||
в Life_ для всех PK уже имеющихся в PostgreSQL
|
||
→ Сохраняет как watermark
|
||
→ Переходит к шагу 4
|
||
|
||
3. Watermark всё ещё NULL + initial_load_mode = 'full_then_incremental'
|
||
→ Полная загрузка из source_table (не из Life_)
|
||
→ После успеха сохраняет upper_bound как watermark
|
||
|
||
4. Watermark есть → инкрементальная миграция
|
||
→ Читает из Life_: x_DateTime > watermark AND x_DateTime <= upper_bound
|
||
→ Разбивает на upsert (операции из upsert_operations) и delete (из delete_operations)
|
||
→ Применяет через staging-таблицу в PostgreSQL
|
||
→ Обновляет watermark до максимального обработанного значения
|
||
```
|
||
|
||
---
|
||
|
||
## Принудительная перезагрузка (`force_full`)
|
||
|
||
Запускается через API или вручную. Игнорирует watermark.
|
||
|
||
| Условие | Поведение |
|
||
|---|---|
|
||
| Первый `force_full` для `full_then_incremental` таблицы без watermark и без целевой таблицы | Быстрая загрузка через `COPY` без SQLAlchemy |
|
||
| Все остальные случаи | Обычная полная загрузка |
|
||
| После успеха на `incremental` таблице | Сохраняет `upper_bound` как watermark |
|
||
|
||
---
|
||
|
||
## Сводная матрица поведения
|
||
|
||
| Ситуация | Поведение |
|
||
|---|---|
|
||
| `mode = full` | Всегда полная перезапись |
|
||
| `mode = incremental`, нет данных в Life_ | Пропуск |
|
||
| `mode = incremental`, нет watermark, таблица с данными | Автодетекция min watermark → инкрементальная |
|
||
| `mode = incremental`, нет watermark, таблица пустая или отсутствует | Полная загрузка → watermark = upper_bound |
|
||
| `mode = incremental`, watermark есть | Инкрементальная от watermark до upper_bound |
|
||
| `force_full` | Полная перезагрузка → watermark = upper_bound |
|