Files
project-carrier/replication_service/migrate_to_data_sources.py
2026-03-29 23:24:15 +09:00

128 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Миграционный скрипт для обновления схемы БД.
Этот скрипт обновляет migration_tables таблицу с новой структурой source_id.
"""
import sys
sys.path.insert(0, '/home/brus/project-carrier/replication_service')
from database import DatabaseManager, postgres_engine
from sqlalchemy import text
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def migrate_to_data_sources():
"""Миграция существующей конфигурации в новую систему DataSource"""
logger.info("Starting migration to DataSource-based configuration...")
# Инициализировать новую схему если еще не создана
DatabaseManager.init_postgres_db()
logger.info("Database schema initialized with DataSource model")
session = DatabaseManager.get_postgres_session()
try:
# Проверить существуют ли там уже данные в migration_tables с источником source_type
from models import MigrationTable, DataSource, SourceType
from sqlalchemy import inspect
inspector = inspect(postgres_engine)
tables = inspector.get_table_names()
if 'migration_tables' in tables:
logger.info("migration_tables exists, checking for old source_type column...")
# Получить информацию о колонках
columns = [c['name'] for c in inspector.get_columns('migration_tables')]
if 'source_type' in columns:
logger.info("Found old source_type column, migrating data...")
# Чтение старых данных
with postgres_engine.connect() as connection:
# Получить уникальные источники из old migration_tables
result = connection.execute(
text("SELECT DISTINCT source_type FROM migration_tables")
)
source_types = [row[0] for row in result]
logger.info(f"Found source types: {source_types}")
# Для каждого типа источника, создать DataSource запись
for source_type in source_types:
if source_type == 'mssql':
# Попытаться загрузить данные из конфига если доступны
from config import settings
source = DataSource(
name=f"MSSQL Auto-migrated",
source_type=SourceType.MSSQL,
host=settings.mssql_server,
port=settings.mssql_port,
database=settings.mssql_database,
username=settings.mssql_username,
password=settings.mssql_password,
default_schema="dbo",
description="Auto-migrated from environment variables"
)
elif source_type == 'pgsql':
from config import settings
source = DataSource(
name=f"PostgreSQL Auto-migrated",
source_type=SourceType.PGSQL,
host=settings.pgsql_source_host,
port=settings.pgsql_source_port,
database=settings.pgsql_source_database,
username=settings.pgsql_source_username,
password=settings.pgsql_source_password,
default_schema="public",
description="Auto-migrated from environment variables"
)
else:
logger.warning(f"Unknown source type: {source_type}, skipping")
continue
session.add(source)
session.commit()
logger.info(f"Created {len(source_types)} DataSource records")
# Обновить migration_tables с source_id
sources = session.query(DataSource).all()
source_map = {s.source_type.value: s.id for s in sources}
migrations = session.query(MigrationTable).all()
for migration in migrations:
if hasattr(migration, 'source_type'):
source_type_val = migration.source_type.value if hasattr(migration.source_type, 'value') else str(migration.source_type)
if source_type_val in source_map:
migration.source_id = source_map[source_type_val]
session.commit()
logger.info(f"Updated {len(migrations)} migration records with source_id")
else:
logger.info("source_type column not found, schema already migrated or no migration_tables")
else:
logger.info("migration_tables doesn't exist yet, fresh install")
logger.info("✓ Migration completed successfully!")
return True
except Exception as e:
logger.error(f"Migration failed: {e}", exc_info=True)
session.rollback()
return False
finally:
session.close()
if __name__ == "__main__":
success = migrate_to_data_sources()
sys.exit(0 if success else 1)