341 lines
13 KiB
Python
341 lines
13 KiB
Python
"""
|
||
Примеры использования Data Replication Service
|
||
|
||
Этот файл содержит примеры того, как использовать сервис программно.
|
||
"""
|
||
|
||
from database import DatabaseManager, PostgresSessionLocal
|
||
from scheduler import scheduler_manager
|
||
from replication import ReplicationService
|
||
from models import MigrationTable, ReplicationJob, ReplicationStatus
|
||
import logging
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def example_1_initialize_service():
|
||
"""Пример 1: Инициализация сервиса"""
|
||
print("=" * 50)
|
||
print("Пример 1: Инициализация сервиса")
|
||
print("=" * 50)
|
||
|
||
# Инициализировать БД
|
||
DatabaseManager.init_postgres_db()
|
||
logger.info("✓ PostgreSQL БД инициализирована")
|
||
|
||
# Проверить подключения
|
||
mssql_ok = DatabaseManager.test_mssql_connection()
|
||
postgres_ok = DatabaseManager.test_postgres_connection()
|
||
|
||
logger.info(f"✓ MSSQL подключение: {'OK' if mssql_ok else 'FAILED'}")
|
||
logger.info(f"✓ PostgreSQL подключение: {'OK' if postgres_ok else 'FAILED'}")
|
||
|
||
|
||
def example_2_add_migration_tables():
|
||
"""Пример 2: Добавление таблиц для миграции"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 2: Добавление таблиц для миграции")
|
||
print("=" * 50)
|
||
|
||
# Запустить планировщик
|
||
scheduler_manager.start()
|
||
|
||
# Добавить таблицы с разным расписанием и источниками
|
||
tables = [
|
||
{
|
||
"name": "Orders",
|
||
"source": "mssql",
|
||
"schedule": "0 2 * * *", # Каждый день в 2:00 AM
|
||
"description": "Заказы из MSSQL - ежедневно в 2:00"
|
||
},
|
||
{
|
||
"name": "Customers",
|
||
"source": "mssql",
|
||
"schedule": "0 3 * * *", # Каждый день в 3:00 AM
|
||
"description": "Клиенты из MSSQL - ежедневно в 3:00"
|
||
},
|
||
{
|
||
"name": "Products",
|
||
"source": "pgsql",
|
||
"schedule": "*/30 * * * *", # Каждые 30 минут
|
||
"description": "Товары из PostgreSQL - каждые 30 минут"
|
||
},
|
||
{
|
||
"name": "Transactions",
|
||
"source": "mssql",
|
||
"schedule": "0 * * * *", # Каждый час
|
||
"description": "Транзакции из MSSQL - каждый час"
|
||
}
|
||
]
|
||
|
||
for table in tables:
|
||
migration_table = scheduler_manager.add_migration_table(
|
||
table_name=table["name"],
|
||
cron_schedule=table["schedule"],
|
||
source_type=table["source"],
|
||
source_schema="dbo" if table["source"] == "mssql" else "public",
|
||
target_schema="public"
|
||
)
|
||
|
||
if migration_table:
|
||
logger.info(f"✓ {table['description']} - добавлена (ID: {migration_table.id})")
|
||
else:
|
||
logger.error(f"✗ Ошибка при добавлении {table['name']}")
|
||
|
||
|
||
def example_3_list_migration_tables():
|
||
"""Пример 3: Получение списка таблиц для миграции"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 3: Список таблиц для миграции")
|
||
print("=" * 50)
|
||
|
||
tables = scheduler_manager.get_migration_tables()
|
||
|
||
if tables:
|
||
logger.info(f"Найдено {len(tables)} таблиц для миграции:\n")
|
||
for table in tables:
|
||
logger.info(f" ID: {table.id}")
|
||
logger.info(f" Таблица: {table.table_name}")
|
||
logger.info(f" Источник: {table.source_type.upper()} ({table.source_schema}.{table.table_name})")
|
||
logger.info(f" Цель: {table.target_schema}.{table.table_name}")
|
||
logger.info(f" Расписание: {table.cron_schedule}")
|
||
logger.info(f" Активна: {table.is_active}")
|
||
logger.info("")
|
||
else:
|
||
logger.info("Нет таблиц для миграции")
|
||
|
||
|
||
def example_4_scheduler_status():
|
||
"""Пример 4: Проверка статуса планировщика"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 4: Статус планировщика")
|
||
print("=" * 50)
|
||
|
||
is_running = scheduler_manager.scheduler.running
|
||
tables = scheduler_manager.get_migration_tables()
|
||
running_jobs = scheduler_manager.get_running_jobs()
|
||
queued_jobs = scheduler_manager.get_queued_jobs()
|
||
|
||
logger.info(f"Планировщик запущен: {is_running}")
|
||
logger.info(f"Таблиц для миграции: {len(tables)}")
|
||
logger.info(f"Выполняющихся задач: {len(running_jobs)}")
|
||
logger.info(f"Задач в очереди: {len(queued_jobs)}")
|
||
|
||
# Вывести информацию о расписании
|
||
jobs = scheduler_manager.scheduler.get_jobs()
|
||
if jobs:
|
||
logger.info(f"\nРасписание задач:")
|
||
for job in jobs:
|
||
logger.info(f" - {job.name}")
|
||
logger.info(f" Следующее выполнение: {job.next_run_time}")
|
||
|
||
|
||
def example_5_manual_replication():
|
||
"""Пример 5: Ручная репликация таблицы"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 5: Ручная репликация таблицы")
|
||
print("=" * 50)
|
||
|
||
service = ReplicationService()
|
||
|
||
try:
|
||
# Создать запись о задаче
|
||
job = service.create_replication_job(
|
||
table_id=1,
|
||
table_name="Orders"
|
||
)
|
||
logger.info(f"✓ Создана задача репликации (ID: {job.id})")
|
||
|
||
# Выполнить репликацию
|
||
success, rows_count = service.replicate_table(
|
||
table_name="Orders",
|
||
source_schema="dbo",
|
||
target_schema="public"
|
||
)
|
||
|
||
if success:
|
||
logger.info(f"✓ Успешно реплицировано {rows_count} строк из Orders")
|
||
|
||
# Обновить статус
|
||
service.update_job_status(
|
||
job.id,
|
||
ReplicationStatus.SUCCESS,
|
||
rows_count
|
||
)
|
||
logger.info(f"✓ Статус задачи обновлен на SUCCESS")
|
||
else:
|
||
logger.error(f"✗ Ошибка при репликации Orders")
|
||
service.update_job_status(
|
||
job.id,
|
||
ReplicationStatus.FAILED,
|
||
error_message="Ошибка репликации"
|
||
)
|
||
|
||
finally:
|
||
service.close()
|
||
|
||
|
||
def example_6_view_replication_history():
|
||
"""Пример 6: Просмотр истории репликации"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 6: История репликации")
|
||
print("=" * 50)
|
||
|
||
session = PostgresSessionLocal()
|
||
|
||
try:
|
||
# Получить последние 5 задач
|
||
jobs = session.query(ReplicationJob).order_by(
|
||
ReplicationJob.created_at.desc()
|
||
).limit(5).all()
|
||
|
||
if jobs:
|
||
logger.info(f"Последние {len(jobs)} задач репликации:\n")
|
||
for job in jobs:
|
||
logger.info(f" ID: {job.id}")
|
||
logger.info(f" Таблица: {job.table_name}")
|
||
logger.info(f" Статус: {job.status}")
|
||
logger.info(f" Строк обработано: {job.rows_processed}")
|
||
logger.info(f" Создано: {job.created_at}")
|
||
if job.error_message:
|
||
logger.info(f" Ошибка: {job.error_message}")
|
||
logger.info("")
|
||
else:
|
||
logger.info("История репликации пуста")
|
||
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
def example_7_life_table_processing():
|
||
"""Пример 7: Обработка Life таблиц (логи изменений)"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 7: Обработка Life таблиц")
|
||
print("=" * 50)
|
||
|
||
service = ReplicationService()
|
||
|
||
try:
|
||
# Обработать изменения из Life таблицы
|
||
changes = service.process_life_table_changes("Orders")
|
||
|
||
logger.info(f"✓ Обработаны изменения из LifeOrders:")
|
||
logger.info(f" INSERT операции: {changes['INSERT']}")
|
||
logger.info(f" UPDATE операции: {changes['UPDATE']}")
|
||
logger.info(f" DELETE операции: {changes['DELETE']}")
|
||
|
||
finally:
|
||
service.close()
|
||
|
||
|
||
def example_8_remove_migration_table():
|
||
"""Пример 8: Удаление таблицы из миграции"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 8: Удаление таблицы из миграции")
|
||
print("=" * 50)
|
||
|
||
# Удалить таблицу с ID 1
|
||
success = scheduler_manager.remove_migration_table(table_id=1)
|
||
|
||
if success:
|
||
logger.info("✓ Таблица успешно удалена из миграции")
|
||
else:
|
||
logger.info("✗ Таблица не найдена")
|
||
|
||
|
||
def example_9_statistics():
|
||
"""Пример 9: Статистика репликации"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 9: Статистика репликации")
|
||
print("=" * 50)
|
||
|
||
session = PostgresSessionLocal()
|
||
|
||
try:
|
||
total_jobs = session.query(ReplicationJob).count()
|
||
successful_jobs = session.query(ReplicationJob).filter(
|
||
ReplicationJob.status == ReplicationStatus.SUCCESS
|
||
).count()
|
||
failed_jobs = session.query(ReplicationJob).filter(
|
||
ReplicationJob.status == ReplicationStatus.FAILED
|
||
).count()
|
||
|
||
success_rate = (successful_jobs / total_jobs * 100) if total_jobs > 0 else 0
|
||
|
||
logger.info(f"Всего задач: {total_jobs}")
|
||
logger.info(f"Успешных: {successful_jobs}")
|
||
logger.info(f"Ошибок: {failed_jobs}")
|
||
logger.info(f"Процент успеха: {success_rate:.2f}%")
|
||
|
||
finally:
|
||
session.close()
|
||
|
||
|
||
def example_10_concurrent_schedules():
|
||
"""
|
||
Пример 10: Демонстрация обработки конфликтов расписания
|
||
|
||
Когда расписание нескольких таблиц пересекается,
|
||
они выполняются последовательно в очереди.
|
||
"""
|
||
print("\n" + "=" * 50)
|
||
print("Пример 10: Обработка конфликтов расписания")
|
||
print("=" * 50)
|
||
|
||
scheduler_manager.start()
|
||
|
||
# Добавить таблицы с пересекающимся расписанием
|
||
logger.info("Добавление таблиц с пересекающимся расписанием...")
|
||
|
||
# Все три таблицы будут начинать репликацию в 2:00 AM
|
||
# но выполняться последовательно
|
||
scheduler_manager.add_migration_table(
|
||
table_name="Orders",
|
||
cron_schedule="0 2 * * *", # 2:00 AM
|
||
source_schema="dbo"
|
||
)
|
||
|
||
scheduler_manager.add_migration_table(
|
||
table_name="Customers",
|
||
cron_schedule="0 2 * * *", # 2:00 AM (конфликт)
|
||
source_schema="dbo"
|
||
)
|
||
|
||
scheduler_manager.add_migration_table(
|
||
table_name="Products",
|
||
cron_schedule="0 2 * * *", # 2:00 AM (конфликт)
|
||
source_schema="dbo"
|
||
)
|
||
|
||
logger.info("✓ Таблицы добавлены")
|
||
logger.info("\nКогда наступит 2:00 AM:")
|
||
logger.info(" 1. Начнет выполняться Orders")
|
||
logger.info(" 2. Customers будет добавлена в очередь")
|
||
logger.info(" 3. Products будет добавлена в очередь")
|
||
logger.info(" 4. После завершения Orders начнет выполняться Customers")
|
||
logger.info(" 5. После завершения Customers начнет выполняться Products")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Выполнить примеры
|
||
try:
|
||
example_1_initialize_service()
|
||
example_2_add_migration_tables()
|
||
example_3_list_migration_tables()
|
||
example_4_scheduler_status()
|
||
# example_5_manual_replication() # Раскомментировать если нужна ручная репликация
|
||
example_6_view_replication_history()
|
||
# example_7_life_table_processing() # Раскомментировать если есть Life таблицы
|
||
# example_8_remove_migration_table() # Раскомментировать для удаления
|
||
example_9_statistics()
|
||
example_10_concurrent_schedules()
|
||
|
||
logger.info("\n✓ Все примеры выполнены!")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при выполнении примеров: {e}", exc_info=True)
|
||
|
||
finally:
|
||
scheduler_manager.stop()
|