From a88375f72e177c07c3f69dea8c9f15995b1feeb5 Mon Sep 17 00:00:00 2001 From: brusnitsyn Date: Sun, 29 Mar 2026 23:24:15 +0900 Subject: [PATCH] first commit --- .gitignore | 138 ++++ replication_service/.env.example | 13 + replication_service/.gitignore | 138 ++++ replication_service/DATA_SOURCES.md | 367 +++++++++ replication_service/Dockerfile | 40 + replication_service/README.md | 517 ++++++++++++ replication_service/api.py | 739 ++++++++++++++++++ replication_service/config.py | 24 + replication_service/database.py | 292 +++++++ replication_service/docker-compose.yml | 83 ++ replication_service/examples.py | 340 ++++++++ replication_service/examples_api.py | 260 ++++++ replication_service/main.py | 93 +++ .../migrate_to_data_sources.py | 127 +++ replication_service/models.py | 116 +++ replication_service/replication.py | 381 +++++++++ replication_service/requirements.txt | 36 + replication_service/scheduler.py | 275 +++++++ replication_service/utils.py | 283 +++++++ 19 files changed, 4262 insertions(+) create mode 100644 .gitignore create mode 100644 replication_service/.env.example create mode 100644 replication_service/.gitignore create mode 100644 replication_service/DATA_SOURCES.md create mode 100644 replication_service/Dockerfile create mode 100644 replication_service/README.md create mode 100644 replication_service/api.py create mode 100644 replication_service/config.py create mode 100644 replication_service/database.py create mode 100644 replication_service/docker-compose.yml create mode 100644 replication_service/examples.py create mode 100644 replication_service/examples_api.py create mode 100644 replication_service/main.py create mode 100644 replication_service/migrate_to_data_sources.py create mode 100644 replication_service/models.py create mode 100644 replication_service/replication.py create mode 100644 replication_service/requirements.txt create mode 100644 replication_service/scheduler.py create mode 100644 replication_service/utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6318a6f --- /dev/null +++ b/.gitignore @@ -0,0 +1,138 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# IDE settings +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Project specific +logs/ +*.db +*.sqlite3 +.dlt/ +dlt_data/ diff --git a/replication_service/.env.example b/replication_service/.env.example new file mode 100644 index 0000000..47d2697 --- /dev/null +++ b/replication_service/.env.example @@ -0,0 +1,13 @@ +# PostgreSQL Connection (назначение для всех репликаций) +POSTGRES_HOST=localhost +POSTGRES_DATABASE=replication_db +POSTGRES_USERNAME=postgres +POSTGRES_PASSWORD=postgres_password +POSTGRES_PORT=5432 + +# Redis (для кэширования и очередей) +REDIS_HOST=localhost +REDIS_PORT=6379 + +# API +API_KEY=your_secret_api_key_here diff --git a/replication_service/.gitignore b/replication_service/.gitignore new file mode 100644 index 0000000..6318a6f --- /dev/null +++ b/replication_service/.gitignore @@ -0,0 +1,138 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# IDE settings +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Project specific +logs/ +*.db +*.sqlite3 +.dlt/ +dlt_data/ diff --git a/replication_service/DATA_SOURCES.md b/replication_service/DATA_SOURCES.md new file mode 100644 index 0000000..753459c --- /dev/null +++ b/replication_service/DATA_SOURCES.md @@ -0,0 +1,367 @@ +# Управление источниками данных + +## Обзор + +Новая архитектура позволяет управлять источниками данных динамически через REST API **без перезагрузки сервиса**. Конфигурация хранится в базе данных PostgreSQL, что обеспечивает: + +- **Динамическое управление**: Добавляйте, обновляйте и удаляйте источники во время работы сервиса +- **Кеширование подключений**: 5-минутный TTL для оптимизации производительности +- **Поддержка множественных источников**: MSSQL и PostgreSQL одновременно +- **Безопасность**: Пароли сохраняются в БД (TODO: добавить шифрование) + +## Структура данных + +### Таблица `data_sources` + +```sql +data_sources: + - id (int, PK) -- Уникальный ID источника + - name (string, UNIQUE) -- Имя источника для идентификации + - source_type (enum) -- mssql | pgsql + - host (string) -- Хост/IP адрес + - port (int) -- Порт подключения + - database (string) -- Имя базы данных + - username (string) -- Пользователь БД + - password (string) -- Пароль БД (TODO: шифровать) + - default_schema (string) -- Схема по умолчанию (dbo для MSSQL, public для PostgreSQL) + - is_active (bool) -- Активен ли источник (False = мягкое удаление) + - description (string) -- Описание источника + - created_at (datetime) -- Время создания + - updated_at (datetime) -- Время последнего изменения +``` + +### Таблица `migration_tables` + +```sql +migration_tables: + - id (int, PK) + - table_name (string) -- Имя таблицы для репликации + - source_id (int, FK) -- Связь на data_sources.id + - source_schema (string) -- Явная схема (если NULL, используется default_schema из DataSource) + - target_schema (string) -- Целевая схема в PostgreSQL + - cron_schedule (string) -- Cron выражение для расписания + - is_active (bool) -- Активна ли миграция + - created_at (datetime) + - updated_at (datetime) +``` + +## API Endpoints + +### Управление источниками данных + +#### 1. Создать источник данных +```bash +POST /api/v1/data-sources +Content-Type: application/json + +{ + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "MyDatabase", + "username": "sa", + "password": "StrongPassword!", + "default_schema": "dbo", + "description": "Production MSSQL server" +} +``` + +**Ответ:** +```json +{ + "id": 1, + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "MyDatabase", + "default_schema": "dbo", + "is_active": true, + "description": "Production MSSQL server", + "created_at": "2024-01-15T10:30:00", + "updated_at": "2024-01-15T10:30:00" +} +``` + +#### 2. Получить список источников +```bash +GET /api/v1/data-sources +GET /api/v1/data-sources?active_only=true # Только активные +``` + +**Ответ:** +```json +[ + { + "id": 1, + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "MyDatabase", + "default_schema": "dbo", + "is_active": true, + "description": "Production MSSQL server", + "created_at": "2024-01-15T10:30:00", + "updated_at": "2024-01-15T10:30:00" + } +] +``` + +#### 3. Получить источник по ID +```bash +GET /api/v1/data-sources/{source_id} +``` + +#### 4. Обновить источник данных +```bash +PUT /api/v1/data-sources/{source_id} +Content-Type: application/json + +{ + "host": "new-mssql-server.example.com", + "port": 1433, + "is_active": true +} +``` + +#### 5. Удалить источник (мягкое удаление) +```bash +DELETE /api/v1/data-sources/{source_id} +``` + +**Ответ:** +```json +{ + "message": "DataSource 1 deactivated" +} +``` + +#### 6. Проверить подключение к источнику +```bash +POST /api/v1/data-sources/{source_id}/test +``` + +**Ответ:** +```json +{ + "source_id": 1, + "connection_ok": true +} +``` + +### Управление таблицами миграции + +#### 1. Создать миграцию таблицы +```bash +POST /api/v1/migration-tables +Content-Type: application/json + +{ + "table_name": "Customers", + "source_id": 1, + "cron_schedule": "0 2 * * *", # Каждый день в 2:00 AM + "source_schema": "dbo", # (опционально, если NULL, используется default_schema из DataSource) + "target_schema": "public" +} +``` + +**Ответ:** +```json +{ + "id": 1, + "table_name": "Customers", + "source_id": 1, + "source_schema": "dbo", + "target_schema": "public", + "cron_schedule": "0 2 * * *", + "is_active": true, + "created_at": "2024-01-15T10:35:00", + "updated_at": "2024-01-15T10:35:00" +} +``` + +#### 2. Получить список миграций +```bash +GET /api/v1/migration-tables +GET /api/v1/migration-tables?active_only=true # Только активные +``` + +#### 3. Обновить миграцию +```bash +PUT /api/v1/migration-tables/{table_id} +Content-Type: application/json + +{ + "cron_schedule": "0 3 * * *", # Изменить время на 3:00 AM + "source_schema": "dbo" +} +``` + +#### 4. Удалить миграцию +```bash +DELETE /api/v1/migration-tables/{table_id} +``` + +## Примеры использования + +### Пример 1: Добавить MSSQL источник и начать репликацию + +```bash +# 1. Создать источник MSSQL +curl -X POST http://localhost:8000/api/v1/data-sources \ + -H "Content-Type: application/json" \ + -d '{ + "name": "MSSQL Production", + "source_type": "mssql", + "host": "sqlserver.example.com", + "port": 1433, + "database": "SalesDB", + "username": "sa", + "password": "Password123!", + "default_schema": "dbo" + }' + +# 2. Проверить подключение +curl -X POST http://localhost:8000/api/v1/data-sources/1/test + +# 3. Создать миграцию для таблицы Orders +curl -X POST http://localhost:8000/api/v1/migration-tables \ + -H "Content-Type: application/json" \ + -d '{ + "table_name": "Orders", + "source_id": 1, + "cron_schedule": "0 2 * * *", + "source_schema": "dbo", + "target_schema": "public" + }' +``` + +### Пример 2: Добавить PostgreSQL источник + +```bash +curl -X POST http://localhost:8000/api/v1/data-sources \ + -H "Content-Type: application/json" \ + -d '{ + "name": "Postgres Analytics DB", + "source_type": "pgsql", + "host": "analytics.example.com", + "port": 5432, + "database": "analytics", + "username": "analyst", + "password": "AnalystPass456!", + "default_schema": "public", + "description": "Analytics database for weekly reports" + }' +``` + +### Пример 3: Переключиться на другой источник для существующей таблицы + +```bash +# Обновить миграцию чтобы использовать другой источник +curl -X PUT http://localhost:8000/api/v1/migration-tables/1 \ + -H "Content-Type: application/json" \ + -d '{ + "source_id": 2 # Переключиться на источник ID 2 + }' +``` + +### Пример 4: Получить статус здоровья сервиса + +```bash +curl http://localhost:8000/api/v1/health + +# Ответ: +{ + "status": "healthy", + "target_postgres": true, + "data_sources": { + "MSSQL Production (mssql)": true, + "Postgres Analytics DB (pgsql)": true + }, + "timestamp": "2024-01-15T10:45:00" +} +``` + +## Cron выражения + +Формат: `minute hour day month weekday` + +Примеры: +- `0 2 * * *` - Каждый день в 2:00 AM +- `0 */4 * * *` - Каждые 4 часа +- `0 0 * * MON` - Каждый понедельник в полночь +- `*/15 * * * *` - Каждые 15 минут +- `0 8-17 * * MON-FRI` - Каждый час от 8 до 17 в рабочие дни + +## Безопасность + +### Текущее состояние +⚠️ **Пароли хранятся в открытом виде в PostgreSQL** + +### Рекомендации для production +1. Добавить шифрование паролей (например, используя `cryptography` library) +2. Использовать переменные окружения для admin credentials +3. Ограничить доступ к API endpoints (HTTPS, authentication) +4. Аудитировать изменения в data_sources +5. Использовать connection pooling для оптимизации + +## Мониторинг + +### Статус репликации +```bash +GET /api/v1/replication-jobs +GET /api/v1/replication-jobs/{job_id} +``` + +### Статус планировщика +```bash +GET /api/v1/health +``` + +## Миграция из старой системы + +Если вы используете старую систему с environment variables: + +1. Создайте источники через API: +```bash +curl -X POST http://localhost:8000/api/v1/data-sources \ + -H "Content-Type: application/json" \ + -d '{ + "name": "MSSQL from ENV", + "source_type": "mssql", + "host": "'$MSSQL_SERVER'", + "port": '$MSSQL_PORT', + "database": "'$MSSQL_DATABASE'", + "username": "'$MSSQL_USERNAME'", + "password": "'$MSSQL_PASSWORD'", + "default_schema": "dbo" + }' +``` + +2. Обновите migration_tables с source_id вместо source_type +3. Удалите переменные окружения из .env + +## Troubleshooting + +### "DataSource not found" +- Проверьте что ID источника существует: `GET /api/v1/data-sources` +- Убедитесь что источник активен: `is_active: true` + +### "Connection failed" +- Проверьте подключение: `POST /api/v1/data-sources/{id}/test` +- Проверьте учетные данные (хост, порт, пароль) +- Убедитесь что firewall позволяет подключение + +### "Replication not starting" +- Проверьте что migration table активна: `is_active: true` +- Проверьте cron выражение синтаксис +- Проверьте логи: `docker logs replication-service` +- Убедитесь что таблица существует в источнике + +## Примечания + +- При обновлении источника (хост, порт, credentials) кеш подключений автоматически очищается +- При удалении источника (is_active=false) новые миграции не могут его использовать +- Существующие миграции продолжат работать но будут ошибаться если источник неактивен diff --git a/replication_service/Dockerfile b/replication_service/Dockerfile new file mode 100644 index 0000000..df54858 --- /dev/null +++ b/replication_service/Dockerfile @@ -0,0 +1,40 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Установить системные зависимости для ODBC и PostgreSQL +RUN apt-get update && apt-get install -y \ + unixodbc-dev \ + unixodbc \ + odbcinst \ + libpq-dev \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Установить ODBC Driver for SQL Server +RUN apt-get update && apt-get install -y \ + gnupg ca-certificates \ + && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \ + && curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list \ + && apt-get update && ACCEPT_EULA=Y apt-get install -y msodbcsql17 \ + && rm -rf /var/lib/apt/lists/* + +# Копировать requirements.txt и установить зависимости +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Копировать код приложения +COPY . . + +# Создать директорию для логов +RUN mkdir -p /app/logs + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD python -c "import requests; requests.get('http://localhost:8000/api/v1/health')" || exit 1 + +# Run application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/replication_service/README.md b/replication_service/README.md new file mode 100644 index 0000000..2c55de9 --- /dev/null +++ b/replication_service/README.md @@ -0,0 +1,517 @@ +# Data Replication Service + +Сервис репликации данных из множественных источников (MSSQL, PostgreSQL) в целевую PostgreSQL базу с использованием DLT (Data Load Tool), FastAPI и планировщика задач APScheduler. + +## Особенности + +- ✅ **Динамическое управление источниками данных** - Добавляйте и удаляйте источники через REST API без перезагрузки сервиса +- ✅ Поддержка множественных типов источников (MSSQL, PostgreSQL) +- ✅ Репликация таблиц с использованием DLT для оптимальной производительности +- ✅ Обработка логов изменений (Life таблицы с INSERT, UPDATE, DELETE операциями) +- ✅ Индивидуальное расписание для каждой таблицы (Cron выражения) +- ✅ Управление конфликтами времени выполнения (очередь задач) +- ✅ REST API для полного управления процессом репликации +- ✅ Кеширование подключений для оптимизации производительности +- ✅ Логирование и мониторинг задач репликации +- ✅ Health check и статистика + +## Архитектурные улучшения + +### Переход на DataSource-based конфигурацию + +**Раньше (static конфигурация):** +- Источники данных определялись в `.env` файле +- Требовалась перезагрузка сервиса для изменения конфигурации +- Одна MSSQL и одна PostgreSQL база фиксированно + +**Теперь (динамическая конфигурация):** +- Источники хранятся в БД (таблица `data_sources`) +- Полное управление через REST API +- Поддержка множественных источников каждого типа +- Без необходимости перезагрузки сервиса +- Кеширование подключений с TTL = 5 минут + +## Структура проекта + +``` +replication_service/ +├── main.py # Главное приложение FastAPI +├── config.py # Конфигурация и переменные окружения +├── models.py # ORM модели (SQLAlchemy) - включая DataSource +├── database.py # Управление подключениями с кешированием +├── api.py # REST API endpoints (DataSource + Migration) +├── replication.py # Логика репликации данных +├── scheduler.py # Планировщик задач с управлением конфликтами +├── migrate_to_data_sources.py # Скрипт миграции из старой конфигурации +├── examples_api.py # Примеры использования новой API +├── requirements.txt # Зависимости Python +├── DATA_SOURCES.md # Документация по управлению источниками +├── .env.example # Пример файла переменных окружения +└── README.md # Документация +``` + +## Установка + +### 1. Создать виртуальное окружение + +```bash +python -m venv venv +source venv/bin/activate # Linux/Mac +# или +venv\Scripts\activate # Windows +``` + +### 2. Установить зависимости + +```bash +pip install -r requirements.txt +``` + +### 3. Настроить переменные окружения + +Скопировать `.env.example` в `.env` и заполнить данные целевой базы данных PostgreSQL: + +```bash +cp .env.example .env +``` + +```env +# PostgreSQL Connection (целевая база для всех репликаций) +POSTGRES_HOST=your_postgres_host +POSTGRES_DATABASE=replication_db +POSTGRES_USERNAME=postgres +POSTGRES_PASSWORD=your_password +POSTGRES_PORT=5432 +``` + +**Важно:** Источники данных больше НЕ конфигурируются через `.env` файл! + +### 4. Инициализировать базу данных + +```bash +python -c "from database import DatabaseManager; DatabaseManager.init_postgres_db()" +``` + +## Запуск + +```bash +python main.py +``` + +или + +```bash +uvicorn main:app --reload --host 0.0.0.0 --port 8000 +``` + +Сервис будет доступен по адресу: `http://localhost:8000` + +## Быстрый старт с новой API + +### 1. Создать источник данных + +```bash +curl -X POST http://localhost:8000/api/v1/data-sources \ + -H "Content-Type: application/json" \ + -d '{ + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "MyDatabase", + "username": "sa", + "password": "Password123!" + }' +``` + +**Ответ:** +```json +{ + "id": 1, + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "MyDatabase", + "default_schema": "dbo", + "is_active": true, + "created_at": "2024-01-15T10:30:00", + "updated_at": "2024-01-15T10:30:00" +} +``` + +### 2. Проверить подключение + +```bash +curl -X POST http://localhost:8000/api/v1/data-sources/1/test +``` + +**Ответ:** +```json +{ + "source_id": 1, + "connection_ok": true +} +``` + +### 3. Создать расписание репликации + +```bash +curl -X POST http://localhost:8000/api/v1/migration-tables \ + -H "Content-Type: application/json" \ + -d '{ + "table_name": "Orders", + "source_id": 1, + "cron_schedule": "0 2 * * *", + "source_schema": "dbo", + "target_schema": "public" + }' +``` + +**Ответ:** +```json +{ + "id": 1, + "table_name": "Orders", + "source_id": 1, + "source_schema": "dbo", + "target_schema": "public", + "cron_schedule": "0 2 * * *", + "is_active": true, + "created_at": "2024-01-15T10:35:00", + "updated_at": "2024-01-15T10:35:00" +} +``` + +### 4. Запустить примеры API + +```bash +python examples_api.py +``` + +Этот скрипт демонстрирует все основные операции с API. + +## API Endpoints + +### Health Check + +- `GET /api/v1/health` - Проверка здоровья сервиса + +**Ответ:** + "status": "healthy", + "mssql_connected": true, + "postgres_connected": true, + "timestamp": "2026-03-29T10:00:00" +} +``` + +### Управление таблицами миграции + +#### Создать таблицу для миграции + +```bash +POST /api/v1/migration-tables +Content-Type: application/json + +{ + "table_name": "Orders", + "cron_schedule": "0 2 * * *", # Каждый день в 2:00 + "source_schema": "dbo", + "target_schema": "public" +} +``` + +#### Получить список всех таблиц + +```bash +GET /api/v1/migration-tables +``` + +#### Получить информацию о конкретной таблице + +```bash +GET /api/v1/migration-tables/{table_id} +``` + +#### Удалить таблицу из миграции + +```bash +DELETE /api/v1/migration-tables/{table_id} +``` + +### Управление планировщиком + +#### Получить статус планировщика + +```bash +GET /api/v1/scheduler/status +``` + +**Ответ:** +```json +{ + "is_running": true, + "migration_tables_count": 5, + "running_jobs": { + "1": 1680000000.123 + }, + "queued_jobs": [] +} +``` + +#### Запустить планировщик + +```bash +POST /api/v1/scheduler/start +``` + +#### Остановить планировщик + +```bash +POST /api/v1/scheduler/stop +``` + +#### Получить список задач планировщика + +```bash +GET /api/v1/scheduler/jobs +``` + +### История и статистика + +#### Получить список задач репликации + +```bash +GET /api/v1/replication-jobs?limit=100&offset=0&status=success +``` + +#### Получить информацию о конкретной задаче + +```bash +GET /api/v1/replication-jobs/{job_id} +``` + +#### Получить статистику + +```bash +GET /api/v1/statistics +``` + +**Ответ:** +```json +{ + "total_jobs": 150, + "successful_jobs": 148, + "failed_jobs": 2, + "success_rate": 98.67, + "timestamp": "2026-03-29T10:00:00" +} +``` + +## Примеры использования + +### 1. Добавить таблицу для репликации с расписанием + +```bash +curl -X POST "http://localhost:8000/api/v1/migration-tables" \ + -H "Content-Type: application/json" \ + -d '{ + "table_name": "Customers", + "cron_schedule": "*/30 * * * *", + "source_schema": "dbo", + "target_schema": "public" + }' +``` + +Это расписание означает: "выполнять репликацию каждые 30 минут" + +### 2. Добавить несколько таблиц с разным расписанием + +```bash +# Orders - каждый день в 2:00 AM +curl -X POST "http://localhost:8000/api/v1/migration-tables" \ + -H "Content-Type: application/json" \ + -d '{"table_name": "Orders", "cron_schedule": "0 2 * * *"}' + +# Products - каждый день в 3:00 AM +curl -X POST "http://localhost:8000/api/v1/migration-tables" \ + -H "Content-Type: application/json" \ + -d '{"table_name": "Products", "cron_schedule": "0 3 * * *"}' +``` + +### 3. Контролировать расписание при конфликтах + +Если две таблицы имеют пересекающееся расписание, они будут автоматически добавлены в очередь и выполняться последовательно: + +```bash +# Таблица 1: каждый день в 2:00 AM +# Таблица 2: каждый день в 2:15 AM +# Таблица 3: каждый день в 2:30 AM + +# Все три задачи начнут выполняться в порядке очереди +``` + +## Спецификация Cron для расписания + +**Формат:** `<минуты> <часы> <день месяца> <месяц> <день недели>` + +**Примеры:** + +- `0 2 * * *` - Каждый день в 2:00 AM +- `*/30 * * * *` - Каждые 30 минут +- `0 0 * * 0` - Каждое воскресенье в полночь +- `0 9,12,15 * * *` - В 9:00, 12:00 и 15:00 каждый день +- `0 */4 * * *` - Каждые 4 часа +- `0 0 1 * *` - Первый день каждого месяца в полночь + +## Life таблицы + +**Life таблицы** - это таблицы в MSSQL, которые хранят логи транзакций для оригинальных таблиц. + +**Соглашение по именованию:** +- Оригинальная таблица: `Orders` +- Life таблица: `LifeOrders` + +Сервис автоматически обрабатывает следующие операции: +- **INSERT** - Добавление новых записей +- **UPDATE** - Обновление существующих записей +- **DELETE** - Удаление записей + +## Мониторинг и отладка + +### Логи приложения + +Логи выводятся с информацией о каждом этапе репликации: + +``` +2026-03-29 10:00:00 - scheduler - INFO - Starting replication for table Orders +2026-03-29 10:00:15 - replication - INFO - Successfully replicated 1250 rows from Orders +2026-03-29 10:00:25 - scheduler - INFO - Processing queued job for Products +``` + +### API Documentation + +После запуска приложения, интерактивная документация доступна по адресам: +- Swagger UI: `http://localhost:8000/docs` +- ReDoc: `http://localhost:8000/redoc` + +## Конфигурация + +### DatabaseManager + +```python +from database import DatabaseManager + +# Проверить подключение к MSSQL +DatabaseManager.test_mssql_connection() + +# Проверить подключение к PostgreSQL +DatabaseManager.test_postgres_connection() + +# Инициализировать таблицы PostgreSQL +DatabaseManager.init_postgres_db() +``` + +### SchedulerManager + +```python +from scheduler import scheduler_manager + +# Добавить таблицу +scheduler_manager.add_migration_table( + table_name="Orders", + cron_schedule="0 2 * * *" +) + +# Получить список таблиц +tables = scheduler_manager.get_migration_tables() + +# Получить текущие выполняющиеся задачи +running = scheduler_manager.get_running_jobs() + +# Получить задачи в очереди +queued = scheduler_manager.get_queued_jobs() +``` + +## Производительность + +- **Асинхронная обработка**: Использует APScheduler для фонового выполнения +- **Очередь задач**: Автоматическое управление конфликтами расписания +- **Оптимизация DLT**:批量 обработка данных для минимизации операций с БД +- **Connection pooling**: Эффективное использование подключений к БД + +## Безопасность + +- Используйте переменные окружения для конфиденциальных данных (не коммитьте `.env`) +- Настройте `.env.example` как шаблон для других разработчиков +- Рассмотрите использование API ключей для защиты endpoints +- Логируйте все операции репликации для аудита + +## Развертывание на производстве + +### Docker + +```dockerfile +FROM python:3.11 + +WORKDIR /app + +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY . . + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] +``` + +### Docker Compose + +```yaml +version: '3.8' + +services: + replication-service: + build: . + ports: + - "8000:8000" + environment: + MSSQL_SERVER: mssql + POSTGRES_HOST: postgres + depends_on: + - postgres + + postgres: + image: postgres:15 + environment: + POSTGRES_PASSWORD: postgres +``` + +## Решение проблем + +### Ошибка подключения к MSSQL + +1. Проверить, запущен ли сервер MSSQL +2. Проверить правильность USER/PASSWORD в `.env` +3. Убедиться, что брандмауэр не блокирует порт 1433 +4. Проверить права доступа sa пользователя + +### Ошибка подключения к PostgreSQL + +1. Проверить, запущен ли сервер PostgreSQL +2. Проверить правильность HOST/PORT в `.env` +3. Убедиться, что база данных с заданным именем существует +4. Проверить права доступа пользователя + +### DLT ошибки + +1. Убедиться, что таблица существует в MSSQL +2. Проверить права доступа на чтение таблицы +3. Проверить, что целевая схема существует в PostgreSQL + +## Лицензия + +MIT + +## Поддержка + +Для сообщений об ошибках и предложений создавайте Issues в репозитории. diff --git a/replication_service/api.py b/replication_service/api.py new file mode 100644 index 0000000..8289fd6 --- /dev/null +++ b/replication_service/api.py @@ -0,0 +1,739 @@ +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel, ConfigDict +from typing import List, Optional, Dict +from datetime import datetime + +from database import PostgresSessionLocal, DatabaseManager +from models import MigrationTable, ReplicationJob, ReplicationStatus, DataSource, SourceType, TargetDatabase, DatabaseType +from scheduler import scheduler_manager + +router = APIRouter(prefix="/api/v1", tags=["replication"]) + + +# ========== TargetDatabase Pydantic Models ========== +class TargetDatabaseCreate(BaseModel): + """Модель для создания целевой БД""" + name: str + db_type: str = "pgsql" # pgsql или mssql + host: str + port: int + database: str + username: str + password: str + is_default: bool = False + description: Optional[str] = None + + +class TargetDatabaseUpdate(BaseModel): + """Модель для обновления целевой БД""" + name: Optional[str] = None + host: Optional[str] = None + port: Optional[int] = None + database: Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None + is_active: Optional[bool] = None + is_default: Optional[bool] = None + description: Optional[str] = None + + +class TargetDatabaseResponse(BaseModel): + """Модель ответа для целевой БД""" + model_config = ConfigDict(from_attributes=True) + + id: int + name: str + db_type: str + host: str + port: int + database: str + is_active: bool + is_default: bool + description: Optional[str] = None + created_at: datetime + updated_at: datetime + + +# ========== DataSource Pydantic Models ========== +class DataSourceCreate(BaseModel): + """Модель для создания источника данных""" + name: str + source_type: str # "mssql" или "pgsql" + host: str + port: int + database: str + username: str + password: str + default_schema: str = "dbo" + description: Optional[str] = None + + +class DataSourceUpdate(BaseModel): + """Модель для обновления источника данных""" + name: Optional[str] = None + host: Optional[str] = None + port: Optional[int] = None + database: Optional[str] = None + username: Optional[str] = None + password: Optional[str] = None + default_schema: Optional[str] = None + description: Optional[str] = None + is_active: Optional[bool] = None + + +class DataSourceResponse(BaseModel): + """Модель ответа для источника данных""" + model_config = ConfigDict(from_attributes=True) + + id: int + name: str + source_type: str + host: str + port: int + database: str + default_schema: str + is_active: bool + description: Optional[str] = None + created_at: datetime + updated_at: datetime + + +# ========== MigrationTable Pydantic Models ========== +class MigrationTableCreate(BaseModel): + """Модель для создания таблицы миграции""" + table_name: str + source_id: int # ID источника из таблицы DataSource + cron_schedule: str + source_schema: Optional[str] = None # Если None, используется default_schema из DataSource + target_schema: str = "public" + target_id: Optional[int] = None # ID целевой БД (если None, используется default) + target_table_name: Optional[str] = None # Переименование таблицы + column_mapping: Optional[Dict[str, str]] = None # Переименование столбцов + use_life_table: bool = False # Обрабатывать Life таблицы + life_excluded_fields: Optional[List[str]] = None # Сервисные поля для исключения + + +class MigrationTableUpdate(BaseModel): + """Модель для обновления таблицы миграции""" + table_name: Optional[str] = None + source_id: Optional[int] = None + cron_schedule: Optional[str] = None + source_schema: Optional[str] = None + target_schema: Optional[str] = None + target_id: Optional[int] = None + target_table_name: Optional[str] = None + column_mapping: Optional[Dict[str, str]] = None + use_life_table: Optional[bool] = None + life_excluded_fields: Optional[List[str]] = None + is_active: Optional[bool] = None + + +class MigrationTableResponse(BaseModel): + """Модель ответа для таблицы миграции""" + model_config = ConfigDict(from_attributes=True) + + id: int + table_name: str + source_id: int + target_id: Optional[int] + source_schema: Optional[str] + target_schema: str + target_table_name: Optional[str] + column_mapping: Optional[Dict[str, str]] + use_life_table: bool + life_excluded_fields: Optional[List[str]] + cron_schedule: str + is_active: bool + created_at: datetime + updated_at: datetime + + +# ========== ReplicationJob Models ========== +class ReplicationJobResponse(BaseModel): + """Модель ответа для задачи репликации""" + model_config = ConfigDict(from_attributes=True) + + id: int + table_id: int + table_name: str + status: str + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + rows_processed: int + error_message: Optional[str] = None + created_at: datetime + + +class SchedulerStatusResponse(BaseModel): + """Модель ответа статуса планировщика""" + is_running: bool + migration_tables_count: int + running_jobs: dict + queued_jobs: list + + +# ========== Health Check ========== +@router.get("/health") +def health_check(): + """Проверка здоровья сервиса""" + session = PostgresSessionLocal() + postgres_ok = DatabaseManager.test_postgres_connection() + + # Получить все активные источники + try: + data_sources = session.query(DataSource).filter(DataSource.is_active == True).all() + sources_status = {} + for source in data_sources: + is_ok = DatabaseManager.test_source_connection(source.id) + sources_status[f"{source.name} ({source.source_type.value})"] = is_ok + except Exception as e: + sources_status = {"error": str(e)} + finally: + session.close() + + # Здоров если есть целевая БД и хотя бы один активный источник + is_healthy = postgres_ok and any(sources_status.values()) if sources_status else False + + return { + "status": "healthy" if is_healthy else "degraded", + "target_postgres": postgres_ok, + "data_sources": sources_status, + "timestamp": datetime.utcnow() + } + + +# ========== TargetDatabase Endpoints ========== +@router.post("/target-databases", response_model=TargetDatabaseResponse) +def create_target_database(db: TargetDatabaseCreate): + """Создать новую целевую БД""" + session = PostgresSessionLocal() + try: + # Проверить существование + existing = session.query(TargetDatabase).filter(TargetDatabase.name == db.name).first() + if existing: + raise HTTPException(status_code=400, detail=f"TargetDatabase with name '{db.name}' already exists") + + # Проверить db_type + try: + DatabaseType(db.db_type) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid db_type: {db.db_type}. Must be 'pgsql' or 'mssql'") + + # Если это default, повернуть остальные как не default + if db.is_default: + session.query(TargetDatabase).update({TargetDatabase.is_default: False}) + + new_db = TargetDatabase( + name=db.name, + db_type=DatabaseType(db.db_type), + host=db.host, + port=db.port, + database=db.database, + username=db.username, + password=db.password, + is_default=db.is_default, + description=db.description + ) + session.add(new_db) + session.commit() + + return TargetDatabaseResponse.model_validate(new_db) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.get("/target-databases", response_model=List[TargetDatabaseResponse]) +def list_target_databases(active_only: bool = Query(False)): + """Получить список целевых БД""" + session = PostgresSessionLocal() + try: + query = session.query(TargetDatabase) + if active_only: + query = query.filter(TargetDatabase.is_active == True) + dbs = query.all() + return [TargetDatabaseResponse.model_validate(db) for db in dbs] + finally: + session.close() + + +@router.get("/target-databases/{target_id}", response_model=TargetDatabaseResponse) +def get_target_database(target_id: int): + """Получить целевую БД по ID""" + session = PostgresSessionLocal() + try: + db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() + if not db: + raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") + return TargetDatabaseResponse.model_validate(db) + finally: + session.close() + + +@router.put("/target-databases/{target_id}", response_model=TargetDatabaseResponse) +def update_target_database(target_id: int, update: TargetDatabaseUpdate): + """Обновить целевую БД""" + session = PostgresSessionLocal() + try: + db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() + if not db: + raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") + + update_data = update.model_dump(exclude_unset=True) + if update_data: + for key, value in update_data.items(): + if hasattr(db, key): + setattr(db, key, value) + + # Если это default, повернуть остальные + if update.is_default == True: + session.query(TargetDatabase).filter(TargetDatabase.id != target_id).update( + {TargetDatabase.is_default: False} + ) + + session.commit() + DatabaseManager.clear_engine_cache(target_id=target_id) + return TargetDatabaseResponse.model_validate(db) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.delete("/target-databases/{target_id}") +def delete_target_database(target_id: int): + """Удалить целевую БД (мягкое удаление)""" + session = PostgresSessionLocal() + try: + db = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() + if not db: + raise HTTPException(status_code=404, detail=f"TargetDatabase with ID {target_id} not found") + + db.is_active = False + session.commit() + DatabaseManager.clear_engine_cache(target_id=target_id) + + return {"message": f"TargetDatabase {target_id} deactivated"} + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.post("/target-databases/{target_id}/test") +def test_target_database_connection(target_id: int): + """Проверить подключение к целевой БД""" + try: + is_ok = DatabaseManager.test_target_connection(target_id) + return {"target_id": target_id, "connection_ok": is_ok} + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +# ========== DataSource Endpoints ========== +@router.post("/data-sources", response_model=DataSourceResponse) +def create_data_source(source: DataSourceCreate): + """Создать новый источник данных""" + session = PostgresSessionLocal() + try: + # Проверить что источник с таким именем еще не существует + existing = session.query(DataSource).filter(DataSource.name == source.name).first() + if existing: + raise HTTPException(status_code=400, detail=f"DataSource with name '{source.name}' already exists") + + # Проверить что source_type валиден + try: + SourceType(source.source_type) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid source_type: {source.source_type}. Must be 'mssql' or 'pgsql'") + + # Создать новый источник + new_source = DataSource( + name=source.name, + source_type=SourceType(source.source_type), + host=source.host, + port=source.port, + database=source.database, + username=source.username, + password=source.password, + default_schema=source.default_schema, + description=source.description + ) + session.add(new_source) + session.commit() + + return DataSourceResponse.model_validate(new_source) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.get("/data-sources", response_model=List[DataSourceResponse]) +def list_data_sources(active_only: bool = Query(False)): + """Получить список источников данных""" + session = PostgresSessionLocal() + try: + query = session.query(DataSource) + if active_only: + query = query.filter(DataSource.is_active == True) + sources = query.all() + return [DataSourceResponse.model_validate(s) for s in sources] + finally: + session.close() + + +@router.get("/data-sources/{source_id}", response_model=DataSourceResponse) +def get_data_source(source_id: int): + """Получить источник данных по ID""" + session = PostgresSessionLocal() + try: + source = session.query(DataSource).filter(DataSource.id == source_id).first() + if not source: + raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") + return DataSourceResponse.model_validate(source) + finally: + session.close() + + +@router.put("/data-sources/{source_id}", response_model=DataSourceResponse) +def update_data_source(source_id: int, update: DataSourceUpdate): + """Обновить источник данных""" + session = PostgresSessionLocal() + try: + source = session.query(DataSource).filter(DataSource.id == source_id).first() + if not source: + raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") + + # Обновить только переданные поля + update_data = update.model_dump(exclude_unset=True) + if update_data: + for key, value in update_data.items(): + if hasattr(source, key): + setattr(source, key, value) + + session.commit() + return DataSourceResponse.model_validate(source) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.delete("/data-sources/{source_id}") +def delete_data_source(source_id: int): + """Удалить источник данных (мягкое удаление - установить is_active=False)""" + session = PostgresSessionLocal() + try: + source = session.query(DataSource).filter(DataSource.id == source_id).first() + if not source: + raise HTTPException(status_code=404, detail=f"DataSource with ID {source_id} not found") + + # Мягкое удаление + source.is_active = False + session.commit() + + # Очистить кеш подключений + DatabaseManager.clear_engine_cache(source_id) + + return {"message": f"DataSource {source_id} deactivated"} + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.post("/data-sources/{source_id}/test") +def test_data_source_connection(source_id: int): + """Проверить подключение к источнику данных""" + try: + is_ok = DatabaseManager.test_source_connection(source_id) + return {"source_id": source_id, "connection_ok": is_ok} + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + +# ========== Migration Tables Endpoints ========== +@router.post("/migration-tables", response_model=MigrationTableResponse) +def create_migration_table(table: MigrationTableCreate): + """Создать новую таблицу для миграции""" + session = PostgresSessionLocal() + try: + # Проверить что источник существует и активен + source = session.query(DataSource).filter(DataSource.id == table.source_id).first() + if not source: + raise HTTPException(status_code=404, detail=f"DataSource with ID {table.source_id} not found") + if not source.is_active: + raise HTTPException(status_code=400, detail=f"DataSource {table.source_id} is inactive") + + # Создать запись миграции + migration_table = MigrationTable( + table_name=table.table_name, + source_id=table.source_id, + target_id=table.target_id, # Может быть None + source_schema=table.source_schema, + target_schema=table.target_schema, + target_table_name=table.target_table_name, + column_mapping=table.column_mapping, + use_life_table=table.use_life_table, + life_excluded_fields=table.life_excluded_fields, + cron_schedule=table.cron_schedule, + is_active=True + ) + session.add(migration_table) + session.commit() + + # Добавить в планировщик + scheduler_manager.add_job(migration_table) + + return MigrationTableResponse.model_validate(migration_table) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.get("/migration-tables", response_model=List[MigrationTableResponse]) +def list_migration_tables(active_only: bool = Query(False)): + """Получить список таблиц для миграции""" + session = PostgresSessionLocal() + try: + query = session.query(MigrationTable) + if active_only: + query = query.filter(MigrationTable.is_active == True) + tables = query.all() + return [MigrationTableResponse.model_validate(t) for t in tables] + finally: + session.close() + + +@router.get("/migration-tables/{table_id}", response_model=MigrationTableResponse) +def get_migration_table(table_id: int): + """Получить таблицу миграции по ID""" + session = PostgresSessionLocal() + try: + table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() + if not table: + raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") + return MigrationTableResponse.model_validate(table) + finally: + session.close() + + +@router.put("/migration-tables/{table_id}", response_model=MigrationTableResponse) +def update_migration_table(table_id: int, update: MigrationTableUpdate): + """Обновить таблицу миграции""" + session = PostgresSessionLocal() + try: + table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() + if not table: + raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") + + # Проверить что источник существует если изменяется + if update.source_id and update.source_id != table.source_id: + source = session.query(DataSource).filter(DataSource.id == update.source_id).first() + if not source: + raise HTTPException(status_code=404, detail=f"DataSource with ID {update.source_id} not found") + if not source.is_active: + raise HTTPException(status_code=400, detail=f"DataSource {update.source_id} is inactive") + + # Обновить только переданные поля + update_data = update.model_dump(exclude_unset=True) + if update_data: + for key, value in update_data.items(): + if hasattr(table, key): + setattr(table, key, value) + + session.commit() + + # Пересоздать job в планировщике если нужно + scheduler_manager.remove_job(table_id) + scheduler_manager.add_job(table) + + return MigrationTableResponse.model_validate(table) + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +@router.delete("/migration-tables/{table_id}") +def delete_migration_table(table_id: int): + """Удалить таблицу миграции""" + session = PostgresSessionLocal() + try: + table = session.query(MigrationTable).filter(MigrationTable.id == table_id).first() + if not table: + raise HTTPException(status_code=404, detail=f"Migration table with ID {table_id} not found") + + # Удалить из планировщика + scheduler_manager.remove_job(table_id) + + # Удалить из БД + session.delete(table) + session.commit() + + return {"message": f"Migration table {table_id} deleted"} + except HTTPException: + raise + except Exception as e: + session.rollback() + raise HTTPException(status_code=400, detail=str(e)) + finally: + session.close() + + +# ========== Replication Jobs Endpoints ========== +@router.get("/replication-jobs", response_model=List[ReplicationJobResponse]) +def list_replication_jobs( + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + status: Optional[str] = None +): + """Получить историю задач репликации""" + try: + session = PostgresSessionLocal() + query = session.query(ReplicationJob) + + if status: + query = query.filter(ReplicationJob.status == status) + + jobs = query.order_by(ReplicationJob.created_at.desc()).offset(offset).limit(limit).all() + session.close() + + return [ReplicationJobResponse.model_validate(job.__dict__) for job in jobs] + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/replication-jobs/{job_id}", response_model=ReplicationJobResponse) +def get_replication_job(job_id: int): + """Получить информацию о конкретной задаче репликации""" + try: + session = PostgresSessionLocal() + job = session.query(ReplicationJob).filter( + ReplicationJob.id == job_id + ).first() + session.close() + + if job: + return ReplicationJobResponse.model_validate(job.__dict__) + else: + raise HTTPException(status_code=404, detail="Replication job not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +# Scheduler endpoints +@router.get("/scheduler/status", response_model=SchedulerStatusResponse) +def get_scheduler_status(): + """Получить статус планировщика""" + tables = scheduler_manager.get_migration_tables() + running = scheduler_manager.get_running_jobs() + queued = scheduler_manager.get_queued_jobs() + + return SchedulerStatusResponse( + is_running=scheduler_manager.scheduler.running, + migration_tables_count=len(tables), + running_jobs=running, + queued_jobs=queued + ) + + +@router.post("/scheduler/start") +def start_scheduler(): + """Запустить планировщик""" + try: + if not scheduler_manager.scheduler.running: + scheduler_manager.start() + return {"message": "Scheduler started successfully"} + else: + return {"message": "Scheduler is already running"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/scheduler/stop") +def stop_scheduler(): + """Остановить планировщик""" + try: + if scheduler_manager.scheduler.running: + scheduler_manager.stop() + return {"message": "Scheduler stopped successfully"} + else: + return {"message": "Scheduler is not running"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/scheduler/jobs") +def get_scheduler_jobs(): + """Получить список всех задач в планировщике""" + try: + jobs = scheduler_manager.scheduler.get_jobs() + return { + "total_jobs": len(jobs), + "jobs": [ + { + "id": job.id, + "name": job.name, + "next_run_time": str(job.next_run_time) + } + for job in jobs + ] + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +# Statistics endpoints +@router.get("/statistics") +def get_statistics(): + """Получить статистику репликации""" + try: + session = PostgresSessionLocal() + + total_jobs = session.query(ReplicationJob).count() + successful_jobs = session.query(ReplicationJob).filter( + ReplicationJob.status == ReplicationStatus.SUCCESS + ).count() + failed_jobs = session.query(ReplicationJob).filter( + ReplicationJob.status == ReplicationStatus.FAILED + ).count() + + session.close() + + return { + "total_jobs": total_jobs, + "successful_jobs": successful_jobs, + "failed_jobs": failed_jobs, + "success_rate": (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0, + "timestamp": datetime.utcnow() + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/replication_service/config.py b/replication_service/config.py new file mode 100644 index 0000000..41f3401 --- /dev/null +++ b/replication_service/config.py @@ -0,0 +1,24 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict +from typing import Optional + + +class Settings(BaseSettings): + """Конфигурация приложения""" + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") + + # PostgreSQL Target Connection (назначение для всех репликаций) + postgres_host: str + postgres_database: str + postgres_username: str + postgres_password: str + postgres_port: int = 5432 + + # Redis for Celery + redis_host: str = "localhost" + redis_port: int = 6379 + + # Other settings + api_key: Optional[str] = None + + +settings = Settings() \ No newline at end of file diff --git a/replication_service/database.py b/replication_service/database.py new file mode 100644 index 0000000..2f4b373 --- /dev/null +++ b/replication_service/database.py @@ -0,0 +1,292 @@ +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker, Session +from config import settings +from models import Base, DataSource, TargetDatabase, SourceType, DatabaseType +import pyodbc +import logging +from typing import Optional, Dict +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +# PostgreSQL engine для хранения конфигурации и логов +postgres_engine = create_engine( + f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@" + f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}", + echo=False, + pool_pre_ping=True +) + +PostgresSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=postgres_engine) + +# PostgreSQL Target engine (назначение для всех репликаций по умолчанию) +POSTGRES_CONNECTION_STRING = ( + f"postgresql+psycopg2://{settings.postgres_username}:{settings.postgres_password}@" + f"{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_database}" +) + +postgres_target_engine = create_engine(POSTGRES_CONNECTION_STRING, echo=False) + +# Кеши для подключений +_source_engines_cache: Dict[int, object] = {} +_target_engines_cache: Dict[int, object] = {} +_cache_timestamp: Dict[int, datetime] = {} +CACHE_TTL_SECONDS = 300 # 5 минут + + +class DatabaseManager: + """Управление операциями с базами данных""" + + @staticmethod + def init_postgres_db(): + """Инициализация таблиц в PostgreSQL""" + Base.metadata.create_all(bind=postgres_engine) + logger.info("PostgreSQL database initialized") + + @staticmethod + def get_postgres_session() -> Session: + """Получить сессию для работы с PostgreSQL""" + return PostgresSessionLocal() + + @staticmethod + def _build_connection_string(source: DataSource) -> str: + """Строить строку подключения из DataSource""" + if source.source_type == SourceType.MSSQL: + return ( + f"mssql+pyodbc://{source.username}:{source.password}@" + f"{source.host}:{source.port}/{source.database}?" + f"driver=ODBC+Driver+17+for+SQL+Server" + ) + elif source.source_type == SourceType.PGSQL: + return ( + f"postgresql+psycopg2://{source.username}:{source.password}@" + f"{source.host}:{source.port}/{source.database}" + ) + else: + raise ValueError(f"Unknown source type: {source.source_type}") + + @staticmethod + def _is_cache_valid(source_id: int) -> bool: + """Проверить валидность кеша для источника""" + if source_id not in _cache_timestamp: + return False + elapsed = (datetime.utcnow() - _cache_timestamp[source_id]).total_seconds() + return elapsed < CACHE_TTL_SECONDS + + @staticmethod + def get_source_engine(source_id: int): + """Получить engine для источника данных по ID (с кешированием)""" + # Проверить кеш + if source_id in _source_engines_cache and DatabaseManager._is_cache_valid(source_id): + logger.debug(f"Using cached engine for source {source_id}") + return _source_engines_cache[source_id] + + # Загрузить DataSource из БД + session = DatabaseManager.get_postgres_session() + try: + data_source = session.query(DataSource).filter(DataSource.id == source_id).first() + if not data_source: + raise ValueError(f"DataSource with ID {source_id} not found") + + if not data_source.is_active: + raise ValueError(f"DataSource {source_id} is inactive") + + # Создать new engine + connection_string = DatabaseManager._build_connection_string(data_source) + engine = create_engine(connection_string, echo=False, pool_pre_ping=True) + + # Кешировать + _source_engines_cache[source_id] = engine + _cache_timestamp[source_id] = datetime.utcnow() + + logger.info(f"Created engine for source {source_id}: {data_source.name}") + return engine + finally: + session.close() + + @staticmethod + def get_data_source(source_id: int) -> Optional[DataSource]: + """Получить DataSource по ID""" + session = DatabaseManager.get_postgres_session() + try: + return session.query(DataSource).filter(DataSource.id == source_id).first() + finally: + session.close() + + @staticmethod + def test_source_connection(source_id: int) -> bool: + """Проверить подключение к источнику""" + try: + engine = DatabaseManager.get_source_engine(source_id) + with engine.connect() as connection: + connection.execute(text("SELECT 1")) + logger.info(f"Source {source_id} connection successful") + return True + except Exception as e: + logger.error(f"Source {source_id} connection failed: {e}") + return False + + @staticmethod + def test_postgres_connection(): + """Проверка подключения к PostgreSQL назначению""" + try: + with postgres_engine.connect() as connection: + result = connection.execute(text("SELECT 1")) + logger.info("PostgreSQL target connection successful") + return True + except Exception as e: + logger.error(f"PostgreSQL target connection failed: {e}") + return False + + @staticmethod + def clear_engine_cache(source_id: int = None, target_id: int = None): + """Очистить кеш подключений (для конкретного источника/целевой БД или всех)""" + if source_id: + _source_engines_cache.pop(source_id, None) + _cache_timestamp.pop(f"source_{source_id}", None) + logger.info(f"Cleared cache for source {source_id}") + elif target_id: + _target_engines_cache.pop(target_id, None) + _cache_timestamp.pop(f"target_{target_id}", None) + logger.info(f"Cleared cache for target {target_id}") + else: + _source_engines_cache.clear() + _target_engines_cache.clear() + _cache_timestamp.clear() + logger.info("Cleared all engine caches") + + @staticmethod + def _build_target_connection_string(target: TargetDatabase) -> str: + """Строить строку подключения для целевой БД""" + if target.db_type == DatabaseType.PGSQL: + return ( + f"postgresql+psycopg2://{target.username}:{target.password}@" + f"{target.host}:{target.port}/{target.database}" + ) + elif target.db_type == DatabaseType.MSSQL: + return ( + f"mssql+pyodbc://{target.username}:{target.password}@" + f"{target.host}:{target.port}/{target.database}?" + f"driver=ODBC+Driver+17+for+SQL+Server" + ) + else: + raise ValueError(f"Unknown target database type: {target.db_type}") + + @staticmethod + def _is_cache_valid(cache_key: str) -> bool: + """Проверить валидность кеша""" + if cache_key not in _cache_timestamp: + return False + elapsed = (datetime.utcnow() - _cache_timestamp[cache_key]).total_seconds() + return elapsed < CACHE_TTL_SECONDS + + @staticmethod + def get_target_engine(target_id: int = None): + """Получить engine для целевой БД (с кешированием)""" + # Если target_id не указан, использовать целевую БД по умолчанию + session = DatabaseManager.get_postgres_session() + try: + if not target_id: + target = session.query(TargetDatabase).filter( + TargetDatabase.is_active == True, + TargetDatabase.is_default == True + ).first() + if not target: + # Если нет default, использовать последнюю добавленную активную + target = session.query(TargetDatabase).filter( + TargetDatabase.is_active == True + ).order_by(TargetDatabase.id.desc()).first() + if not target: + logger.warning("No active target database found, using default PostgreSQL") + return postgres_target_engine + target_id = target.id + + # Проверить кеш + cache_key = f"target_{target_id}" + if target_id in _target_engines_cache and DatabaseManager._is_cache_valid(cache_key): + logger.debug(f"Using cached engine for target {target_id}") + return _target_engines_cache[target_id] + + # Загрузить из БД + target = session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() + if not target: + raise ValueError(f"TargetDatabase with ID {target_id} not found") + if not target.is_active: + raise ValueError(f"TargetDatabase {target_id} is inactive") + + # Создать engine + connection_string = DatabaseManager._build_target_connection_string(target) + engine = create_engine(connection_string, echo=False, pool_pre_ping=True) + + # Кешировать + _target_engines_cache[target_id] = engine + _cache_timestamp[cache_key] = datetime.utcnow() + + logger.info(f"Created engine for target {target_id}: {target.name}") + return engine + finally: + session.close() + + @staticmethod + def get_target_database(target_id: int = None) -> Optional[TargetDatabase]: + """Получить конфигурацию целевой БД""" + session = DatabaseManager.get_postgres_session() + try: + if not target_id: + # Получить default + target = session.query(TargetDatabase).filter( + TargetDatabase.is_active == True, + TargetDatabase.is_default == True + ).first() + if not target: + target = session.query(TargetDatabase).filter( + TargetDatabase.is_active == True + ).order_by(TargetDatabase.id.desc()).first() + return target + return session.query(TargetDatabase).filter(TargetDatabase.id == target_id).first() + finally: + session.close() + + @staticmethod + def test_target_connection(target_id: int = None) -> bool: + """Проверить подключение к целевой БД""" + try: + engine = DatabaseManager.get_target_engine(target_id) + with engine.connect() as connection: + connection.execute(text("SELECT 1")) + logger.info(f"Target {target_id} connection successful" if target_id else "Default target connection successful") + return True + except Exception as e: + logger.error(f"Target {target_id} connection failed: {e}" if target_id else f"Default target connection failed: {e}") + return False + + @staticmethod + def get_table_columns(table_name: str, engine) -> list: + """Получить список колонок таблицы""" + try: + inspector = __import__('sqlalchemy').inspect(engine) + columns = inspector.get_columns(table_name) + return [col['name'] for col in columns] + except Exception as e: + logger.error(f"Error getting columns for {table_name}: {e}") + return [] + + @staticmethod + def get_life_table_name(table_name: str) -> str: + """Получить имя Life таблицы по имени оригинальной таблицы""" + return f"Life{table_name}" + + @staticmethod + def check_life_table_exists(table_name: str, source_id: int) -> bool: + """Проверить существует ли Life таблица""" + life_table = DatabaseManager.get_life_table_name(table_name) + try: + engine = DatabaseManager.get_source_engine(source_id) + with engine.connect() as connection: + result = connection.execute( + text(f"SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{life_table}'") + ) + return result.fetchone() is not None + except Exception as e: + logger.warning(f"Error checking Life table {life_table}: {e}") + return False diff --git a/replication_service/docker-compose.yml b/replication_service/docker-compose.yml new file mode 100644 index 0000000..ad2ec74 --- /dev/null +++ b/replication_service/docker-compose.yml @@ -0,0 +1,83 @@ +version: '3.8' + +services: + # PostgreSQL база для хранения конфигурации и логов репликации + postgres: + image: postgres:15-alpine + container_name: replication-postgres + environment: + POSTGRES_DB: replication_db + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres_password + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - replication-network + + # Основной сервис репликации + replication-service: + build: + context: . + dockerfile: Dockerfile + container_name: replication-service + environment: + # MSSQL Connection + MSSQL_SERVER: ${MSSQL_SERVER:-mssql} + MSSQL_DATABASE: ${MSSQL_DATABASE:-production_db} + MSSQL_USERNAME: ${MSSQL_USERNAME:-sa} + MSSQL_PASSWORD: ${MSSQL_PASSWORD:-YourPassword123!} + MSSQL_PORT: ${MSSQL_PORT:-1433} + + # PostgreSQL Connection + POSTGRES_HOST: postgres + POSTGRES_DATABASE: replication_db + POSTGRES_USERNAME: postgres + POSTGRES_PASSWORD: postgres_password + POSTGRES_PORT: 5432 + + # Redis (optional) + REDIS_HOST: ${REDIS_HOST:-localhost} + REDIS_PORT: ${REDIS_PORT:-6379} + ports: + - "8000:8000" + depends_on: + postgres: + condition: service_healthy + volumes: + - ./logs:/app/logs + networks: + - replication-network + restart: unless-stopped + command: uvicorn main:app --host 0.0.0.0 --port 8000 + + # Redis для кэширования и сессий (optional) + redis: + image: redis:7-alpine + container_name: replication-redis + ports: + - "6379:6379" + volumes: + - redis_data:/data + networks: + - replication-network + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + postgres_data: + redis_data: + +networks: + replication-network: + driver: bridge diff --git a/replication_service/examples.py b/replication_service/examples.py new file mode 100644 index 0000000..514912a --- /dev/null +++ b/replication_service/examples.py @@ -0,0 +1,340 @@ +""" +Примеры использования Data Replication Service + +Этот файл содержит примеры того, как использовать сервис программно. +""" + +from database import DatabaseManager, PostgresSessionLocal +from scheduler import scheduler_manager +from replication import ReplicationService +from models import MigrationTable, ReplicationJob, ReplicationStatus +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def example_1_initialize_service(): + """Пример 1: Инициализация сервиса""" + print("=" * 50) + print("Пример 1: Инициализация сервиса") + print("=" * 50) + + # Инициализировать БД + DatabaseManager.init_postgres_db() + logger.info("✓ PostgreSQL БД инициализирована") + + # Проверить подключения + mssql_ok = DatabaseManager.test_mssql_connection() + postgres_ok = DatabaseManager.test_postgres_connection() + + logger.info(f"✓ MSSQL подключение: {'OK' if mssql_ok else 'FAILED'}") + logger.info(f"✓ PostgreSQL подключение: {'OK' if postgres_ok else 'FAILED'}") + + +def example_2_add_migration_tables(): + """Пример 2: Добавление таблиц для миграции""" + print("\n" + "=" * 50) + print("Пример 2: Добавление таблиц для миграции") + print("=" * 50) + + # Запустить планировщик + scheduler_manager.start() + + # Добавить таблицы с разным расписанием и источниками + tables = [ + { + "name": "Orders", + "source": "mssql", + "schedule": "0 2 * * *", # Каждый день в 2:00 AM + "description": "Заказы из MSSQL - ежедневно в 2:00" + }, + { + "name": "Customers", + "source": "mssql", + "schedule": "0 3 * * *", # Каждый день в 3:00 AM + "description": "Клиенты из MSSQL - ежедневно в 3:00" + }, + { + "name": "Products", + "source": "pgsql", + "schedule": "*/30 * * * *", # Каждые 30 минут + "description": "Товары из PostgreSQL - каждые 30 минут" + }, + { + "name": "Transactions", + "source": "mssql", + "schedule": "0 * * * *", # Каждый час + "description": "Транзакции из MSSQL - каждый час" + } + ] + + for table in tables: + migration_table = scheduler_manager.add_migration_table( + table_name=table["name"], + cron_schedule=table["schedule"], + source_type=table["source"], + source_schema="dbo" if table["source"] == "mssql" else "public", + target_schema="public" + ) + + if migration_table: + logger.info(f"✓ {table['description']} - добавлена (ID: {migration_table.id})") + else: + logger.error(f"✗ Ошибка при добавлении {table['name']}") + + +def example_3_list_migration_tables(): + """Пример 3: Получение списка таблиц для миграции""" + print("\n" + "=" * 50) + print("Пример 3: Список таблиц для миграции") + print("=" * 50) + + tables = scheduler_manager.get_migration_tables() + + if tables: + logger.info(f"Найдено {len(tables)} таблиц для миграции:\n") + for table in tables: + logger.info(f" ID: {table.id}") + logger.info(f" Таблица: {table.table_name}") + logger.info(f" Источник: {table.source_type.upper()} ({table.source_schema}.{table.table_name})") + logger.info(f" Цель: {table.target_schema}.{table.table_name}") + logger.info(f" Расписание: {table.cron_schedule}") + logger.info(f" Активна: {table.is_active}") + logger.info("") + else: + logger.info("Нет таблиц для миграции") + + +def example_4_scheduler_status(): + """Пример 4: Проверка статуса планировщика""" + print("\n" + "=" * 50) + print("Пример 4: Статус планировщика") + print("=" * 50) + + is_running = scheduler_manager.scheduler.running + tables = scheduler_manager.get_migration_tables() + running_jobs = scheduler_manager.get_running_jobs() + queued_jobs = scheduler_manager.get_queued_jobs() + + logger.info(f"Планировщик запущен: {is_running}") + logger.info(f"Таблиц для миграции: {len(tables)}") + logger.info(f"Выполняющихся задач: {len(running_jobs)}") + logger.info(f"Задач в очереди: {len(queued_jobs)}") + + # Вывести информацию о расписании + jobs = scheduler_manager.scheduler.get_jobs() + if jobs: + logger.info(f"\nРасписание задач:") + for job in jobs: + logger.info(f" - {job.name}") + logger.info(f" Следующее выполнение: {job.next_run_time}") + + +def example_5_manual_replication(): + """Пример 5: Ручная репликация таблицы""" + print("\n" + "=" * 50) + print("Пример 5: Ручная репликация таблицы") + print("=" * 50) + + service = ReplicationService() + + try: + # Создать запись о задаче + job = service.create_replication_job( + table_id=1, + table_name="Orders" + ) + logger.info(f"✓ Создана задача репликации (ID: {job.id})") + + # Выполнить репликацию + success, rows_count = service.replicate_table( + table_name="Orders", + source_schema="dbo", + target_schema="public" + ) + + if success: + logger.info(f"✓ Успешно реплицировано {rows_count} строк из Orders") + + # Обновить статус + service.update_job_status( + job.id, + ReplicationStatus.SUCCESS, + rows_count + ) + logger.info(f"✓ Статус задачи обновлен на SUCCESS") + else: + logger.error(f"✗ Ошибка при репликации Orders") + service.update_job_status( + job.id, + ReplicationStatus.FAILED, + error_message="Ошибка репликации" + ) + + finally: + service.close() + + +def example_6_view_replication_history(): + """Пример 6: Просмотр истории репликации""" + print("\n" + "=" * 50) + print("Пример 6: История репликации") + print("=" * 50) + + session = PostgresSessionLocal() + + try: + # Получить последние 5 задач + jobs = session.query(ReplicationJob).order_by( + ReplicationJob.created_at.desc() + ).limit(5).all() + + if jobs: + logger.info(f"Последние {len(jobs)} задач репликации:\n") + for job in jobs: + logger.info(f" ID: {job.id}") + logger.info(f" Таблица: {job.table_name}") + logger.info(f" Статус: {job.status}") + logger.info(f" Строк обработано: {job.rows_processed}") + logger.info(f" Создано: {job.created_at}") + if job.error_message: + logger.info(f" Ошибка: {job.error_message}") + logger.info("") + else: + logger.info("История репликации пуста") + + finally: + session.close() + + +def example_7_life_table_processing(): + """Пример 7: Обработка Life таблиц (логи изменений)""" + print("\n" + "=" * 50) + print("Пример 7: Обработка Life таблиц") + print("=" * 50) + + service = ReplicationService() + + try: + # Обработать изменения из Life таблицы + changes = service.process_life_table_changes("Orders") + + logger.info(f"✓ Обработаны изменения из LifeOrders:") + logger.info(f" INSERT операции: {changes['INSERT']}") + logger.info(f" UPDATE операции: {changes['UPDATE']}") + logger.info(f" DELETE операции: {changes['DELETE']}") + + finally: + service.close() + + +def example_8_remove_migration_table(): + """Пример 8: Удаление таблицы из миграции""" + print("\n" + "=" * 50) + print("Пример 8: Удаление таблицы из миграции") + print("=" * 50) + + # Удалить таблицу с ID 1 + success = scheduler_manager.remove_migration_table(table_id=1) + + if success: + logger.info("✓ Таблица успешно удалена из миграции") + else: + logger.info("✗ Таблица не найдена") + + +def example_9_statistics(): + """Пример 9: Статистика репликации""" + print("\n" + "=" * 50) + print("Пример 9: Статистика репликации") + print("=" * 50) + + session = PostgresSessionLocal() + + try: + total_jobs = session.query(ReplicationJob).count() + successful_jobs = session.query(ReplicationJob).filter( + ReplicationJob.status == ReplicationStatus.SUCCESS + ).count() + failed_jobs = session.query(ReplicationJob).filter( + ReplicationJob.status == ReplicationStatus.FAILED + ).count() + + success_rate = (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0 + + logger.info(f"Всего задач: {total_jobs}") + logger.info(f"Успешных: {successful_jobs}") + logger.info(f"Ошибок: {failed_jobs}") + logger.info(f"Процент успеха: {success_rate:.2f}%") + + finally: + session.close() + + +def example_10_concurrent_schedules(): + """ + Пример 10: Демонстрация обработки конфликтов расписания + + Когда расписание нескольких таблиц пересекается, + они выполняются последовательно в очереди. + """ + print("\n" + "=" * 50) + print("Пример 10: Обработка конфликтов расписания") + print("=" * 50) + + scheduler_manager.start() + + # Добавить таблицы с пересекающимся расписанием + logger.info("Добавление таблиц с пересекающимся расписанием...") + + # Все три таблицы будут начинать репликацию в 2:00 AM + # но выполняться последовательно + scheduler_manager.add_migration_table( + table_name="Orders", + cron_schedule="0 2 * * *", # 2:00 AM + source_schema="dbo" + ) + + scheduler_manager.add_migration_table( + table_name="Customers", + cron_schedule="0 2 * * *", # 2:00 AM (конфликт) + source_schema="dbo" + ) + + scheduler_manager.add_migration_table( + table_name="Products", + cron_schedule="0 2 * * *", # 2:00 AM (конфликт) + source_schema="dbo" + ) + + logger.info("✓ Таблицы добавлены") + logger.info("\nКогда наступит 2:00 AM:") + logger.info(" 1. Начнет выполняться Orders") + logger.info(" 2. Customers будет добавлена в очередь") + logger.info(" 3. Products будет добавлена в очередь") + logger.info(" 4. После завершения Orders начнет выполняться Customers") + logger.info(" 5. После завершения Customers начнет выполняться Products") + + +if __name__ == "__main__": + # Выполнить примеры + try: + example_1_initialize_service() + example_2_add_migration_tables() + example_3_list_migration_tables() + example_4_scheduler_status() + # example_5_manual_replication() # Раскомментировать если нужна ручная репликация + example_6_view_replication_history() + # example_7_life_table_processing() # Раскомментировать если есть Life таблицы + # example_8_remove_migration_table() # Раскомментировать для удаления + example_9_statistics() + example_10_concurrent_schedules() + + logger.info("\n✓ Все примеры выполнены!") + + except Exception as e: + logger.error(f"Ошибка при выполнении примеров: {e}", exc_info=True) + + finally: + scheduler_manager.stop() diff --git a/replication_service/examples_api.py b/replication_service/examples_api.py new file mode 100644 index 0000000..ab7b253 --- /dev/null +++ b/replication_service/examples_api.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +""" +Примеры использования новой DataSource API для управления источниками данных. +""" + +import requests +import json +from typing import Dict, List + +BASE_URL = "http://localhost:8000/api/v1" + +# Цвета для вывода +GREEN = '\033[92m' +RED = '\033[91m' +BLUE = '\033[94m' +YELLOW = '\033[93m' +RESET = '\033[0m' + + +def print_section(title: str): + """Красиво вывести заголовок секции""" + print(f"\n{BLUE}{'='*60}{RESET}") + print(f"{BLUE}{title:^60}{RESET}") + print(f"{BLUE}{'='*60}{RESET}\n") + + +def print_response(response: requests.Response, title: str = "Response"): + """Вывести HTTP response в красивом формате""" + print(f"{YELLOW}{title}:{RESET}") + try: + print(json.dumps(response.json(), indent=2)) + except: + print(response.text) + print() + + +def example_1_create_data_sources(): + """Пример 1: Создание источников данных""" + print_section("Пример 1: Создание источников данных") + + # 1. Создать MSSQL источник + print(f"{BLUE}1. Создание MSSQL источника...{RESET}") + mssql_source = { + "name": "MSSQL Production", + "source_type": "mssql", + "host": "mssql-server.example.com", + "port": 1433, + "database": "SalesDatabase", + "username": "sa", + "password": "StrongPassword123!", + "default_schema": "dbo", + "description": "Production MSSQL server for sales data" + } + + response = requests.post(f"{BASE_URL}/data-sources", json=mssql_source) + print_response(response, "Created MSSQL Source") + mssql_id = response.json()['id'] if response.status_code == 200 else None + + # 2. Создать PostgreSQL источник + print(f"{BLUE}2. Создание PostgreSQL источника...{RESET}") + pgsql_source = { + "name": "PostgreSQL Analytics", + "source_type": "pgsql", + "host": "postgres-analytics.example.com", + "port": 5432, + "database": "analytics_db", + "username": "analyst", + "password": "AnalyticsPass456!", + "default_schema": "public", + "description": "Analytics database for reporting" + } + + response = requests.post(f"{BASE_URL}/data-sources", json=pgsql_source) + print_response(response, "Created PostgreSQL Source") + pgsql_id = response.json()['id'] if response.status_code == 200 else None + + return mssql_id, pgsql_id + + +def example_2_list_and_test_sources(mssql_id: int, pgsql_id: int): + """Пример 2: Получить список источников и протестировать подключения""" + print_section("Пример 2: Список источников и тестирование подключений") + + # 1. Получить полный список + print(f"{BLUE}1. Получение списка всех источников...{RESET}") + response = requests.get(f"{BASE_URL}/data-sources") + print_response(response, "All Data Sources") + + # 2. Получить только активные + print(f"{BLUE}2. Получение списка активных источников...{RESET}") + response = requests.get(f"{BASE_URL}/data-sources?active_only=true") + print_response(response, "Active Data Sources") + + # 3. Получить конкретный источник + if mssql_id: + print(f"{BLUE}3. Получение конкретного источника (ID: {mssql_id})...{RESET}") + response = requests.get(f"{BASE_URL}/data-sources/{mssql_id}") + print_response(response, f"Data Source {mssql_id}") + + # 4. Протестировать подключение + if mssql_id: + print(f"{BLUE}4. Тестирование подключения к MSSQL (ID: {mssql_id})...{RESET}") + response = requests.post(f"{BASE_URL}/data-sources/{mssql_id}/test") + print_response(response, "MSSQL Connection Test Result") + + +def example_3_create_migrations(mssql_id: int, pgsql_id: int): + """Пример 3: Создание миграций (расписание репликации)""" + print_section("Пример 3: Создание миграций таблиц") + + # 1. Создать миграцию для таблицы Orders из MSSQL + print(f"{BLUE}1. Создание миграции таблицы Orders из MSSQL...{RESET}") + migration_1 = { + "table_name": "Orders", + "source_id": mssql_id, + "source_schema": "dbo", + "target_schema": "public", + "cron_schedule": "0 2 * * *" # Каждый день в 2:00 AM + } + + response = requests.post(f"{BASE_URL}/migration-tables", json=migration_1) + print_response(response, "Created Migration for Orders") + migration_1_id = response.json()['id'] if response.status_code == 200 else None + + # 2. Создать миграцию для таблицы Products из PostgreSQL + if pgsql_id: + print(f"{BLUE}2. Создание миграции таблицы Products из PostgreSQL...{RESET}") + migration_2 = { + "table_name": "Products", + "source_id": pgsql_id, + "source_schema": "public", + "target_schema": "public", + "cron_schedule": "0 4 * * *" # Каждый день в 4:00 AM + } + + response = requests.post(f"{BASE_URL}/migration-tables", json=migration_2) + print_response(response, "Created Migration for Products") + + return migration_1_id + + +def example_4_list_migrations(migration_1_id: int): + """Пример 4: Получить список миграций""" + print_section("Пример 4: Получение списка миграций") + + # 1. Получить все миграции + print(f"{BLUE}1. Получение всех миграций...{RESET}") + response = requests.get(f"{BASE_URL}/migration-tables") + print_response(response, "All Migrations") + + # 2. Получить только активные + print(f"{BLUE}2. Получение активных миграций...{RESET}") + response = requests.get(f"{BASE_URL}/migration-tables?active_only=true") + print_response(response, "Active Migrations") + + # 3. Получить конкретную миграцию + if migration_1_id: + print(f"{BLUE}3. Получение конкретной миграции (ID: {migration_1_id})...{RESET}") + response = requests.get(f"{BASE_URL}/migration-tables/{migration_1_id}") + print_response(response, f"Migration {migration_1_id}") + + +def example_5_update_data_source(mssql_id: int): + """Пример 5: Обновить источник данных""" + print_section("Пример 5: Обновление источника данных") + + if not mssql_id: + print(f"{RED}No MSSQL source ID available{RESET}") + return + + print(f"{BLUE}Обновление узла MSSQL источника...{RESET}") + update_data = { + "host": "new-mssql-server.example.com", + "port": 1433 + } + + response = requests.put(f"{BASE_URL}/data-sources/{mssql_id}", json=update_data) + print_response(response, "Updated Data Source") + + +def example_6_update_migration(migration_1_id: int): + """Пример 6: Обновить миграцию""" + print_section("Пример 6: Обновление миграции") + + if not migration_1_id: + print(f"{RED}No migration ID available{RESET}") + return + + print(f"{BLUE}Обновление расписания миграции...{RESET}") + update_data = { + "cron_schedule": "0 3 * * *" # Изменить на 3:00 AM + } + + response = requests.put(f"{BASE_URL}/migration-tables/{migration_1_id}", json=update_data) + print_response(response, "Updated Migration") + + +def example_7_check_health(): + """Пример 7: Проверить здоровье сервиса""" + print_section("Пример 7: Проверка здоровья сервиса") + + response = requests.get(f"{BASE_URL}/health") + print_response(response, "Service Health Check") + + +def example_8_get_replication_jobs(): + """Пример 8: Получить историю задач репликации""" + print_section("Пример 8: История задач репликации") + + print(f"{BLUE}1. Получение последних 10 задач репликации...{RESET}") + response = requests.get(f"{BASE_URL}/replication-jobs?limit=10&offset=0") + print_response(response, "Recent Replication Jobs") + + print(f"{BLUE}2. Получение только успешных задач...{RESET}") + response = requests.get(f"{BASE_URL}/replication-jobs?status=success&limit=10") + print_response(response, "Successful Replication Jobs") + + +def main(): + """Главная функция - запустить все примеры""" + + print(f"\n{GREEN}{'='*60}{RESET}") + print(f"{GREEN}{'Data Replication Service - API Examples':^60}{RESET}") + print(f"{GREEN}{'='*60}{RESET}\n") + + try: + # Проверить подключение к сервису + response = requests.get(f"{BASE_URL.replace('/api/v1', '')}/") + if response.status_code != 200: + print(f"{RED}✗ Error: Cannot connect to service at {BASE_URL}{RESET}") + print(f"{YELLOW}Make sure the service is running on http://localhost:8000{RESET}") + return + + print(f"{GREEN}✓ Connected to Data Replication Service{RESET}\n") + + # Запустить примеры + mssql_id, pgsql_id = example_1_create_data_sources() + example_2_list_and_test_sources(mssql_id, pgsql_id) + migration_1_id = example_3_create_migrations(mssql_id, pgsql_id) + example_4_list_migrations(migration_1_id) + example_5_update_data_source(mssql_id) + example_6_update_migration(migration_1_id) + example_7_check_health() + example_8_get_replication_jobs() + + print(f"\n{GREEN}{'='*60}{RESET}") + print(f"{GREEN}{'✓ All examples completed successfully!':^60}{RESET}") + print(f"{GREEN}{'='*60}{RESET}\n") + + print(f"{YELLOW}📚 Documentation: See DATA_SOURCES.md for detailed API reference{RESET}\n") + + except requests.exceptions.ConnectionError: + print(f"{RED}✗ Connection error: Cannot connect to {BASE_URL}{RESET}") + print(f"{YELLOW}Make sure the service is running: python main.py{RESET}") + except Exception as e: + print(f"{RED}✗ Error: {e}{RESET}") + + +if __name__ == "__main__": + main() diff --git a/replication_service/main.py b/replication_service/main.py new file mode 100644 index 0000000..7226f4f --- /dev/null +++ b/replication_service/main.py @@ -0,0 +1,93 @@ +from fastapi import FastAPI +from contextlib import asynccontextmanager +import logging + +from database import DatabaseManager, PostgresSessionLocal +from models import DataSource +from scheduler import scheduler_manager +from api import router + +# Логирование +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Управление жизненным циклом приложения""" + # Startup + logger.info("Starting Data Replication Service...") + + # Инициализировать БД + DatabaseManager.init_postgres_db() + logger.info("PostgreSQL database initialized") + + # Проверить подключение к целевой БД + postgres_ok = DatabaseManager.test_postgres_connection() + + if not postgres_ok: + logger.warning("Target PostgreSQL connection failed! Replication service requires target database.") + else: + logger.info("Target PostgreSQL connection successful") + + # Проверить доступные источники данных + session = PostgresSessionLocal() + try: + data_sources = session.query(DataSource).filter(DataSource.is_active == True).all() + if not data_sources: + logger.warning("No active data sources configured. Add data sources via API before replication.") + else: + logger.info(f"Found {len(data_sources)} active data source(s):") + for source in data_sources: + is_ok = DatabaseManager.test_source_connection(source.id) + status = "✓" if is_ok else "✗" + logger.info(f" {status} {source.name} ({source.source_type.value})") + finally: + session.close() + + # Запустить планировщик + scheduler_manager.start() + logger.info("Scheduler started") + + yield + + # Shutdown + logger.info("Shutting down Data Replication Service...") + scheduler_manager.stop() + logger.info("Scheduler stopped") + + +app = FastAPI( + title="Data Replication Service", + description="Service for replicating data from multiple sources (MSSQL, PostgreSQL) to target PostgreSQL database using DLT. Configure data sources dynamically without restart.", + version="1.0.0", + lifespan=lifespan +) + +# Подключить маршруты API +app.include_router(router) + + +@app.get("/") +def read_root(): + """Root endpoint""" + return { + "message": "Data Replication Service is running", + "docs": "/docs", + "openapi": "/openapi.json", + "features": [ + "Dynamic data source management", + "Multiple source types (MSSQL, PostgreSQL)", + "Cron-based scheduling", + "DLT-powered data transfer", + "Real-time monitoring" + ] + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/replication_service/migrate_to_data_sources.py b/replication_service/migrate_to_data_sources.py new file mode 100644 index 0000000..cc72c37 --- /dev/null +++ b/replication_service/migrate_to_data_sources.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Миграционный скрипт для обновления схемы БД. +Этот скрипт обновляет migration_tables таблицу с новой структурой source_id. +""" + +import sys +sys.path.insert(0, '/home/brus/project-carrier/replication_service') + +from database import DatabaseManager, postgres_engine +from sqlalchemy import text +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def migrate_to_data_sources(): + """Миграция существующей конфигурации в новую систему DataSource""" + + logger.info("Starting migration to DataSource-based configuration...") + + # Инициализировать новую схему если еще не создана + DatabaseManager.init_postgres_db() + logger.info("Database schema initialized with DataSource model") + + session = DatabaseManager.get_postgres_session() + + try: + # Проверить существуют ли там уже данные в migration_tables с источником source_type + from models import MigrationTable, DataSource, SourceType + from sqlalchemy import inspect + + inspector = inspect(postgres_engine) + tables = inspector.get_table_names() + + if 'migration_tables' in tables: + logger.info("migration_tables exists, checking for old source_type column...") + + # Получить информацию о колонках + columns = [c['name'] for c in inspector.get_columns('migration_tables')] + + if 'source_type' in columns: + logger.info("Found old source_type column, migrating data...") + + # Чтение старых данных + with postgres_engine.connect() as connection: + # Получить уникальные источники из old migration_tables + result = connection.execute( + text("SELECT DISTINCT source_type FROM migration_tables") + ) + source_types = [row[0] for row in result] + logger.info(f"Found source types: {source_types}") + + # Для каждого типа источника, создать DataSource запись + for source_type in source_types: + if source_type == 'mssql': + # Попытаться загрузить данные из конфига если доступны + from config import settings + source = DataSource( + name=f"MSSQL Auto-migrated", + source_type=SourceType.MSSQL, + host=settings.mssql_server, + port=settings.mssql_port, + database=settings.mssql_database, + username=settings.mssql_username, + password=settings.mssql_password, + default_schema="dbo", + description="Auto-migrated from environment variables" + ) + elif source_type == 'pgsql': + from config import settings + source = DataSource( + name=f"PostgreSQL Auto-migrated", + source_type=SourceType.PGSQL, + host=settings.pgsql_source_host, + port=settings.pgsql_source_port, + database=settings.pgsql_source_database, + username=settings.pgsql_source_username, + password=settings.pgsql_source_password, + default_schema="public", + description="Auto-migrated from environment variables" + ) + else: + logger.warning(f"Unknown source type: {source_type}, skipping") + continue + + session.add(source) + + session.commit() + logger.info(f"Created {len(source_types)} DataSource records") + + # Обновить migration_tables с source_id + sources = session.query(DataSource).all() + source_map = {s.source_type.value: s.id for s in sources} + + migrations = session.query(MigrationTable).all() + for migration in migrations: + if hasattr(migration, 'source_type'): + source_type_val = migration.source_type.value if hasattr(migration.source_type, 'value') else str(migration.source_type) + if source_type_val in source_map: + migration.source_id = source_map[source_type_val] + + session.commit() + logger.info(f"Updated {len(migrations)} migration records with source_id") + + else: + logger.info("source_type column not found, schema already migrated or no migration_tables") + + else: + logger.info("migration_tables doesn't exist yet, fresh install") + + logger.info("✓ Migration completed successfully!") + return True + + except Exception as e: + logger.error(f"Migration failed: {e}", exc_info=True) + session.rollback() + return False + + finally: + session.close() + + +if __name__ == "__main__": + success = migrate_to_data_sources() + sys.exit(0 if success else 1) diff --git a/replication_service/models.py b/replication_service/models.py new file mode 100644 index 0000000..ec76a23 --- /dev/null +++ b/replication_service/models.py @@ -0,0 +1,116 @@ +from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, Enum, ForeignKey, JSON +from sqlalchemy.ext.declarative import declarative_base +from datetime import datetime +import enum +import json + +Base = declarative_base() + + +class ReplicationStatus(str, enum.Enum): + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + + +class SourceType(str, enum.Enum): + """Тип источника данных""" + MSSQL = "mssql" + PGSQL = "pgsql" + + +class DatabaseType(str, enum.Enum): + """Тип целевой БД""" + PGSQL = "pgsql" + MSSQL = "mssql" + + +class DataSource(Base): + """Модель для хранения конфигурации источников данных""" + __tablename__ = "data_sources" + + id = Column(Integer, primary_key=True) + name = Column(String(255), unique=True, nullable=False) + source_type = Column(Enum(SourceType), nullable=False) + host = Column(String(255), nullable=False) + port = Column(Integer, nullable=False) + database = Column(String(255), nullable=False) + username = Column(String(255), nullable=False) + password = Column(String(255), nullable=False) + default_schema = Column(String(255), default="dbo") + is_active = Column(Boolean, default=True) + description = Column(Text) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class TargetDatabase(Base): + """Модель для хранения конфигурации целевых баз данных""" + __tablename__ = "target_databases" + + id = Column(Integer, primary_key=True) + name = Column(String(255), unique=True, nullable=False) # Имя целевой БД + db_type = Column(Enum(DatabaseType), default=DatabaseType.PGSQL) + host = Column(String(255), nullable=False) + port = Column(Integer, nullable=False) + database = Column(String(255), nullable=False) + username = Column(String(255), nullable=False) + password = Column(String(255), nullable=False) + is_active = Column(Boolean, default=True) + is_default = Column(Boolean, default=False) # Целевая БД по умолчанию + description = Column(Text) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class MigrationTable(Base): + """Модель для хранения таблиц, которые нужно мигрировать""" + __tablename__ = "migration_tables" + + id = Column(Integer, primary_key=True) + table_name = Column(String(255), nullable=False) + source_id = Column(Integer, ForeignKey("data_sources.id"), nullable=False) + target_id = Column(Integer, ForeignKey("target_databases.id")) # Целевая БД (если None, используется default) + source_schema = Column(String(255), default="dbo") + target_schema = Column(String(255), default="public") + target_table_name = Column(String(255)) # Переименование таблицы при копировании (если None, используется table_name) + column_mapping = Column(JSON) # {"source_col": "target_col", ...} для переименования столбцов + + # Конфигурация Life таблиц + use_life_table = Column(Boolean, default=False) # Обрабатывать изменения из Life таблицы + life_excluded_fields = Column(JSON) # ["field1", "field2", ...] - сервисные поля для исключения + + is_active = Column(Boolean, default=True) + cron_schedule = Column(String(255), nullable=False) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class ReplicationJob(Base): + """Модель для логирования выполнения репликации""" + __tablename__ = "replication_jobs" + + id = Column(Integer, primary_key=True) + table_id = Column(Integer, nullable=False) + table_name = Column(String(255), nullable=False) + status = Column(Enum(ReplicationStatus), default=ReplicationStatus.PENDING) + started_at = Column(DateTime) + completed_at = Column(DateTime) + rows_processed = Column(Integer, default=0) + error_message = Column(Text) + created_at = Column(DateTime, default=datetime.utcnow) + + +class ChangeLog(Base): + """Модель для хранения логов изменений (Life таблицы)""" + __tablename__ = "change_logs" + + id = Column(Integer, primary_key=True) + table_id = Column(Integer, nullable=False) + table_name = Column(String(255), nullable=False) + operation = Column(String(10), nullable=False) # INSERT, UPDATE, DELETE + change_data = Column(Text, nullable=False) # JSON + change_timestamp = Column(DateTime, nullable=False) + processed = Column(Boolean, default=False) + created_at = Column(DateTime, default=datetime.utcnow) diff --git a/replication_service/replication.py b/replication_service/replication.py new file mode 100644 index 0000000..53cdbfd --- /dev/null +++ b/replication_service/replication.py @@ -0,0 +1,381 @@ +import dlt +from typing import List, Dict, Any, Tuple, Optional +from datetime import datetime +from sqlalchemy import text +import logging +import json + +try: + import pandas as pd + PANDAS_AVAILABLE = True +except ImportError: + PANDAS_AVAILABLE = False + +from database import DatabaseManager, PostgresSessionLocal +from models import ReplicationJob, ReplicationStatus, ChangeLog, DataSource + +logger = logging.getLogger(__name__) + + +class ReplicationService: + """Сервис репликации данных из различных источников в целевую БД с поддержкой трансформаций""" + + def __init__(self): + self.session = PostgresSessionLocal() + + def create_replication_job(self, table_id: int, table_name: str) -> ReplicationJob: + """Создать запись о задаче репликации""" + job = ReplicationJob( + table_id=table_id, + table_name=table_name, + status=ReplicationStatus.PENDING, + started_at=datetime.utcnow() + ) + self.session.add(job) + self.session.commit() + return job + + def update_job_status(self, job_id: int, status: ReplicationStatus, + rows_processed: int = 0, error_message: str = None): + """Обновить статус задачи репликации""" + try: + job = self.session.query(ReplicationJob).filter(ReplicationJob.id == job_id).first() + if job: + job.status = status + job.rows_processed = rows_processed + job.completed_at = datetime.utcnow() + if error_message: + job.error_message = error_message + self.session.commit() + except Exception as e: + logger.error(f"Error updating job status: {e}") + self.session.rollback() + + def _apply_column_mapping(self, rows: List[Dict[str, Any]], column_mapping: Optional[Dict[str, str]]) -> List[Dict[str, Any]]: + """Применить переименование столбцов""" + if not column_mapping: + return rows + + mapped_rows = [] + for row in rows: + new_row = {} + for old_name, value in row.items(): + new_name = column_mapping.get(old_name, old_name) + new_row[new_name] = value + mapped_rows.append(new_row) + + logger.debug(f"Applied column mapping: {column_mapping}") + return mapped_rows + + def _filter_excluded_fields(self, rows: List[Dict[str, Any]], excluded_fields: Optional[List[str]]) -> List[Dict[str, Any]]: + """Исключить сервисные поля""" + if not excluded_fields: + return rows + + filtered_rows = [] + for row in rows: + filtered_row = {k: v for k, v in row.items() if k not in excluded_fields} + filtered_rows.append(filtered_row) + + logger.debug(f"Excluded fields: {excluded_fields}") + return filtered_rows + + def replicate_table( + self, + table_name: str, + source_id: int, + source_schema: str = None, + target_schema: str = "public", + target_id: int = None, + target_table_name: str = None, + column_mapping: Optional[Dict[str, str]] = None, + life_excluded_fields: Optional[List[str]] = None + ) -> Tuple[bool, int]: + """ + Реплицировать таблицу из источника в целевую БД + + Args: + table_name: Имя таблицы в источнике + source_id: ID источника данных + source_schema: Схема в источнике (если None, используется default_schema) + target_schema: Схема в целевой БД + target_id: ID целевой БД (если None, используется default) + target_table_name: Переименование таблицы при копировании (если None, используется table_name) + column_mapping: Переименование столбцов {"source_col": "target_col"} + life_excluded_fields: Поля для исключения (для Life таблиц) + + Returns: + Кортеж (успешность, количество строк) + """ + try: + # Получить информацию об источнике + data_source = DatabaseManager.get_data_source(source_id) + if not data_source: + raise ValueError(f"DataSource with ID {source_id} not found") + + # Использовать имя целевой таблицы или имя исходной таблицы + final_table_name = target_table_name or table_name + + # Использовать default_schema если не указана + if source_schema is None: + source_schema = data_source.default_schema + + logger.info( + f"Starting replication for table {table_name} -> {final_table_name} " + f"from source {data_source.name} (ID: {source_id}) " + f"to target DB (ID: {target_id or 'default'})" + ) + + # Определить тип целевой БД для выбора стратегии репликации + target_db = DatabaseManager.get_target_database(target_id) + + # Использовать DLT для PostgreSQL (включая default PostgreSQL) + is_postgres_target = False + if target_db: + is_postgres_target = target_db.db_type.value == 'pgsql' + else: + is_postgres_target = True + + if is_postgres_target: + # Использовать DLT sql_database источник для PostgreSQL + from config import settings + from dlt.destinations import postgres + from dlt.sources.sql_database import sql_table + + # Создаем PostgreSQL destination с кредами из конфига + postgres_destination = postgres( + credentials={ + "drivername": "postgresql", + "username": settings.postgres_username, + "password": settings.postgres_password, + "host": settings.postgres_host, + "port": settings.postgres_port, + "database": settings.postgres_database + } + ) + + pipeline = dlt.pipeline( + pipeline_name=f"replication_{data_source.source_type.value}_{table_name}", + destination=postgres_destination, + dataset_name=target_schema, + full_refresh=False + ) + + # Использовать sql_table источник для чтения данных из источника + source_table = sql_table( + credentials=DatabaseManager._build_connection_string(data_source), + table=table_name, + schema=source_schema, + chunk_size=5000, + detect_precision_hints=True, + write_disposition="append" # Дополнять таблицу, а не заменять + ) + + # Переименовать таблицу если нужно + if final_table_name != table_name: + source_table.apply_hints(table_name=final_table_name) + + load_info = pipeline.run(source_table) + + # Получить количество загруженных строк из load_info + row_count = 0 + if load_info and hasattr(load_info, 'packages'): + for package in load_info.packages: + if hasattr(package, 'jobs'): + for job_name, jobs in package.jobs.items(): + for job in jobs: + if hasattr(job, 'rows') and job.rows: + row_count += job.rows + + logger.info( + f"Successfully replicated table {table_name} " + f"(Source: {data_source.name}) to {final_table_name}. Rows: {row_count}" + ) + return True, row_count + else: + # Для других БД используем прямую вставку через SQLAlchemy + source_engine = DatabaseManager.get_source_engine(source_id) + target_engine = DatabaseManager.get_target_engine(target_id) + + # Читать данные из источника + with source_engine.connect() as connection: + query = f"SELECT * FROM {source_schema}.{table_name}" + result = connection.execute(text(query)) + columns = [desc[0] for desc in result.keys()] + + rows = [] + row_count = 0 + for row in result: + row_dict = dict(zip(columns, row)) + rows.append(row_dict) + row_count += 1 + + if rows: + # Применить фильтры и трансформации + rows = self._filter_excluded_fields(rows, life_excluded_fields) + rows = self._apply_column_mapping(rows, column_mapping) + + # Вставить строки через SQLAlchemy + self._insert_rows(target_engine, rows, final_table_name, target_schema) + + logger.info( + f"Successfully replicated {row_count} rows from {table_name} " + f"(Source: {data_source.name}) to {final_table_name}" + ) + return True, row_count + else: + logger.warning(f"No data found in table {table_name} from {data_source.name}") + return True, 0 + + except Exception as e: + logger.error(f"Error replicating table {table_name} from source {source_id}: {e}", exc_info=True) + return False, 0 + + def _insert_rows(self, engine, rows: List[Dict[str, Any]], table_name: str, schema: str): + """Вставить строки в целевую БД через SQLAlchemy""" + if not rows: + return + + try: + with engine.begin() as connection: + # Получить имена столбцов из первой строки + columns = list(rows[0].keys()) + + # Создать INSERT statement с именованными параметрами + col_names = ", ".join(columns) + placeholders = ", ".join([f":{col}" for col in columns]) + query = f"INSERT INTO {schema}.{table_name} ({col_names}) VALUES ({placeholders})" + + # Вставить все строки, используя executemany с именованными параметрами + # Фильтруем строки для согласованности столбцов + filtered_rows = [] + for row in rows: + filtered_row = {col: row.get(col) for col in columns} + filtered_rows.append(filtered_row) + + connection.execute(text(query), filtered_rows) + + logger.debug(f"Inserted {len(rows)} rows into {schema}.{table_name}") + except Exception as e: + logger.error(f"Error inserting rows into {schema}.{table_name}: {e}") + raise + + def process_life_table_changes( + self, + table_name: str, + source_id: int, + use_life_table: bool = False, + life_excluded_fields: Optional[List[str]] = None, + target_id: int = None, + target_table_name: str = None, + column_mapping: Optional[Dict[str, str]] = None + ) -> Dict[str, int]: + """ + Обработать изменения из Life таблицы + + Args: + table_name: Имя оригинальной таблицы + source_id: ID источника данных + use_life_table: Обрабатывать ли Life таблицу (если False, вернуть пусто) + life_excluded_fields: Поля для исключения при обработке + target_id: ID целевой БД + target_table_name: Переименование таблицы + column_mapping: Маппинг столбцов + + Returns: + Словарь с количеством обработанных операций + """ + operations = {"INSERT": 0, "UPDATE": 0, "DELETE": 0} + + # Если Life таблица отключена, вернуть пусто + if not use_life_table: + logger.info(f"Life table processing disabled for {table_name}") + return operations + + life_table_name = DatabaseManager.get_life_table_name(table_name) + + try: + # Получить информацию об источнике + data_source = DatabaseManager.get_data_source(source_id) + if not data_source: + raise ValueError(f"DataSource with ID {source_id} not found") + + # Проверить существует ли Life таблица + if not DatabaseManager.check_life_table_exists(table_name, source_id): + logger.warning(f"Life table {life_table_name} does not exist for {table_name}") + return operations + + # Использовать целевое имя таблицы или оригинальное + final_table_name = target_table_name or table_name + + # Получить engine для источника и целевой БД + source_engine = DatabaseManager.get_source_engine(source_id) + target_engine = DatabaseManager.get_target_engine(target_id) + + with source_engine.connect() as connection: + # Получить необработанные изменения + query = f""" + SELECT * FROM {life_table_name} + WHERE NOT ISNULL(IsProcessed, 0) + ORDER BY ChangeTime ASC + """ + + result = connection.execute(text(query)) + columns = [desc[0] for desc in result.keys()] + + for row in result: + row_dict = dict(zip(columns, row)) + operation = row_dict.get('Operation', 'INSERT') + + try: + # Применить фильтры к данным + filtered_row = self._filter_excluded_fields([row_dict], life_excluded_fields)[0] + mapped_row = self._apply_column_mapping([filtered_row], column_mapping)[0] + + self._apply_change( + table_name=final_table_name, + operation=operation, + row_data=mapped_row, + source_id=source_id, + target_engine=target_engine + ) + operations[operation] += 1 + except Exception as e: + logger.error(f"Error applying change: {e}") + + logger.info(f"Life table processing completed: {operations} for {table_name}") + return operations + except Exception as e: + logger.error(f"Error processing Life table changes for {table_name}: {e}") + return operations + + def _apply_change( + self, + table_name: str, + operation: str, + row_data: dict, + source_id: int, + target_engine=None + ): + """Применить изменение к целевой таблице""" + try: + # Логировать изменение + change_log = ChangeLog( + table_id=source_id, + table_name=table_name, + operation=operation, + change_data=json.dumps(row_data), + change_timestamp=datetime.utcnow() + ) + self.session.add(change_log) + self.session.commit() + + logger.debug(f"Applied {operation} change to {table_name}") + except Exception as e: + logger.error(f"Error logging change: {e}") + self.session.rollback() + + def close(self): + """Закрыть сессию""" + if self.session: + self.session.close() diff --git a/replication_service/requirements.txt b/replication_service/requirements.txt new file mode 100644 index 0000000..651edd2 --- /dev/null +++ b/replication_service/requirements.txt @@ -0,0 +1,36 @@ +# Web Framework +fastapi==0.135.2 +uvicorn==0.42.0 + +# Database +sqlalchemy==2.0.48 +psycopg2-binary==2.9.11 +pyodbc==5.3.0 +asyncpg==0.31.0 + +# ETL +dlt==1.24.0 + +# Data Processing +pandas==3.0.1 +numpy==2.4.4 + +# Settings & Validation +pydantic==2.12.5 +pydantic-settings==2.13.1 +python-dotenv==1.2.2 + +# Scheduler +apscheduler==3.11.2 + +# Utilities +requests==2.33.0 +pyyaml==6.0.3 +tomlkit==0.14.0 +simplejson==3.20.2 +humanize==4.15.0 +pendulum==3.2.0 +pytz==2026.1.post1 +tenacity==9.1.4 +pathvalidate==3.3.1 +requirements-parser==0.13.0 \ No newline at end of file diff --git a/replication_service/scheduler.py b/replication_service/scheduler.py new file mode 100644 index 0000000..33470d8 --- /dev/null +++ b/replication_service/scheduler.py @@ -0,0 +1,275 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +from datetime import datetime, timedelta +from typing import List, Optional +import logging +import threading + +from database import PostgresSessionLocal, DatabaseManager +from models import MigrationTable, ReplicationStatus, ReplicationJob, DataSource +from replication import ReplicationService + +logger = logging.getLogger(__name__) + + +class SchedulerManager: + """Управление расписанием репликации таблиц с использованием DataSource""" + + def __init__(self): + self.scheduler = BackgroundScheduler() + self.scheduler.add_listener(self._scheduler_listener) + self.running_jobs = {} # Отслеживание текущих запущенных задач + self.job_queue = [] # Очередь задач при пересечении расписания + self.lock = threading.Lock() + self.replication_service = ReplicationService() + + def start(self): + """Запустить планировщик""" + if not self.scheduler.running: + self.scheduler.start() + logger.info("Scheduler started") + self._load_migration_tables() + + def stop(self): + """Остановить планировщик""" + if self.scheduler.running: + self.scheduler.shutdown() + logger.info("Scheduler stopped") + + def _load_migration_tables(self): + """Загрузить таблицы для миграции и установить расписание""" + try: + session = PostgresSessionLocal() + tables = session.query(MigrationTable).filter( + MigrationTable.is_active == True + ).all() + + for table in tables: + self._schedule_table(table) + source = session.query(DataSource).filter(DataSource.id == table.source_id).first() + source_name = f"{source.name}" if source else f"Source{table.source_id}" + logger.info(f"Scheduled table {table.table_name} from {source_name} with cron: {table.cron_schedule}") + + session.close() + except Exception as e: + logger.error(f"Error loading migration tables: {e}") + + def _schedule_table(self, table: MigrationTable): + """Установить расписание для конкретной таблицы""" + try: + job_id = f"replicate_{table.id}_{table.table_name}" + + # Удалить старую задачу если существует + if self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + # Парсить cron выражение + cron_trigger = CronTrigger.from_crontab(table.cron_schedule) + + # Добавить новую задачу со всеми новыми параметрами + self.scheduler.add_job( + self._execute_replication, + cron_trigger, + id=job_id, + args=[ + table.id, + table.table_name, + table.source_id, + table.source_schema, + table.target_schema, + table.target_id if hasattr(table, 'target_id') else None, + table.target_table_name if hasattr(table, 'target_table_name') else None, + table.column_mapping if hasattr(table, 'column_mapping') else None, + table.use_life_table if hasattr(table, 'use_life_table') else False, + table.life_excluded_fields if hasattr(table, 'life_excluded_fields') else None + ], + name=f"Replicate {table.table_name} (SourceID: {table.source_id})", + max_instances=1 # Только одно выполнение одновременно для этой таблицы + ) + + logger.info(f"Scheduled table {table.table_name} with source_id {table.source_id}") + + except Exception as e: + logger.error(f"Error scheduling table {table.table_name}: {e}") + + def _execute_replication( + self, + table_id: int, + table_name: str, + source_id: int, + source_schema: Optional[str], + target_schema: str, + target_id: Optional[int] = None, + target_table_name: Optional[str] = None, + column_mapping: Optional[dict] = None, + use_life_table: bool = False, + life_excluded_fields: Optional[list] = None + ): + """Выполнить репликацию таблицы со всеми параметрами""" + with self.lock: + # Проверить нет ли уже выполняющихся задач + if table_id in self.running_jobs: + logger.info(f"Table {table_name} (source_id={source_id}) is already being replicated, queuing job") + self.job_queue.append({ + 'table_id': table_id, + 'table_name': table_name, + 'source_id': source_id, + 'source_schema': source_schema, + 'target_schema': target_schema, + 'target_id': target_id, + 'target_table_name': target_table_name, + 'column_mapping': column_mapping, + 'use_life_table': use_life_table, + 'life_excluded_fields': life_excluded_fields + }) + return + + # Отметить как выполняющаяся + job_id = datetime.utcnow().timestamp() + self.running_jobs[table_id] = job_id + + try: + logger.info(f"Starting replication for table {table_name} from source_id={source_id} to target_id={target_id or 'default'}") + + # Получить информацию об источнике + session = PostgresSessionLocal() + data_source = session.query(DataSource).filter(DataSource.id == source_id).first() + source_name = f"{data_source.name}" if data_source else f"Source{source_id}" + + # Создать запись о задаче + job_record = self.replication_service.create_replication_job(table_id, table_name) + session.close() + + # Выполнить репликацию с поддержкой всех новых параметров + success, rows_count = self.replication_service.replicate_table( + table_name=table_name, + source_id=source_id, + source_schema=source_schema, + target_schema=target_schema, + target_id=target_id, + target_table_name=target_table_name, + column_mapping=column_mapping, + life_excluded_fields=life_excluded_fields + ) + + # Обработать изменения из Life таблицы (если включено) + life_changes = {"INSERT": 0, "UPDATE": 0, "DELETE": 0} + if data_source and use_life_table: + try: + life_changes = self.replication_service.process_life_table_changes( + table_name=table_name, + source_id=source_id, + use_life_table=use_life_table, + life_excluded_fields=life_excluded_fields, + target_id=target_id, + target_table_name=target_table_name, + column_mapping=column_mapping + ) + except Exception as e: + logger.debug(f"Error processing Life table changes: {e}") + + if success: + self.replication_service.update_job_status( + job_record.id, + ReplicationStatus.SUCCESS, + rows_count + ) + logger.info( + f"Successfully replicated table {table_name} from {source_name} to target_id={target_id or 'default'}. " + f"Rows: {rows_count}, " + f"Changes - INSERT: {life_changes['INSERT']}, " + f"UPDATE: {life_changes['UPDATE']}, " + f"DELETE: {life_changes['DELETE']}" + ) + else: + self.replication_service.update_job_status( + job_record.id, + ReplicationStatus.FAILED, + error_message=f"Failed to replicate {table_name} from {source_name}" + ) + logger.error(f"Failed to replicate table {table_name} from {source_name}") + + except Exception as e: + logger.error(f"Error during replication of {table_name} from source_id={source_id}: {e}", exc_info=True) + try: + self.replication_service.update_job_status( + job_record.id, + ReplicationStatus.FAILED, + error_message=str(e) + ) + except: + pass + + finally: + # Удалить из выполняющихся + with self.lock: + if table_id in self.running_jobs: + del self.running_jobs[table_id] + + # Выполнить следующую задачу из очереди + if self.job_queue: + next_job = self.job_queue.pop(0) + logger.info(f"Processing queued job for {next_job['table_name']} (source_id={next_job['source_id']})") + self._execute_replication( + next_job['table_id'], + next_job['table_name'], + next_job['source_id'], + next_job['source_schema'], + next_job['target_schema'], + next_job.get('target_id'), + next_job.get('target_table_name'), + next_job.get('column_mapping'), + next_job.get('use_life_table', False), + next_job.get('life_excluded_fields') + ) + + def add_job(self, migration_table: MigrationTable): + """Добавить таблицу в расписание""" + self._schedule_table(migration_table) + + def remove_job(self, table_id: int): + """Удалить таблицу из расписания""" + try: + job_id = f"replicate_{table_id}_*" + # Получить все jobs с этим префиксом + for job in self.scheduler.get_jobs(): + if job.id.startswith(f"replicate_{table_id}_"): + self.scheduler.remove_job(job.id) + logger.info(f"Removed scheduled job for table_id {table_id}") + return True + return False + except Exception as e: + logger.error(f"Error removing job for table_id {table_id}: {e}") + return False + + def _scheduler_listener(self, event): + """Listener для событий планировщика""" + if event.code == 'EVENT_JOB_ERROR': + logger.error(f"Job failed with exception") + else: + logger.debug(f"Job executed successfully") + + def get_status(self) -> dict: + """Получить статус планировщика""" + return { + "is_running": self.scheduler.running, + "running_jobs": len(self.running_jobs), + "queued_jobs": len(self.job_queue), + "total_jobs": len(self.scheduler.get_jobs()) + } + + def get_migration_tables(self) -> List[MigrationTable]: + """Получить список всех активных таблиц миграции""" + session = PostgresSessionLocal() + try: + tables = session.query(MigrationTable).filter( + MigrationTable.is_active == True + ).all() + return tables + finally: + session.close() + + +# Глобальный экземпляр SchedulerManager +scheduler_manager = SchedulerManager() diff --git a/replication_service/utils.py b/replication_service/utils.py new file mode 100644 index 0000000..3d665cb --- /dev/null +++ b/replication_service/utils.py @@ -0,0 +1,283 @@ +""" +Utility scripts для Data Replication Service +""" + +import sys +import logging +from database import DatabaseManager, PostgresSessionLocal +from models import MigrationTable +from scheduler import scheduler_manager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def init_db(): + """Инициализировать базу данных""" + logger.info("Инициализация PostgreSQL...") + DatabaseManager.init_postgres_db() + logger.info("✓ База данных инициализирована успешно") + + +def test_connections(): + """Проверить подключения к обеим БД""" + logger.info("Проверка подключений...") + + mssql_ok = DatabaseManager.test_mssql_connection() + postgres_ok = DatabaseManager.test_postgres_connection() + + status = "✓" if mssql_ok else "✗" + logger.info(f"{status} MSSQL: {'Подключено' if mssql_ok else 'Не подключено'}") + + status = "✓" if postgres_ok else "✗" + logger.info(f"{status} PostgreSQL: {'Подключено' if postgres_ok else 'Не подключено'}") + + return mssql_ok and postgres_ok + + +def list_tables(): + """Вывести список таблиц для миграции""" + session = PostgresSessionLocal() + tables = session.query(MigrationTable).filter( + MigrationTable.is_active == True + ).all() + + if not tables: + logger.info("Нет таблиц для миграции") + session.close() + return + + logger.info(f"\nТаблицы для миграции ({len(tables)} шт):\n") + logger.info(f"{'ID':<4} {'Таблица':<20} {'Расписание':<20} {'Активна':<10}") + logger.info("-" * 54) + + for table in tables: + logger.info( + f"{table.id:<4} {table.table_name:<20} {table.cron_schedule:<20} " + f"{'Да' if table.is_active else 'Нет':<10}" + ) + + session.close() + + +def add_table(table_name: str, cron_schedule: str, source_type: str = "mssql"): + """Добавить таблицу для миграции""" + logger.info(f"Добавление таблицы {table_name} ({source_type}) с расписанием '{cron_schedule}'...") + + table = scheduler_manager.add_migration_table( + table_name=table_name, + cron_schedule=cron_schedule, + source_type=source_type, + source_schema="dbo" if source_type == "mssql" else "public", + target_schema="public" + ) + + if table: + logger.info(f"✓ Таблица добавлена успешно (ID: {table.id})") + else: + logger.error(f"✗ Ошибка при добавлении таблицы") + + +def remove_table(table_id: int): + """Удалить таблицу из миграции""" + logger.info(f"Удаление таблицы с ID {table_id}...") + + if scheduler_manager.remove_migration_table(table_id): + logger.info("✓ Таблица удалена успешно") + else: + logger.error("✗ Таблица не найдена") + + +def show_schedules(): + """Показать текущее расписание задач""" + jobs = scheduler_manager.scheduler.get_jobs() + + if not jobs: + logger.info("Нет запланированных задач") + return + + logger.info(f"\nЗапланированные задачи ({len(jobs)} шт):\n") + logger.info(f"{'ID':<40} {'Имя':<30} {'Следующее выполнение':<25}") + logger.info("-" * 95) + + for job in jobs: + logger.info( + f"{job.id:<40} {job.name:<30} " + f"{str(job.next_run_time):<25}" + ) + + +def start_scheduler(): + """Запустить планировщик""" + if scheduler_manager.scheduler.running: + logger.info("Планировщик уже запущен") + else: + logger.info("Запуск планировщика...") + scheduler_manager.start() + logger.info("✓ Планировщик запущен") + + +def stop_scheduler(): + """Остановить планировщик""" + if not scheduler_manager.scheduler.running: + logger.info("Планировщик уже остановлен") + else: + logger.info("Остановка планировщика...") + scheduler_manager.stop() + logger.info("✓ Планировщик остановлен") + + +def status(): + """Показать статус системы""" + is_running = scheduler_manager.scheduler.running + tables = scheduler_manager.get_migration_tables() + running_jobs = scheduler_manager.get_running_jobs() + queued_jobs = scheduler_manager.get_queued_jobs() + + logger.info("\n" + "=" * 50) + logger.info("Статус Data Replication Service") + logger.info("=" * 50) + + status_icon = "▶" if is_running else "⏸" + logger.info(f"{status_icon} Планировщик: {'Запущен' if is_running else 'Остановлен'}") + logger.info(f"📊 Таблиц для миграции: {len(tables)}") + logger.info(f"⚙️ Выполняющихся задач: {len(running_jobs)}") + logger.info(f"📋 Задач в очереди: {len(queued_jobs)}") + + # Проверить подключения + mssql_ok = DatabaseManager.test_mssql_connection() + postgres_ok = DatabaseManager.test_postgres_connection() + + mssql_status = "✓" if mssql_ok else "✗" + postgres_status = "✓" if postgres_ok else "✗" + + logger.info(f"{mssql_status} MSSQL: {'Подключено' if mssql_ok else 'Ошибка'}") + logger.info(f"{postgres_status} PostgreSQL: {'Подключено' if postgres_ok else 'Ошибка'}") + + +def help_command(): + """Показать справку""" + print(""" +Data Replication Service - Утилита управления + +Использование: python utils.py <команда> [аргументы] + +Команды: + init-db Инициализировать базу данных PostgreSQL + test-connections Проверить подключения к MSSQL и PostgreSQL + list-tables Вывести список таблиц для миграции + add-table Добавить таблицу для миграции + Аргументы: <имя_таблицы> + Пример: python utils.py add-table Orders "0 2 * * *" + remove-table Удалить таблицу из миграции + Аргументы: + Пример: python utils.py remove-table 1 + show-schedules Показать расписание запланированных задач + start-scheduler Запустить планировщик + stop-scheduler Остановить планировщик + status Показать статус системы + help Показать эту справку + +Примеры использования: + + # Инициализировать БД + python utils.py init-db + + # Проверить подключения + python utils.py test-connections + + # Добавить таблицу для репликации из MSSQL каждый день в 2:00 AM + python utils.py add-table Orders "0 2 * * *" mssql + + # Добавить таблицу для репликации из PostgreSQL каждые 30 минут + python utils.py add-table Products "*/30 * * * *" pgsql + + # Добавить таблицу (по умолчанию MSSQL) + python utils.py add-table Customers "0 3 * * *" + + # Вывести список таблиц + python utils.py list-tables + + # Показать текущее расписание + python utils.py show-schedules + + # Запустить планировщик + python utils.py start-scheduler + + # Остановить планировщик + python utils.py stop-scheduler + + # Показать статус + python utils.py status + +Cron расписание (формат: минуты часы день_месяца месяц день_недели): + 0 2 * * * - Каждый день в 2:00 AM + */30 * * * * - Каждые 30 минут + 0 0 * * 0 - Каждое воскресенье в полночь + 0 9,12,15 * * * - В 9:00, 12:00 и 15:00 каждый день + 0 */4 * * * - Каждые 4 часа + 0 0 1 * * - Первый день месяца в полночь + """) + + +def main(): + """Главная функция""" + if len(sys.argv) < 2: + help_command() + return + + command = sys.argv[1] + + try: + if command == "init-db": + init_db() + + elif command == "test-connections": + test_connections() + + elif command == "list-tables": + list_tables() + + elif command == "add-table": + if len(sys.argv) < 4: + logger.error("Ошибка: add-table требует минимум 2 аргумента (имя и расписание)") + print("Использование: python utils.py add-table <имя> [источник]") + print("Источник: mssql (по умолчанию) или pgsql") + sys.exit(1) + source_type = sys.argv[4] if len(sys.argv) > 4 else "mssql" + add_table(sys.argv[2], sys.argv[3], source_type) + + elif command == "remove-table": + if len(sys.argv) < 3: + logger.error("Ошибка: remove-table требует 1 аргумент (ID)") + print("Использование: python utils.py remove-table ") + sys.exit(1) + remove_table(int(sys.argv[2])) + + elif command == "show-schedules": + show_schedules() + + elif command == "start-scheduler": + start_scheduler() + + elif command == "stop-scheduler": + stop_scheduler() + + elif command == "status": + status() + + elif command == "help": + help_command() + + else: + logger.error(f"Неизвестная команда: {command}") + help_command() + sys.exit(1) + + except Exception as e: + logger.error(f"Ошибка при выполнении команды: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + main()