#!/usr/bin/env python3 """ Примеры использования новой DataSource API для управления источниками данных. """ import requests import json from typing import Dict, List BASE_URL = "http://localhost:8000/api/v1" # Цвета для вывода GREEN = '\033[92m' RED = '\033[91m' BLUE = '\033[94m' YELLOW = '\033[93m' RESET = '\033[0m' def print_section(title: str): """Красиво вывести заголовок секции""" print(f"\n{BLUE}{'='*60}{RESET}") print(f"{BLUE}{title:^60}{RESET}") print(f"{BLUE}{'='*60}{RESET}\n") def print_response(response: requests.Response, title: str = "Response"): """Вывести HTTP response в красивом формате""" print(f"{YELLOW}{title}:{RESET}") try: print(json.dumps(response.json(), indent=2)) except: print(response.text) print() def example_1_create_data_sources(): """Пример 1: Создание источников данных""" print_section("Пример 1: Создание источников данных") # 1. Создать MSSQL источник print(f"{BLUE}1. Создание MSSQL источника...{RESET}") mssql_source = { "name": "MSSQL Production", "source_type": "mssql", "host": "mssql-server.example.com", "port": 1433, "database": "SalesDatabase", "username": "sa", "password": "StrongPassword123!", "default_schema": "dbo", "description": "Production MSSQL server for sales data" } response = requests.post(f"{BASE_URL}/data-sources", json=mssql_source) print_response(response, "Created MSSQL Source") mssql_id = response.json()['id'] if response.status_code == 200 else None # 2. Создать PostgreSQL источник print(f"{BLUE}2. Создание PostgreSQL источника...{RESET}") pgsql_source = { "name": "PostgreSQL Analytics", "source_type": "pgsql", "host": "postgres-analytics.example.com", "port": 5432, "database": "analytics_db", "username": "analyst", "password": "AnalyticsPass456!", "default_schema": "public", "description": "Analytics database for reporting" } response = requests.post(f"{BASE_URL}/data-sources", json=pgsql_source) print_response(response, "Created PostgreSQL Source") pgsql_id = response.json()['id'] if response.status_code == 200 else None return mssql_id, pgsql_id def example_2_list_and_test_sources(mssql_id: int, pgsql_id: int): """Пример 2: Получить список источников и протестировать подключения""" print_section("Пример 2: Список источников и тестирование подключений") # 1. Получить полный список print(f"{BLUE}1. Получение списка всех источников...{RESET}") response = requests.get(f"{BASE_URL}/data-sources") print_response(response, "All Data Sources") # 2. Получить только активные print(f"{BLUE}2. Получение списка активных источников...{RESET}") response = requests.get(f"{BASE_URL}/data-sources?active_only=true") print_response(response, "Active Data Sources") # 3. Получить конкретный источник if mssql_id: print(f"{BLUE}3. Получение конкретного источника (ID: {mssql_id})...{RESET}") response = requests.get(f"{BASE_URL}/data-sources/{mssql_id}") print_response(response, f"Data Source {mssql_id}") # 4. Протестировать подключение if mssql_id: print(f"{BLUE}4. Тестирование подключения к MSSQL (ID: {mssql_id})...{RESET}") response = requests.post(f"{BASE_URL}/data-sources/{mssql_id}/test") print_response(response, "MSSQL Connection Test Result") def example_3_create_migrations(mssql_id: int, pgsql_id: int): """Пример 3: Создание миграций (расписание репликации)""" print_section("Пример 3: Создание миграций таблиц") # 1. Создать миграцию для таблицы Orders из MSSQL print(f"{BLUE}1. Создание миграции таблицы Orders из MSSQL...{RESET}") migration_1 = { "table_name": "Orders", "source_id": mssql_id, "source_schema": "dbo", "target_schema": "public", "cron_schedule": "0 2 * * *" # Каждый день в 2:00 AM } response = requests.post(f"{BASE_URL}/migration-tables", json=migration_1) print_response(response, "Created Migration for Orders") migration_1_id = response.json()['id'] if response.status_code == 200 else None # 2. Создать миграцию для таблицы Products из PostgreSQL if pgsql_id: print(f"{BLUE}2. Создание миграции таблицы Products из PostgreSQL...{RESET}") migration_2 = { "table_name": "Products", "source_id": pgsql_id, "source_schema": "public", "target_schema": "public", "cron_schedule": "0 4 * * *" # Каждый день в 4:00 AM } response = requests.post(f"{BASE_URL}/migration-tables", json=migration_2) print_response(response, "Created Migration for Products") return migration_1_id def example_4_list_migrations(migration_1_id: int): """Пример 4: Получить список миграций""" print_section("Пример 4: Получение списка миграций") # 1. Получить все миграции print(f"{BLUE}1. Получение всех миграций...{RESET}") response = requests.get(f"{BASE_URL}/migration-tables") print_response(response, "All Migrations") # 2. Получить только активные print(f"{BLUE}2. Получение активных миграций...{RESET}") response = requests.get(f"{BASE_URL}/migration-tables?active_only=true") print_response(response, "Active Migrations") # 3. Получить конкретную миграцию if migration_1_id: print(f"{BLUE}3. Получение конкретной миграции (ID: {migration_1_id})...{RESET}") response = requests.get(f"{BASE_URL}/migration-tables/{migration_1_id}") print_response(response, f"Migration {migration_1_id}") def example_5_update_data_source(mssql_id: int): """Пример 5: Обновить источник данных""" print_section("Пример 5: Обновление источника данных") if not mssql_id: print(f"{RED}No MSSQL source ID available{RESET}") return print(f"{BLUE}Обновление узла MSSQL источника...{RESET}") update_data = { "host": "new-mssql-server.example.com", "port": 1433 } response = requests.put(f"{BASE_URL}/data-sources/{mssql_id}", json=update_data) print_response(response, "Updated Data Source") def example_6_update_migration(migration_1_id: int): """Пример 6: Обновить миграцию""" print_section("Пример 6: Обновление миграции") if not migration_1_id: print(f"{RED}No migration ID available{RESET}") return print(f"{BLUE}Обновление расписания миграции...{RESET}") update_data = { "cron_schedule": "0 3 * * *" # Изменить на 3:00 AM } response = requests.put(f"{BASE_URL}/migration-tables/{migration_1_id}", json=update_data) print_response(response, "Updated Migration") def example_7_check_health(): """Пример 7: Проверить здоровье сервиса""" print_section("Пример 7: Проверка здоровья сервиса") response = requests.get(f"{BASE_URL}/health") print_response(response, "Service Health Check") def example_8_get_replication_jobs(): """Пример 8: Получить историю задач репликации""" print_section("Пример 8: История задач репликации") print(f"{BLUE}1. Получение последних 10 задач репликации...{RESET}") response = requests.get(f"{BASE_URL}/replication-jobs?limit=10&offset=0") print_response(response, "Recent Replication Jobs") print(f"{BLUE}2. Получение только успешных задач...{RESET}") response = requests.get(f"{BASE_URL}/replication-jobs?status=success&limit=10") print_response(response, "Successful Replication Jobs") def main(): """Главная функция - запустить все примеры""" print(f"\n{GREEN}{'='*60}{RESET}") print(f"{GREEN}{'Data Replication Service - API Examples':^60}{RESET}") print(f"{GREEN}{'='*60}{RESET}\n") try: # Проверить подключение к сервису response = requests.get(f"{BASE_URL.replace('/api/v1', '')}/") if response.status_code != 200: print(f"{RED}✗ Error: Cannot connect to service at {BASE_URL}{RESET}") print(f"{YELLOW}Make sure the service is running on http://localhost:8000{RESET}") return print(f"{GREEN}✓ Connected to Data Replication Service{RESET}\n") # Запустить примеры mssql_id, pgsql_id = example_1_create_data_sources() example_2_list_and_test_sources(mssql_id, pgsql_id) migration_1_id = example_3_create_migrations(mssql_id, pgsql_id) example_4_list_migrations(migration_1_id) example_5_update_data_source(mssql_id) example_6_update_migration(migration_1_id) example_7_check_health() example_8_get_replication_jobs() print(f"\n{GREEN}{'='*60}{RESET}") print(f"{GREEN}{'✓ All examples completed successfully!':^60}{RESET}") print(f"{GREEN}{'='*60}{RESET}\n") print(f"{YELLOW}📚 Documentation: See DATA_SOURCES.md for detailed API reference{RESET}\n") except requests.exceptions.ConnectionError: print(f"{RED}✗ Connection error: Cannot connect to {BASE_URL}{RESET}") print(f"{YELLOW}Make sure the service is running: python main.py{RESET}") except Exception as e: print(f"{RED}✗ Error: {e}{RESET}") if __name__ == "__main__": main()