""" Примеры использования Data Replication Service Этот файл содержит примеры того, как использовать сервис программно. """ from database import DatabaseManager, PostgresSessionLocal from scheduler import scheduler_manager from replication import ReplicationService from models import MigrationTable, ReplicationJob, ReplicationStatus import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def example_1_initialize_service(): """Пример 1: Инициализация сервиса""" print("=" * 50) print("Пример 1: Инициализация сервиса") print("=" * 50) # Инициализировать БД DatabaseManager.init_postgres_db() logger.info("✓ PostgreSQL БД инициализирована") # Проверить подключения mssql_ok = DatabaseManager.test_mssql_connection() postgres_ok = DatabaseManager.test_postgres_connection() logger.info(f"✓ MSSQL подключение: {'OK' if mssql_ok else 'FAILED'}") logger.info(f"✓ PostgreSQL подключение: {'OK' if postgres_ok else 'FAILED'}") def example_2_add_migration_tables(): """Пример 2: Добавление таблиц для миграции""" print("\n" + "=" * 50) print("Пример 2: Добавление таблиц для миграции") print("=" * 50) # Запустить планировщик scheduler_manager.start() # Добавить таблицы с разным расписанием и источниками tables = [ { "name": "Orders", "source": "mssql", "schedule": "0 2 * * *", # Каждый день в 2:00 AM "description": "Заказы из MSSQL - ежедневно в 2:00" }, { "name": "Customers", "source": "mssql", "schedule": "0 3 * * *", # Каждый день в 3:00 AM "description": "Клиенты из MSSQL - ежедневно в 3:00" }, { "name": "Products", "source": "pgsql", "schedule": "*/30 * * * *", # Каждые 30 минут "description": "Товары из PostgreSQL - каждые 30 минут" }, { "name": "Transactions", "source": "mssql", "schedule": "0 * * * *", # Каждый час "description": "Транзакции из MSSQL - каждый час" } ] for table in tables: migration_table = scheduler_manager.add_migration_table( table_name=table["name"], cron_schedule=table["schedule"], source_type=table["source"], source_schema="dbo" if table["source"] == "mssql" else "public", target_schema="public" ) if migration_table: logger.info(f"✓ {table['description']} - добавлена (ID: {migration_table.id})") else: logger.error(f"✗ Ошибка при добавлении {table['name']}") def example_3_list_migration_tables(): """Пример 3: Получение списка таблиц для миграции""" print("\n" + "=" * 50) print("Пример 3: Список таблиц для миграции") print("=" * 50) tables = scheduler_manager.get_migration_tables() if tables: logger.info(f"Найдено {len(tables)} таблиц для миграции:\n") for table in tables: logger.info(f" ID: {table.id}") logger.info(f" Таблица: {table.table_name}") logger.info(f" Источник: {table.source_type.upper()} ({table.source_schema}.{table.table_name})") logger.info(f" Цель: {table.target_schema}.{table.table_name}") logger.info(f" Расписание: {table.cron_schedule}") logger.info(f" Активна: {table.is_active}") logger.info("") else: logger.info("Нет таблиц для миграции") def example_4_scheduler_status(): """Пример 4: Проверка статуса планировщика""" print("\n" + "=" * 50) print("Пример 4: Статус планировщика") print("=" * 50) is_running = scheduler_manager.scheduler.running tables = scheduler_manager.get_migration_tables() running_jobs = scheduler_manager.get_running_jobs() queued_jobs = scheduler_manager.get_queued_jobs() logger.info(f"Планировщик запущен: {is_running}") logger.info(f"Таблиц для миграции: {len(tables)}") logger.info(f"Выполняющихся задач: {len(running_jobs)}") logger.info(f"Задач в очереди: {len(queued_jobs)}") # Вывести информацию о расписании jobs = scheduler_manager.scheduler.get_jobs() if jobs: logger.info(f"\nРасписание задач:") for job in jobs: logger.info(f" - {job.name}") logger.info(f" Следующее выполнение: {job.next_run_time}") def example_5_manual_replication(): """Пример 5: Ручная репликация таблицы""" print("\n" + "=" * 50) print("Пример 5: Ручная репликация таблицы") print("=" * 50) service = ReplicationService() try: # Создать запись о задаче job = service.create_replication_job( table_id=1, table_name="Orders" ) logger.info(f"✓ Создана задача репликации (ID: {job.id})") # Выполнить репликацию success, rows_count = service.replicate_table( table_name="Orders", source_schema="dbo", target_schema="public" ) if success: logger.info(f"✓ Успешно реплицировано {rows_count} строк из Orders") # Обновить статус service.update_job_status( job.id, ReplicationStatus.SUCCESS, rows_count ) logger.info(f"✓ Статус задачи обновлен на SUCCESS") else: logger.error(f"✗ Ошибка при репликации Orders") service.update_job_status( job.id, ReplicationStatus.FAILED, error_message="Ошибка репликации" ) finally: service.close() def example_6_view_replication_history(): """Пример 6: Просмотр истории репликации""" print("\n" + "=" * 50) print("Пример 6: История репликации") print("=" * 50) session = PostgresSessionLocal() try: # Получить последние 5 задач jobs = session.query(ReplicationJob).order_by( ReplicationJob.created_at.desc() ).limit(5).all() if jobs: logger.info(f"Последние {len(jobs)} задач репликации:\n") for job in jobs: logger.info(f" ID: {job.id}") logger.info(f" Таблица: {job.table_name}") logger.info(f" Статус: {job.status}") logger.info(f" Строк обработано: {job.rows_processed}") logger.info(f" Создано: {job.created_at}") if job.error_message: logger.info(f" Ошибка: {job.error_message}") logger.info("") else: logger.info("История репликации пуста") finally: session.close() def example_7_life_table_processing(): """Пример 7: Обработка Life таблиц (логи изменений)""" print("\n" + "=" * 50) print("Пример 7: Обработка Life таблиц") print("=" * 50) service = ReplicationService() try: # Обработать изменения из Life таблицы changes = service.process_life_table_changes("Orders") logger.info(f"✓ Обработаны изменения из LifeOrders:") logger.info(f" INSERT операции: {changes['INSERT']}") logger.info(f" UPDATE операции: {changes['UPDATE']}") logger.info(f" DELETE операции: {changes['DELETE']}") finally: service.close() def example_8_remove_migration_table(): """Пример 8: Удаление таблицы из миграции""" print("\n" + "=" * 50) print("Пример 8: Удаление таблицы из миграции") print("=" * 50) # Удалить таблицу с ID 1 success = scheduler_manager.remove_migration_table(table_id=1) if success: logger.info("✓ Таблица успешно удалена из миграции") else: logger.info("✗ Таблица не найдена") def example_9_statistics(): """Пример 9: Статистика репликации""" print("\n" + "=" * 50) print("Пример 9: Статистика репликации") print("=" * 50) session = PostgresSessionLocal() try: total_jobs = session.query(ReplicationJob).count() successful_jobs = session.query(ReplicationJob).filter( ReplicationJob.status == ReplicationStatus.SUCCESS ).count() failed_jobs = session.query(ReplicationJob).filter( ReplicationJob.status == ReplicationStatus.FAILED ).count() success_rate = (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0 logger.info(f"Всего задач: {total_jobs}") logger.info(f"Успешных: {successful_jobs}") logger.info(f"Ошибок: {failed_jobs}") logger.info(f"Процент успеха: {success_rate:.2f}%") finally: session.close() def example_10_concurrent_schedules(): """ Пример 10: Демонстрация обработки конфликтов расписания Когда расписание нескольких таблиц пересекается, они выполняются последовательно в очереди. """ print("\n" + "=" * 50) print("Пример 10: Обработка конфликтов расписания") print("=" * 50) scheduler_manager.start() # Добавить таблицы с пересекающимся расписанием logger.info("Добавление таблиц с пересекающимся расписанием...") # Все три таблицы будут начинать репликацию в 2:00 AM # но выполняться последовательно scheduler_manager.add_migration_table( table_name="Orders", cron_schedule="0 2 * * *", # 2:00 AM source_schema="dbo" ) scheduler_manager.add_migration_table( table_name="Customers", cron_schedule="0 2 * * *", # 2:00 AM (конфликт) source_schema="dbo" ) scheduler_manager.add_migration_table( table_name="Products", cron_schedule="0 2 * * *", # 2:00 AM (конфликт) source_schema="dbo" ) logger.info("✓ Таблицы добавлены") logger.info("\nКогда наступит 2:00 AM:") logger.info(" 1. Начнет выполняться Orders") logger.info(" 2. Customers будет добавлена в очередь") logger.info(" 3. Products будет добавлена в очередь") logger.info(" 4. После завершения Orders начнет выполняться Customers") logger.info(" 5. После завершения Customers начнет выполняться Products") if __name__ == "__main__": # Выполнить примеры try: example_1_initialize_service() example_2_add_migration_tables() example_3_list_migration_tables() example_4_scheduler_status() # example_5_manual_replication() # Раскомментировать если нужна ручная репликация example_6_view_replication_history() # example_7_life_table_processing() # Раскомментировать если есть Life таблицы # example_8_remove_migration_table() # Раскомментировать для удаления example_9_statistics() example_10_concurrent_schedules() logger.info("\n✓ Все примеры выполнены!") except Exception as e: logger.error(f"Ошибка при выполнении примеров: {e}", exc_info=True) finally: scheduler_manager.stop()