Files
2026-03-29 23:24:15 +09:00

16 KiB
Raw Permalink Blame History

Data Replication Service

Сервис репликации данных из множественных источников (MSSQL, PostgreSQL) в целевую PostgreSQL базу с использованием DLT (Data Load Tool), FastAPI и планировщика задач APScheduler.

Особенности

  • Динамическое управление источниками данных - Добавляйте и удаляйте источники через REST API без перезагрузки сервиса
  • Поддержка множественных типов источников (MSSQL, PostgreSQL)
  • Репликация таблиц с использованием DLT для оптимальной производительности
  • Обработка логов изменений (Life таблицы с INSERT, UPDATE, DELETE операциями)
  • Индивидуальное расписание для каждой таблицы (Cron выражения)
  • Управление конфликтами времени выполнения (очередь задач)
  • REST API для полного управления процессом репликации
  • Кеширование подключений для оптимизации производительности
  • Логирование и мониторинг задач репликации
  • Health check и статистика

Архитектурные улучшения

Переход на DataSource-based конфигурацию

Раньше (static конфигурация):

  • Источники данных определялись в .env файле
  • Требовалась перезагрузка сервиса для изменения конфигурации
  • Одна MSSQL и одна PostgreSQL база фиксированно

Теперь (динамическая конфигурация):

  • Источники хранятся в БД (таблица data_sources)
  • Полное управление через REST API
  • Поддержка множественных источников каждого типа
  • Без необходимости перезагрузки сервиса
  • Кеширование подключений с TTL = 5 минут

Структура проекта

replication_service/
├── main.py                    # Главное приложение FastAPI
├── config.py                  # Конфигурация и переменные окружения
├── models.py                  # ORM модели (SQLAlchemy) - включая DataSource
├── database.py                # Управление подключениями с кешированием
├── api.py                     # REST API endpoints (DataSource + Migration)
├── replication.py             # Логика репликации данных
├── scheduler.py               # Планировщик задач с управлением конфликтами
├── migrate_to_data_sources.py # Скрипт миграции из старой конфигурации
├── examples_api.py            # Примеры использования новой API
├── requirements.txt           # Зависимости Python
├── DATA_SOURCES.md            # Документация по управлению источниками
├── .env.example               # Пример файла переменных окружения
└── README.md                  # Документация

Установка

1. Создать виртуальное окружение

python -m venv venv
source venv/bin/activate  # Linux/Mac
# или
venv\Scripts\activate  # Windows

2. Установить зависимости

pip install -r requirements.txt

3. Настроить переменные окружения

Скопировать .env.example в .env и заполнить данные целевой базы данных PostgreSQL:

cp .env.example .env
# PostgreSQL Connection (целевая база для всех репликаций)
POSTGRES_HOST=your_postgres_host
POSTGRES_DATABASE=replication_db
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=your_password
POSTGRES_PORT=5432

Важно: Источники данных больше НЕ конфигурируются через .env файл!

4. Инициализировать базу данных

python -c "from database import DatabaseManager; DatabaseManager.init_postgres_db()"

Запуск

python main.py

или

uvicorn main:app --reload --host 0.0.0.0 --port 8000

Сервис будет доступен по адресу: http://localhost:8000

Быстрый старт с новой API

1. Создать источник данных

curl -X POST http://localhost:8000/api/v1/data-sources \
  -H "Content-Type: application/json" \
  -d '{
    "name": "MSSQL Production",
    "source_type": "mssql",
    "host": "mssql-server.example.com",
    "port": 1433,
    "database": "MyDatabase",
    "username": "sa",
    "password": "Password123!"
  }'

Ответ:

{
  "id": 1,
  "name": "MSSQL Production",
  "source_type": "mssql",
  "host": "mssql-server.example.com",
  "port": 1433,
  "database": "MyDatabase",
  "default_schema": "dbo",
  "is_active": true,
  "created_at": "2024-01-15T10:30:00",
  "updated_at": "2024-01-15T10:30:00"
}

2. Проверить подключение

curl -X POST http://localhost:8000/api/v1/data-sources/1/test

Ответ:

{
  "source_id": 1,
  "connection_ok": true
}

3. Создать расписание репликации

curl -X POST http://localhost:8000/api/v1/migration-tables \
  -H "Content-Type: application/json" \
  -d '{
    "table_name": "Orders",
    "source_id": 1,
    "cron_schedule": "0 2 * * *",
    "source_schema": "dbo",
    "target_schema": "public"
  }'

Ответ:

{
  "id": 1,
  "table_name": "Orders",
  "source_id": 1,
  "source_schema": "dbo",
  "target_schema": "public",
  "cron_schedule": "0 2 * * *",
  "is_active": true,
  "created_at": "2024-01-15T10:35:00",
  "updated_at": "2024-01-15T10:35:00"
}

4. Запустить примеры API

python examples_api.py

Этот скрипт демонстрирует все основные операции с API.

API Endpoints

Health Check

  • GET /api/v1/health - Проверка здоровья сервиса

Ответ: "status": "healthy", "mssql_connected": true, "postgres_connected": true, "timestamp": "2026-03-29T10:00:00" }


### Управление таблицами миграции

#### Создать таблицу для миграции

```bash
POST /api/v1/migration-tables
Content-Type: application/json

{
  "table_name": "Orders",
  "cron_schedule": "0 2 * * *",  # Каждый день в 2:00
  "source_schema": "dbo",
  "target_schema": "public"
}

Получить список всех таблиц

GET /api/v1/migration-tables

Получить информацию о конкретной таблице

GET /api/v1/migration-tables/{table_id}

Удалить таблицу из миграции

DELETE /api/v1/migration-tables/{table_id}

Управление планировщиком

Получить статус планировщика

GET /api/v1/scheduler/status

Ответ:

{
  "is_running": true,
  "migration_tables_count": 5,
  "running_jobs": {
    "1": 1680000000.123
  },
  "queued_jobs": []
}

Запустить планировщик

POST /api/v1/scheduler/start

Остановить планировщик

POST /api/v1/scheduler/stop

Получить список задач планировщика

GET /api/v1/scheduler/jobs

История и статистика

Получить список задач репликации

GET /api/v1/replication-jobs?limit=100&offset=0&status=success

Получить информацию о конкретной задаче

GET /api/v1/replication-jobs/{job_id}

Получить статистику

GET /api/v1/statistics

Ответ:

{
  "total_jobs": 150,
  "successful_jobs": 148,
  "failed_jobs": 2,
  "success_rate": 98.67,
  "timestamp": "2026-03-29T10:00:00"
}

Примеры использования

1. Добавить таблицу для репликации с расписанием

curl -X POST "http://localhost:8000/api/v1/migration-tables" \
  -H "Content-Type: application/json" \
  -d '{
    "table_name": "Customers",
    "cron_schedule": "*/30 * * * *",
    "source_schema": "dbo",
    "target_schema": "public"
  }'

Это расписание означает: "выполнять репликацию каждые 30 минут"

2. Добавить несколько таблиц с разным расписанием

# Orders - каждый день в 2:00 AM
curl -X POST "http://localhost:8000/api/v1/migration-tables" \
  -H "Content-Type: application/json" \
  -d '{"table_name": "Orders", "cron_schedule": "0 2 * * *"}'

# Products - каждый день в 3:00 AM
curl -X POST "http://localhost:8000/api/v1/migration-tables" \
  -H "Content-Type: application/json" \
  -d '{"table_name": "Products", "cron_schedule": "0 3 * * *"}'

3. Контролировать расписание при конфликтах

Если две таблицы имеют пересекающееся расписание, они будут автоматически добавлены в очередь и выполняться последовательно:

# Таблица 1: каждый день в 2:00 AM
# Таблица 2: каждый день в 2:15 AM
# Таблица 3: каждый день в 2:30 AM

# Все три задачи начнут выполняться в порядке очереди

Спецификация Cron для расписания

Формат: <минуты> <часы> <день месяца> <месяц> <день недели>

Примеры:

  • 0 2 * * * - Каждый день в 2:00 AM
  • */30 * * * * - Каждые 30 минут
  • 0 0 * * 0 - Каждое воскресенье в полночь
  • 0 9,12,15 * * * - В 9:00, 12:00 и 15:00 каждый день
  • 0 */4 * * * - Каждые 4 часа
  • 0 0 1 * * - Первый день каждого месяца в полночь

Life таблицы

Life таблицы - это таблицы в MSSQL, которые хранят логи транзакций для оригинальных таблиц.

Соглашение по именованию:

  • Оригинальная таблица: Orders
  • Life таблица: LifeOrders

Сервис автоматически обрабатывает следующие операции:

  • INSERT - Добавление новых записей
  • UPDATE - Обновление существующих записей
  • DELETE - Удаление записей

Мониторинг и отладка

Логи приложения

Логи выводятся с информацией о каждом этапе репликации:

2026-03-29 10:00:00 - scheduler - INFO - Starting replication for table Orders
2026-03-29 10:00:15 - replication - INFO - Successfully replicated 1250 rows from Orders
2026-03-29 10:00:25 - scheduler - INFO - Processing queued job for Products

API Documentation

После запуска приложения, интерактивная документация доступна по адресам:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

Конфигурация

DatabaseManager

from database import DatabaseManager

# Проверить подключение к MSSQL
DatabaseManager.test_mssql_connection()

# Проверить подключение к PostgreSQL
DatabaseManager.test_postgres_connection()

# Инициализировать таблицы PostgreSQL
DatabaseManager.init_postgres_db()

SchedulerManager

from scheduler import scheduler_manager

# Добавить таблицу
scheduler_manager.add_migration_table(
    table_name="Orders",
    cron_schedule="0 2 * * *"
)

# Получить список таблиц
tables = scheduler_manager.get_migration_tables()

# Получить текущие выполняющиеся задачи
running = scheduler_manager.get_running_jobs()

# Получить задачи в очереди
queued = scheduler_manager.get_queued_jobs()

Производительность

  • Асинхронная обработка: Использует APScheduler для фонового выполнения
  • Очередь задач: Автоматическое управление конфликтами расписания
  • Оптимизация DLT:批量 обработка данных для минимизации операций с БД
  • Connection pooling: Эффективное использование подключений к БД

Безопасность

  • Используйте переменные окружения для конфиденциальных данных (не коммитьте .env)
  • Настройте .env.example как шаблон для других разработчиков
  • Рассмотрите использование API ключей для защиты endpoints
  • Логируйте все операции репликации для аудита

Развертывание на производстве

Docker

FROM python:3.11

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

version: '3.8'

services:
  replication-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      MSSQL_SERVER: mssql
      POSTGRES_HOST: postgres
    depends_on:
      - postgres
  
  postgres:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: postgres

Решение проблем

Ошибка подключения к MSSQL

  1. Проверить, запущен ли сервер MSSQL
  2. Проверить правильность USER/PASSWORD в .env
  3. Убедиться, что брандмауэр не блокирует порт 1433
  4. Проверить права доступа sa пользователя

Ошибка подключения к PostgreSQL

  1. Проверить, запущен ли сервер PostgreSQL
  2. Проверить правильность HOST/PORT в .env
  3. Убедиться, что база данных с заданным именем существует
  4. Проверить права доступа пользователя

DLT ошибки

  1. Убедиться, что таблица существует в MSSQL
  2. Проверить права доступа на чтение таблицы
  3. Проверить, что целевая схема существует в PostgreSQL

Лицензия

MIT

Поддержка

Для сообщений об ошибках и предложений создавайте Issues в репозитории.