Files
replicator/app/models/replication.py
2026-03-13 17:11:39 +09:00

195 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/models/replication.py
from typing import List, Optional
from sqlalchemy import JSON, Column, ForeignKey, Null, String, DateTime, BigInteger, Integer, Boolean, Time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, Mapped, mapped_column
from datetime import datetime
Base = declarative_base()
class ReplicationMetadata(Base):
"""Модель для хранения метаданных репликации"""
__tablename__ = 'replication_metadata'
#__table_args__ = {"schema": "replicator"}
id: Mapped[int] = mapped_column(Integer, primary_key=True)
table_name: Mapped[str] = mapped_column(String(100), nullable=False)
life_table_name: Mapped[str] = mapped_column(String(100), nullable=True)
description: Mapped[str] = mapped_column(String(255), nullable=True)
last_sync_time: Mapped[datetime] = mapped_column(DateTime, nullable=True)
last_id: Mapped[int] = mapped_column(BigInteger, nullable=True)
total_rows: Mapped[int] = mapped_column(BigInteger, default=0)
last_error: Mapped[str] = mapped_column(String(500), nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
schedule: Mapped["ReplicationSchedule"] = relationship("ReplicationSchedule", back_populates="table", cascade="all, delete-orphan")
def __repr__(self):
return f"<ReplicationMetadata(table='{self.table_name}')>"
@classmethod
def create_if_not_exists(cls, session, table_name: str):
"""Создает запись метаданных, если её нет"""
record = session.query(cls).filter_by(table_name=table_name).first()
if not record:
record = cls(
table_name=table_name,
last_sync_time=datetime(1900, 1, 1), # Начало времен
last_id=0,
total_rows=0
)
session.add(record)
session.commit()
return record
def update_sync_time(self, session):
"""Обновляет время последней синхронизации"""
self.last_sync_time = datetime.now()
self.updated_at = datetime.now()
session.commit()
def update_last_id(self, session, last_id: int):
"""Обновляет последний обработанный ID"""
self.last_id = last_id
self.updated_at = datetime.now()
session.commit()
def increment_total_rows(self, session, count: int = 1):
"""Увеличивает счетчик общего количества строк"""
self.total_rows += count
self.updated_at = datetime.now()
session.commit()
def has_use_life(self):
"""Разрешение использовать Life таблицу для миграции"""
return self.life_table_name is not None
class ReplicationSchedule(Base):
"""Модель для расписания миграции таблицы"""
__tablename__ = 'replication_schedules'
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
metadata_id: Mapped[int] = mapped_column(Integer, ForeignKey('replication_metadata.id', ondelete='CASCADE'), nullable=False)
schedule_time: Mapped[datetime] = mapped_column(Time, nullable=False, default=datetime.strptime("00:00", "%H:%M").time())
days: Mapped[JSON] = mapped_column(JSON, nullable=False, default=list) # Храним список дней как JSON
full_reload: Mapped[bool] = mapped_column(Boolean, default=False)
enabled: Mapped[bool] = mapped_column(Boolean, default=True)
last_run: Mapped[datetime] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
name: Mapped[str] = mapped_column(String(100), nullable=True)
description: Mapped[str] = mapped_column(String(500), nullable=True)
# Связь с метаданными
table: Mapped[ReplicationMetadata] = relationship("ReplicationMetadata", back_populates="schedule")
# Маппинг дней для обратной совместимости
DAYS_MAP = {
'monday': 0, 'mon': 0, 'пн': 0, 'понедельник': 0,
'tuesday': 1, 'tue': 1, 'вт': 1, 'вторник': 1,
'wednesday': 2, 'wed': 2, 'ср': 2, 'среда': 2,
'thursday': 3, 'thu': 3, 'чт': 3, 'четверг': 3,
'friday': 4, 'fri': 4, 'пт': 4, 'пятница': 4,
'saturday': 5, 'sat': 5, 'сб': 5, 'суббота': 5,
'sunday': 6, 'sun': 6, 'вс': 6, 'воскресенье': 6
}
@property
def days_list(self) -> List[int]:
"""Получить список дней как числа (0-6)"""
if not self.days:
return list(range(7)) # Все дни по умолчанию
# Преобразуем названия в числа
result = []
for day in self.days:
day_lower = day.lower().strip()
if day_lower in self.DAYS_MAP:
result.append(self.DAYS_MAP[day_lower])
elif isinstance(day, int) and 0 <= day <= 6:
result.append(day)
return result if result else list(range(7))
@property
def days_display(self) -> List[str]:
"""Получить отображаемые названия дней"""
reverse_map = {v: k for k, v in self.DAYS_MAP.items()}
days_list = []
for day_num in sorted(self.days_list):
# Ищем русское название
for name, num in self.DAYS_MAP.items():
if num == day_num and name in ['пн', 'вт', 'ср', 'чт', 'пт', 'сб', 'вс']:
days_list.append(name)
break
else:
days_list.append(str(day_num))
return days_list
def should_run_today(self, check_date: Optional[datetime] = None) -> bool:
"""Проверить, должна ли таблица запускаться сегодня"""
if check_date is None:
check_date = datetime.now()
today = check_date.weekday()
return today in self.days_list
def to_dict(self) -> dict:
"""Безопасное преобразование в словарь (работает и с отсоединенными объектами)"""
# Используем __dict__, но исключаем служебные поля SQLAlchemy
data = {
'id': self.id,
'metadata_id': self.metadata_id,
'schedule_time': self.schedule_time.strftime("%H:%M") if self.schedule_time else "00:00",
'days': self.days, # Сохраняем как есть, а days_display вычислим отдельно
'full_reload': self.full_reload,
'enabled': self.enabled,
'last_run': self.last_run.isoformat() if self.last_run else None,
'name': self.name,
'description': self.description,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'table': self.table
}
# Вычисляем days_display на основе days
if hasattr(self, 'days') and self.days:
data['days_display'] = self._days_to_str_from_list(self.days)
else:
data['days_display'] = []
return data
def _days_to_str_from_list(self, days_list):
"""Вычисляет отображаемые дни из списка"""
result = []
for day_num in sorted(days_list if days_list else []):
for name, num in self.DAYS_MAP.items():
if num == day_num and name in ['пн', 'вт', 'ср', 'чт', 'пт', 'сб', 'вс']:
result.append(name)
break
return result
def __repr__(self):
return f"<ReplicationSchedule(table='{self.table_name}', time='{self.schedule_time}')>"
class ReplicationLog(Base):
"""Модель для логирования операций репликации"""
__tablename__ = 'replication_logs'
id = Column(Integer, primary_key=True, autoincrement=True)
table_name = Column(String(100), nullable=False)
operation = Column(String(20), nullable=False) # INSERT, UPDATE, DELETE
records_count = Column(Integer, default=0)
status = Column(String(20), nullable=False) # SUCCESS, ERROR
error_message = Column(String(500), nullable=True)
created_at = Column(DateTime, default=datetime.now)
def __repr__(self):
return f"<ReplicationLog(table='{self.table_name}', op='{self.operation}', status='{self.status}')>"