""" Utility scripts для Data Replication Service """ import sys import logging from database import DatabaseManager, PostgresSessionLocal from models import MigrationTable from scheduler import scheduler_manager logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def init_db(): """Инициализировать базу данных""" logger.info("Инициализация PostgreSQL...") DatabaseManager.init_postgres_db() logger.info("✓ База данных инициализирована успешно") def test_connections(): """Проверить подключения к обеим БД""" logger.info("Проверка подключений...") mssql_ok = DatabaseManager.test_mssql_connection() postgres_ok = DatabaseManager.test_postgres_connection() status = "✓" if mssql_ok else "✗" logger.info(f"{status} MSSQL: {'Подключено' if mssql_ok else 'Не подключено'}") status = "✓" if postgres_ok else "✗" logger.info(f"{status} PostgreSQL: {'Подключено' if postgres_ok else 'Не подключено'}") return mssql_ok and postgres_ok def list_tables(): """Вывести список таблиц для миграции""" session = PostgresSessionLocal() tables = session.query(MigrationTable).filter( MigrationTable.is_active == True ).all() if not tables: logger.info("Нет таблиц для миграции") session.close() return logger.info(f"\nТаблицы для миграции ({len(tables)} шт):\n") logger.info(f"{'ID':<4} {'Таблица':<20} {'Расписание':<20} {'Активна':<10}") logger.info("-" * 54) for table in tables: logger.info( f"{table.id:<4} {table.table_name:<20} {table.cron_schedule:<20} " f"{'Да' if table.is_active else 'Нет':<10}" ) session.close() def add_table(table_name: str, cron_schedule: str, source_type: str = "mssql"): """Добавить таблицу для миграции""" logger.info(f"Добавление таблицы {table_name} ({source_type}) с расписанием '{cron_schedule}'...") table = scheduler_manager.add_migration_table( table_name=table_name, cron_schedule=cron_schedule, source_type=source_type, source_schema="dbo" if source_type == "mssql" else "public", target_schema="public" ) if table: logger.info(f"✓ Таблица добавлена успешно (ID: {table.id})") else: logger.error(f"✗ Ошибка при добавлении таблицы") def remove_table(table_id: int): """Удалить таблицу из миграции""" logger.info(f"Удаление таблицы с ID {table_id}...") if scheduler_manager.remove_migration_table(table_id): logger.info("✓ Таблица удалена успешно") else: logger.error("✗ Таблица не найдена") def show_schedules(): """Показать текущее расписание задач""" jobs = scheduler_manager.scheduler.get_jobs() if not jobs: logger.info("Нет запланированных задач") return logger.info(f"\nЗапланированные задачи ({len(jobs)} шт):\n") logger.info(f"{'ID':<40} {'Имя':<30} {'Следующее выполнение':<25}") logger.info("-" * 95) for job in jobs: logger.info( f"{job.id:<40} {job.name:<30} " f"{str(job.next_run_time):<25}" ) def start_scheduler(): """Запустить планировщик""" if scheduler_manager.scheduler.running: logger.info("Планировщик уже запущен") else: logger.info("Запуск планировщика...") scheduler_manager.start() logger.info("✓ Планировщик запущен") def stop_scheduler(): """Остановить планировщик""" if not scheduler_manager.scheduler.running: logger.info("Планировщик уже остановлен") else: logger.info("Остановка планировщика...") scheduler_manager.stop() logger.info("✓ Планировщик остановлен") def status(): """Показать статус системы""" is_running = scheduler_manager.scheduler.running tables = scheduler_manager.get_migration_tables() running_jobs = scheduler_manager.get_running_jobs() queued_jobs = scheduler_manager.get_queued_jobs() logger.info("\n" + "=" * 50) logger.info("Статус Data Replication Service") logger.info("=" * 50) status_icon = "▶" if is_running else "⏸" logger.info(f"{status_icon} Планировщик: {'Запущен' if is_running else 'Остановлен'}") logger.info(f"📊 Таблиц для миграции: {len(tables)}") logger.info(f"⚙️ Выполняющихся задач: {len(running_jobs)}") logger.info(f"📋 Задач в очереди: {len(queued_jobs)}") # Проверить подключения mssql_ok = DatabaseManager.test_mssql_connection() postgres_ok = DatabaseManager.test_postgres_connection() mssql_status = "✓" if mssql_ok else "✗" postgres_status = "✓" if postgres_ok else "✗" logger.info(f"{mssql_status} MSSQL: {'Подключено' if mssql_ok else 'Ошибка'}") logger.info(f"{postgres_status} PostgreSQL: {'Подключено' if postgres_ok else 'Ошибка'}") def help_command(): """Показать справку""" print(""" Data Replication Service - Утилита управления Использование: python utils.py <команда> [аргументы] Команды: init-db Инициализировать базу данных PostgreSQL test-connections Проверить подключения к MSSQL и PostgreSQL list-tables Вывести список таблиц для миграции add-table Добавить таблицу для миграции Аргументы: <имя_таблицы> Пример: python utils.py add-table Orders "0 2 * * *" remove-table Удалить таблицу из миграции Аргументы: Пример: python utils.py remove-table 1 show-schedules Показать расписание запланированных задач start-scheduler Запустить планировщик stop-scheduler Остановить планировщик status Показать статус системы help Показать эту справку Примеры использования: # Инициализировать БД python utils.py init-db # Проверить подключения python utils.py test-connections # Добавить таблицу для репликации из MSSQL каждый день в 2:00 AM python utils.py add-table Orders "0 2 * * *" mssql # Добавить таблицу для репликации из PostgreSQL каждые 30 минут python utils.py add-table Products "*/30 * * * *" pgsql # Добавить таблицу (по умолчанию MSSQL) python utils.py add-table Customers "0 3 * * *" # Вывести список таблиц python utils.py list-tables # Показать текущее расписание python utils.py show-schedules # Запустить планировщик python utils.py start-scheduler # Остановить планировщик python utils.py stop-scheduler # Показать статус python utils.py status Cron расписание (формат: минуты часы день_месяца месяц день_недели): 0 2 * * * - Каждый день в 2:00 AM */30 * * * * - Каждые 30 минут 0 0 * * 0 - Каждое воскресенье в полночь 0 9,12,15 * * * - В 9:00, 12:00 и 15:00 каждый день 0 */4 * * * - Каждые 4 часа 0 0 1 * * - Первый день месяца в полночь """) def main(): """Главная функция""" if len(sys.argv) < 2: help_command() return command = sys.argv[1] try: if command == "init-db": init_db() elif command == "test-connections": test_connections() elif command == "list-tables": list_tables() elif command == "add-table": if len(sys.argv) < 4: logger.error("Ошибка: add-table требует минимум 2 аргумента (имя и расписание)") print("Использование: python utils.py add-table <имя> [источник]") print("Источник: mssql (по умолчанию) или pgsql") sys.exit(1) source_type = sys.argv[4] if len(sys.argv) > 4 else "mssql" add_table(sys.argv[2], sys.argv[3], source_type) elif command == "remove-table": if len(sys.argv) < 3: logger.error("Ошибка: remove-table требует 1 аргумент (ID)") print("Использование: python utils.py remove-table ") sys.exit(1) remove_table(int(sys.argv[2])) elif command == "show-schedules": show_schedules() elif command == "start-scheduler": start_scheduler() elif command == "stop-scheduler": stop_scheduler() elif command == "status": status() elif command == "help": help_command() else: logger.error(f"Неизвестная команда: {command}") help_command() sys.exit(1) except Exception as e: logger.error(f"Ошибка при выполнении команды: {e}", exc_info=True) sys.exit(1) if __name__ == "__main__": main()