Files
2026-03-29 23:24:15 +09:00

284 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Utility scripts для Data Replication Service
"""
import sys
import logging
from database import DatabaseManager, PostgresSessionLocal
from models import MigrationTable
from scheduler import scheduler_manager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def init_db():
"""Инициализировать базу данных"""
logger.info("Инициализация PostgreSQL...")
DatabaseManager.init_postgres_db()
logger.info("✓ База данных инициализирована успешно")
def test_connections():
"""Проверить подключения к обеим БД"""
logger.info("Проверка подключений...")
mssql_ok = DatabaseManager.test_mssql_connection()
postgres_ok = DatabaseManager.test_postgres_connection()
status = "" if mssql_ok else ""
logger.info(f"{status} MSSQL: {'Подключено' if mssql_ok else 'Не подключено'}")
status = "" if postgres_ok else ""
logger.info(f"{status} PostgreSQL: {'Подключено' if postgres_ok else 'Не подключено'}")
return mssql_ok and postgres_ok
def list_tables():
"""Вывести список таблиц для миграции"""
session = PostgresSessionLocal()
tables = session.query(MigrationTable).filter(
MigrationTable.is_active == True
).all()
if not tables:
logger.info("Нет таблиц для миграции")
session.close()
return
logger.info(f"\nТаблицы для миграции ({len(tables)} шт):\n")
logger.info(f"{'ID':<4} {'Таблица':<20} {'Расписание':<20} {'Активна':<10}")
logger.info("-" * 54)
for table in tables:
logger.info(
f"{table.id:<4} {table.table_name:<20} {table.cron_schedule:<20} "
f"{'Да' if table.is_active else 'Нет':<10}"
)
session.close()
def add_table(table_name: str, cron_schedule: str, source_type: str = "mssql"):
"""Добавить таблицу для миграции"""
logger.info(f"Добавление таблицы {table_name} ({source_type}) с расписанием '{cron_schedule}'...")
table = scheduler_manager.add_migration_table(
table_name=table_name,
cron_schedule=cron_schedule,
source_type=source_type,
source_schema="dbo" if source_type == "mssql" else "public",
target_schema="public"
)
if table:
logger.info(f"✓ Таблица добавлена успешно (ID: {table.id})")
else:
logger.error(f"✗ Ошибка при добавлении таблицы")
def remove_table(table_id: int):
"""Удалить таблицу из миграции"""
logger.info(f"Удаление таблицы с ID {table_id}...")
if scheduler_manager.remove_migration_table(table_id):
logger.info("✓ Таблица удалена успешно")
else:
logger.error("✗ Таблица не найдена")
def show_schedules():
"""Показать текущее расписание задач"""
jobs = scheduler_manager.scheduler.get_jobs()
if not jobs:
logger.info("Нет запланированных задач")
return
logger.info(f"\nЗапланированные задачи ({len(jobs)} шт):\n")
logger.info(f"{'ID':<40} {'Имя':<30} {'Следующее выполнение':<25}")
logger.info("-" * 95)
for job in jobs:
logger.info(
f"{job.id:<40} {job.name:<30} "
f"{str(job.next_run_time):<25}"
)
def start_scheduler():
"""Запустить планировщик"""
if scheduler_manager.scheduler.running:
logger.info("Планировщик уже запущен")
else:
logger.info("Запуск планировщика...")
scheduler_manager.start()
logger.info("✓ Планировщик запущен")
def stop_scheduler():
"""Остановить планировщик"""
if not scheduler_manager.scheduler.running:
logger.info("Планировщик уже остановлен")
else:
logger.info("Остановка планировщика...")
scheduler_manager.stop()
logger.info("✓ Планировщик остановлен")
def status():
"""Показать статус системы"""
is_running = scheduler_manager.scheduler.running
tables = scheduler_manager.get_migration_tables()
running_jobs = scheduler_manager.get_running_jobs()
queued_jobs = scheduler_manager.get_queued_jobs()
logger.info("\n" + "=" * 50)
logger.info("Статус Data Replication Service")
logger.info("=" * 50)
status_icon = "" if is_running else ""
logger.info(f"{status_icon} Планировщик: {'Запущен' if is_running else 'Остановлен'}")
logger.info(f"📊 Таблиц для миграции: {len(tables)}")
logger.info(f"⚙️ Выполняющихся задач: {len(running_jobs)}")
logger.info(f"📋 Задач в очереди: {len(queued_jobs)}")
# Проверить подключения
mssql_ok = DatabaseManager.test_mssql_connection()
postgres_ok = DatabaseManager.test_postgres_connection()
mssql_status = "" if mssql_ok else ""
postgres_status = "" if postgres_ok else ""
logger.info(f"{mssql_status} MSSQL: {'Подключено' if mssql_ok else 'Ошибка'}")
logger.info(f"{postgres_status} PostgreSQL: {'Подключено' if postgres_ok else 'Ошибка'}")
def help_command():
"""Показать справку"""
print("""
Data Replication Service - Утилита управления
Использование: python utils.py <команда> [аргументы]
Команды:
init-db Инициализировать базу данных PostgreSQL
test-connections Проверить подключения к MSSQL и PostgreSQL
list-tables Вывести список таблиц для миграции
add-table Добавить таблицу для миграции
Аргументы: <имя_таблицы> <cron_расписание>
Пример: python utils.py add-table Orders "0 2 * * *"
remove-table Удалить таблицу из миграции
Аргументы: <id_таблицы>
Пример: python utils.py remove-table 1
show-schedules Показать расписание запланированных задач
start-scheduler Запустить планировщик
stop-scheduler Остановить планировщик
status Показать статус системы
help Показать эту справку
Примеры использования:
# Инициализировать БД
python utils.py init-db
# Проверить подключения
python utils.py test-connections
# Добавить таблицу для репликации из MSSQL каждый день в 2:00 AM
python utils.py add-table Orders "0 2 * * *" mssql
# Добавить таблицу для репликации из PostgreSQL каждые 30 минут
python utils.py add-table Products "*/30 * * * *" pgsql
# Добавить таблицу (по умолчанию MSSQL)
python utils.py add-table Customers "0 3 * * *"
# Вывести список таблиц
python utils.py list-tables
# Показать текущее расписание
python utils.py show-schedules
# Запустить планировщик
python utils.py start-scheduler
# Остановить планировщик
python utils.py stop-scheduler
# Показать статус
python utils.py status
Cron расписание (формат: минуты часы день_месяца месяц день_недели):
0 2 * * * - Каждый день в 2:00 AM
*/30 * * * * - Каждые 30 минут
0 0 * * 0 - Каждое воскресенье в полночь
0 9,12,15 * * * - В 9:00, 12:00 и 15:00 каждый день
0 */4 * * * - Каждые 4 часа
0 0 1 * * - Первый день месяца в полночь
""")
def main():
"""Главная функция"""
if len(sys.argv) < 2:
help_command()
return
command = sys.argv[1]
try:
if command == "init-db":
init_db()
elif command == "test-connections":
test_connections()
elif command == "list-tables":
list_tables()
elif command == "add-table":
if len(sys.argv) < 4:
logger.error("Ошибка: add-table требует минимум 2 аргумента (имя и расписание)")
print("Использование: python utils.py add-table <имя> <cron> [источник]")
print("Источник: mssql (по умолчанию) или pgsql")
sys.exit(1)
source_type = sys.argv[4] if len(sys.argv) > 4 else "mssql"
add_table(sys.argv[2], sys.argv[3], source_type)
elif command == "remove-table":
if len(sys.argv) < 3:
logger.error("Ошибка: remove-table требует 1 аргумент (ID)")
print("Использование: python utils.py remove-table <id>")
sys.exit(1)
remove_table(int(sys.argv[2]))
elif command == "show-schedules":
show_schedules()
elif command == "start-scheduler":
start_scheduler()
elif command == "stop-scheduler":
stop_scheduler()
elif command == "status":
status()
elif command == "help":
help_command()
else:
logger.error(f"Неизвестная команда: {command}")
help_command()
sys.exit(1)
except Exception as e:
logger.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()