Files
2026-03-29 23:24:15 +09:00

117 lines
5.0 KiB
Python

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, Enum, ForeignKey, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import enum
import json
Base = declarative_base()
class ReplicationStatus(str, enum.Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
class SourceType(str, enum.Enum):
"""Тип источника данных"""
MSSQL = "mssql"
PGSQL = "pgsql"
class DatabaseType(str, enum.Enum):
"""Тип целевой БД"""
PGSQL = "pgsql"
MSSQL = "mssql"
class DataSource(Base):
"""Модель для хранения конфигурации источников данных"""
__tablename__ = "data_sources"
id = Column(Integer, primary_key=True)
name = Column(String(255), unique=True, nullable=False)
source_type = Column(Enum(SourceType), nullable=False)
host = Column(String(255), nullable=False)
port = Column(Integer, nullable=False)
database = Column(String(255), nullable=False)
username = Column(String(255), nullable=False)
password = Column(String(255), nullable=False)
default_schema = Column(String(255), default="dbo")
is_active = Column(Boolean, default=True)
description = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class TargetDatabase(Base):
"""Модель для хранения конфигурации целевых баз данных"""
__tablename__ = "target_databases"
id = Column(Integer, primary_key=True)
name = Column(String(255), unique=True, nullable=False) # Имя целевой БД
db_type = Column(Enum(DatabaseType), default=DatabaseType.PGSQL)
host = Column(String(255), nullable=False)
port = Column(Integer, nullable=False)
database = Column(String(255), nullable=False)
username = Column(String(255), nullable=False)
password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
is_default = Column(Boolean, default=False) # Целевая БД по умолчанию
description = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class MigrationTable(Base):
"""Модель для хранения таблиц, которые нужно мигрировать"""
__tablename__ = "migration_tables"
id = Column(Integer, primary_key=True)
table_name = Column(String(255), nullable=False)
source_id = Column(Integer, ForeignKey("data_sources.id"), nullable=False)
target_id = Column(Integer, ForeignKey("target_databases.id")) # Целевая БД (если None, используется default)
source_schema = Column(String(255), default="dbo")
target_schema = Column(String(255), default="public")
target_table_name = Column(String(255)) # Переименование таблицы при копировании (если None, используется table_name)
column_mapping = Column(JSON) # {"source_col": "target_col", ...} для переименования столбцов
# Конфигурация Life таблиц
use_life_table = Column(Boolean, default=False) # Обрабатывать изменения из Life таблицы
life_excluded_fields = Column(JSON) # ["field1", "field2", ...] - сервисные поля для исключения
is_active = Column(Boolean, default=True)
cron_schedule = Column(String(255), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ReplicationJob(Base):
"""Модель для логирования выполнения репликации"""
__tablename__ = "replication_jobs"
id = Column(Integer, primary_key=True)
table_id = Column(Integer, nullable=False)
table_name = Column(String(255), nullable=False)
status = Column(Enum(ReplicationStatus), default=ReplicationStatus.PENDING)
started_at = Column(DateTime)
completed_at = Column(DateTime)
rows_processed = Column(Integer, default=0)
error_message = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
class ChangeLog(Base):
"""Модель для хранения логов изменений (Life таблицы)"""
__tablename__ = "change_logs"
id = Column(Integer, primary_key=True)
table_id = Column(Integer, nullable=False)
table_name = Column(String(255), nullable=False)
operation = Column(String(10), nullable=False) # INSERT, UPDATE, DELETE
change_data = Column(Text, nullable=False) # JSON
change_timestamp = Column(DateTime, nullable=False)
processed = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)