Files
2026-03-29 23:24:15 +09:00

518 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Data Replication Service
Сервис репликации данных из множественных источников (MSSQL, PostgreSQL) в целевую PostgreSQL базу с использованием DLT (Data Load Tool), FastAPI и планировщика задач APScheduler.
## Особенности
-**Динамическое управление источниками данных** - Добавляйте и удаляйте источники через REST API без перезагрузки сервиса
- ✅ Поддержка множественных типов источников (MSSQL, PostgreSQL)
- ✅ Репликация таблиц с использованием DLT для оптимальной производительности
- ✅ Обработка логов изменений (Life таблицы с INSERT, UPDATE, DELETE операциями)
- ✅ Индивидуальное расписание для каждой таблицы (Cron выражения)
- ✅ Управление конфликтами времени выполнения (очередь задач)
- ✅ REST API для полного управления процессом репликации
- ✅ Кеширование подключений для оптимизации производительности
- ✅ Логирование и мониторинг задач репликации
- ✅ Health check и статистика
## Архитектурные улучшения
### Переход на DataSource-based конфигурацию
**Раньше (static конфигурация):**
- Источники данных определялись в `.env` файле
- Требовалась перезагрузка сервиса для изменения конфигурации
- Одна MSSQL и одна PostgreSQL база фиксированно
**Теперь (динамическая конфигурация):**
- Источники хранятся в БД (таблица `data_sources`)
- Полное управление через REST API
- Поддержка множественных источников каждого типа
- Без необходимости перезагрузки сервиса
- Кеширование подключений с TTL = 5 минут
## Структура проекта
```
replication_service/
├── main.py # Главное приложение FastAPI
├── config.py # Конфигурация и переменные окружения
├── models.py # ORM модели (SQLAlchemy) - включая DataSource
├── database.py # Управление подключениями с кешированием
├── api.py # REST API endpoints (DataSource + Migration)
├── replication.py # Логика репликации данных
├── scheduler.py # Планировщик задач с управлением конфликтами
├── migrate_to_data_sources.py # Скрипт миграции из старой конфигурации
├── examples_api.py # Примеры использования новой API
├── requirements.txt # Зависимости Python
├── DATA_SOURCES.md # Документация по управлению источниками
├── .env.example # Пример файла переменных окружения
└── README.md # Документация
```
## Установка
### 1. Создать виртуальное окружение
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# или
venv\Scripts\activate # Windows
```
### 2. Установить зависимости
```bash
pip install -r requirements.txt
```
### 3. Настроить переменные окружения
Скопировать `.env.example` в `.env` и заполнить данные целевой базы данных PostgreSQL:
```bash
cp .env.example .env
```
```env
# PostgreSQL Connection (целевая база для всех репликаций)
POSTGRES_HOST=your_postgres_host
POSTGRES_DATABASE=replication_db
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=your_password
POSTGRES_PORT=5432
```
**Важно:** Источники данных больше НЕ конфигурируются через `.env` файл!
### 4. Инициализировать базу данных
```bash
python -c "from database import DatabaseManager; DatabaseManager.init_postgres_db()"
```
## Запуск
```bash
python main.py
```
или
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
Сервис будет доступен по адресу: `http://localhost:8000`
## Быстрый старт с новой API
### 1. Создать источник данных
```bash
curl -X POST http://localhost:8000/api/v1/data-sources \
-H "Content-Type: application/json" \
-d '{
"name": "MSSQL Production",
"source_type": "mssql",
"host": "mssql-server.example.com",
"port": 1433,
"database": "MyDatabase",
"username": "sa",
"password": "Password123!"
}'
```
**Ответ:**
```json
{
"id": 1,
"name": "MSSQL Production",
"source_type": "mssql",
"host": "mssql-server.example.com",
"port": 1433,
"database": "MyDatabase",
"default_schema": "dbo",
"is_active": true,
"created_at": "2024-01-15T10:30:00",
"updated_at": "2024-01-15T10:30:00"
}
```
### 2. Проверить подключение
```bash
curl -X POST http://localhost:8000/api/v1/data-sources/1/test
```
**Ответ:**
```json
{
"source_id": 1,
"connection_ok": true
}
```
### 3. Создать расписание репликации
```bash
curl -X POST http://localhost:8000/api/v1/migration-tables \
-H "Content-Type: application/json" \
-d '{
"table_name": "Orders",
"source_id": 1,
"cron_schedule": "0 2 * * *",
"source_schema": "dbo",
"target_schema": "public"
}'
```
**Ответ:**
```json
{
"id": 1,
"table_name": "Orders",
"source_id": 1,
"source_schema": "dbo",
"target_schema": "public",
"cron_schedule": "0 2 * * *",
"is_active": true,
"created_at": "2024-01-15T10:35:00",
"updated_at": "2024-01-15T10:35:00"
}
```
### 4. Запустить примеры API
```bash
python examples_api.py
```
Этот скрипт демонстрирует все основные операции с API.
## API Endpoints
### Health Check
- `GET /api/v1/health` - Проверка здоровья сервиса
**Ответ:**
"status": "healthy",
"mssql_connected": true,
"postgres_connected": true,
"timestamp": "2026-03-29T10:00:00"
}
```
### Управление таблицами миграции
#### Создать таблицу для миграции
```bash
POST /api/v1/migration-tables
Content-Type: application/json
{
"table_name": "Orders",
"cron_schedule": "0 2 * * *", # Каждый день в 2:00
"source_schema": "dbo",
"target_schema": "public"
}
```
#### Получить список всех таблиц
```bash
GET /api/v1/migration-tables
```
#### Получить информацию о конкретной таблице
```bash
GET /api/v1/migration-tables/{table_id}
```
#### Удалить таблицу из миграции
```bash
DELETE /api/v1/migration-tables/{table_id}
```
### Управление планировщиком
#### Получить статус планировщика
```bash
GET /api/v1/scheduler/status
```
**Ответ:**
```json
{
"is_running": true,
"migration_tables_count": 5,
"running_jobs": {
"1": 1680000000.123
},
"queued_jobs": []
}
```
#### Запустить планировщик
```bash
POST /api/v1/scheduler/start
```
#### Остановить планировщик
```bash
POST /api/v1/scheduler/stop
```
#### Получить список задач планировщика
```bash
GET /api/v1/scheduler/jobs
```
### История и статистика
#### Получить список задач репликации
```bash
GET /api/v1/replication-jobs?limit=100&offset=0&status=success
```
#### Получить информацию о конкретной задаче
```bash
GET /api/v1/replication-jobs/{job_id}
```
#### Получить статистику
```bash
GET /api/v1/statistics
```
**Ответ:**
```json
{
"total_jobs": 150,
"successful_jobs": 148,
"failed_jobs": 2,
"success_rate": 98.67,
"timestamp": "2026-03-29T10:00:00"
}
```
## Примеры использования
### 1. Добавить таблицу для репликации с расписанием
```bash
curl -X POST "http://localhost:8000/api/v1/migration-tables" \
-H "Content-Type: application/json" \
-d '{
"table_name": "Customers",
"cron_schedule": "*/30 * * * *",
"source_schema": "dbo",
"target_schema": "public"
}'
```
Это расписание означает: "выполнять репликацию каждые 30 минут"
### 2. Добавить несколько таблиц с разным расписанием
```bash
# Orders - каждый день в 2:00 AM
curl -X POST "http://localhost:8000/api/v1/migration-tables" \
-H "Content-Type: application/json" \
-d '{"table_name": "Orders", "cron_schedule": "0 2 * * *"}'
# Products - каждый день в 3:00 AM
curl -X POST "http://localhost:8000/api/v1/migration-tables" \
-H "Content-Type: application/json" \
-d '{"table_name": "Products", "cron_schedule": "0 3 * * *"}'
```
### 3. Контролировать расписание при конфликтах
Если две таблицы имеют пересекающееся расписание, они будут автоматически добавлены в очередь и выполняться последовательно:
```bash
# Таблица 1: каждый день в 2:00 AM
# Таблица 2: каждый день в 2:15 AM
# Таблица 3: каждый день в 2:30 AM
# Все три задачи начнут выполняться в порядке очереди
```
## Спецификация Cron для расписания
**Формат:** `<минуты> <часы> <день месяца> <месяц> <день недели>`
**Примеры:**
- `0 2 * * *` - Каждый день в 2:00 AM
- `*/30 * * * *` - Каждые 30 минут
- `0 0 * * 0` - Каждое воскресенье в полночь
- `0 9,12,15 * * *` - В 9:00, 12:00 и 15:00 каждый день
- `0 */4 * * *` - Каждые 4 часа
- `0 0 1 * *` - Первый день каждого месяца в полночь
## Life таблицы
**Life таблицы** - это таблицы в MSSQL, которые хранят логи транзакций для оригинальных таблиц.
**Соглашение по именованию:**
- Оригинальная таблица: `Orders`
- Life таблица: `LifeOrders`
Сервис автоматически обрабатывает следующие операции:
- **INSERT** - Добавление новых записей
- **UPDATE** - Обновление существующих записей
- **DELETE** - Удаление записей
## Мониторинг и отладка
### Логи приложения
Логи выводятся с информацией о каждом этапе репликации:
```
2026-03-29 10:00:00 - scheduler - INFO - Starting replication for table Orders
2026-03-29 10:00:15 - replication - INFO - Successfully replicated 1250 rows from Orders
2026-03-29 10:00:25 - scheduler - INFO - Processing queued job for Products
```
### API Documentation
После запуска приложения, интерактивная документация доступна по адресам:
- Swagger UI: `http://localhost:8000/docs`
- ReDoc: `http://localhost:8000/redoc`
## Конфигурация
### DatabaseManager
```python
from database import DatabaseManager
# Проверить подключение к MSSQL
DatabaseManager.test_mssql_connection()
# Проверить подключение к PostgreSQL
DatabaseManager.test_postgres_connection()
# Инициализировать таблицы PostgreSQL
DatabaseManager.init_postgres_db()
```
### SchedulerManager
```python
from scheduler import scheduler_manager
# Добавить таблицу
scheduler_manager.add_migration_table(
table_name="Orders",
cron_schedule="0 2 * * *"
)
# Получить список таблиц
tables = scheduler_manager.get_migration_tables()
# Получить текущие выполняющиеся задачи
running = scheduler_manager.get_running_jobs()
# Получить задачи в очереди
queued = scheduler_manager.get_queued_jobs()
```
## Производительность
- **Асинхронная обработка**: Использует APScheduler для фонового выполнения
- **Очередь задач**: Автоматическое управление конфликтами расписания
- **Оптимизация DLT**:批量 обработка данных для минимизации операций с БД
- **Connection pooling**: Эффективное использование подключений к БД
## Безопасность
- Используйте переменные окружения для конфиденциальных данных (не коммитьте `.env`)
- Настройте `.env.example` как шаблон для других разработчиков
- Рассмотрите использование API ключей для защиты endpoints
- Логируйте все операции репликации для аудита
## Развертывание на производстве
### Docker
```dockerfile
FROM python:3.11
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```
### Docker Compose
```yaml
version: '3.8'
services:
replication-service:
build: .
ports:
- "8000:8000"
environment:
MSSQL_SERVER: mssql
POSTGRES_HOST: postgres
depends_on:
- postgres
postgres:
image: postgres:15
environment:
POSTGRES_PASSWORD: postgres
```
## Решение проблем
### Ошибка подключения к MSSQL
1. Проверить, запущен ли сервер MSSQL
2. Проверить правильность USER/PASSWORD в `.env`
3. Убедиться, что брандмауэр не блокирует порт 1433
4. Проверить права доступа sa пользователя
### Ошибка подключения к PostgreSQL
1. Проверить, запущен ли сервер PostgreSQL
2. Проверить правильность HOST/PORT в `.env`
3. Убедиться, что база данных с заданным именем существует
4. Проверить права доступа пользователя
### DLT ошибки
1. Убедиться, что таблица существует в MSSQL
2. Проверить права доступа на чтение таблицы
3. Проверить, что целевая схема существует в PostgreSQL
## Лицензия
MIT
## Поддержка
Для сообщений об ошибках и предложений создавайте Issues в репозитории.