599 lines
27 KiB
Python
599 lines
27 KiB
Python
import re
|
||
from typing import Optional, List, Dict, Any
|
||
import traceback
|
||
from datetime import datetime
|
||
import pandas as pd
|
||
from app.models.replication import ReplicationSchedule
|
||
from app.services.replication_state import replication_state
|
||
from app.services.data_reader import data_reader
|
||
from app.services.data_writer import data_writer
|
||
from app.services.schema_manager import schema_manager
|
||
from app.utils.index_helpers import get_primary_key, get_max_id_from_postgres, get_foreign_keys, get_indexes
|
||
from app.utils.email_sender import email_sender
|
||
from app.core.logging import migration_logger
|
||
from app.core.config import settings
|
||
from app.core.database import db_connector
|
||
|
||
|
||
class DatabaseMigrator:
|
||
"""Мигратор данных по ID и по Life-таблицам"""
|
||
|
||
def __init__(self):
|
||
self.state = replication_state
|
||
self.reader = data_reader
|
||
self.writer = data_writer
|
||
self.schema = schema_manager
|
||
self.is_running = False
|
||
self.current_table = None
|
||
self.start_time = None
|
||
self.all_foreign_keys = {}
|
||
self.errors = []
|
||
|
||
# Таблицы, которые используют Life-механизм
|
||
self.life_tables = getattr(settings, 'LIFE_TABLES', [])
|
||
|
||
# Карта соответствия: основная таблица -> Life-таблица
|
||
self.life_mapping = {}
|
||
|
||
def _parse_table_name(self, table_name: str) -> Dict[str, str]:
|
||
"""
|
||
Парсит имя таблицы и возвращает компоненты.
|
||
|
||
Примеры:
|
||
"oms_kl_VisitResult" -> {
|
||
'schema': 'oms',
|
||
'basename': 'kl_VisitResult',
|
||
'full_name': 'oms_kl_VisitResult'
|
||
}
|
||
"stt_MedicalHistory" -> {
|
||
'schema': 'stt',
|
||
'basename': 'MedicalHistory',
|
||
'full_name': 'stt_MedicalHistory'
|
||
}
|
||
"""
|
||
# Ищем префикс (oms_, stt_, и т.д.)
|
||
match = re.match(r'^([A-Za-z]+)_(.*)$', table_name)
|
||
if match:
|
||
schema = match.group(1)
|
||
basename = match.group(2)
|
||
return {
|
||
'schema': schema,
|
||
'basename': basename,
|
||
'full_name': table_name
|
||
}
|
||
else:
|
||
# Если нет префикса
|
||
return {
|
||
'schema': '',
|
||
'basename': table_name,
|
||
'full_name': table_name
|
||
}
|
||
|
||
def _get_life_table_name(self, table_name: str) -> Optional[str]:
|
||
"""Получает имя Life-таблицы для основной таблицы"""
|
||
parsed = self._parse_table_name(table_name)
|
||
|
||
if parsed['schema']:
|
||
return f"Life_{parsed['schema']}_{parsed['basename']}"
|
||
else:
|
||
return f"Life_{table_name}"
|
||
|
||
def _get_life_id_field(self, table_name: str) -> str:
|
||
"""Получает имя LifeID поля"""
|
||
parsed = self._parse_table_name(table_name)
|
||
|
||
return f"{parsed['basename']}LifeID"
|
||
|
||
def _get_base_id_field(self, table_name: str) -> str:
|
||
"""Получает имя базового ID поля"""
|
||
parsed = self._parse_table_name(table_name)
|
||
return f"{parsed['basename']}ID"
|
||
|
||
def migrate_table_by_time(self, table_name: str, life_table_name: str, last_sync_time: datetime) -> Dict[str, int]:
|
||
"""Миграция таблицы через Life-механизм по времени"""
|
||
base_id_field = self._get_base_id_field(table_name)
|
||
life_id_field = self._get_life_id_field(table_name)
|
||
|
||
migration_logger.info(f"Миграция {table_name} через {life_table_name} с {last_sync_time}")
|
||
|
||
stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0}
|
||
|
||
try:
|
||
# Получаем последние версии из Life-таблицы
|
||
query = f"""
|
||
WITH LatestLife AS (
|
||
SELECT
|
||
{base_id_field},
|
||
MAX({life_id_field}) as MaxLifeID
|
||
FROM {life_table_name}
|
||
WHERE x_DateTime > CAST(? AS datetime)
|
||
GROUP BY {base_id_field}
|
||
)
|
||
SELECT dl.*
|
||
FROM {life_table_name} dl
|
||
INNER JOIN LatestLife ll
|
||
ON dl.{life_id_field} = ll.MaxLifeID
|
||
"""
|
||
|
||
# Читаем данные чанками
|
||
chunk_size = getattr(settings, 'CHUNK_SIZE', 1000)
|
||
|
||
for chunk in self.reader.read_custom_query_chunked(query, params=(last_sync_time,), chunksize=chunk_size):
|
||
if chunk.empty:
|
||
continue
|
||
|
||
# Разделяем по операциям
|
||
inserts = chunk[chunk['x_Operation'] == 'i']
|
||
updates = chunk[chunk['x_Operation'] == 'u']
|
||
deletes = chunk[chunk['x_Operation'] == 'd']
|
||
|
||
# Обрабатываем вставки
|
||
if not inserts.empty:
|
||
inserts_to_write = self._prepare_data_for_write(inserts, table_name)
|
||
if not inserts_to_write.empty:
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='INSERT',
|
||
records_count=len(inserts)
|
||
)
|
||
self.writer.upsert_data(table_name, inserts_to_write, base_id_field)
|
||
stats['inserted'] += len(inserts)
|
||
|
||
# Обрабатываем обновления
|
||
if not updates.empty:
|
||
updates_to_write = self._prepare_data_for_write(updates, table_name)
|
||
if not updates_to_write.empty:
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='UPDATE',
|
||
records_count=len(updates)
|
||
)
|
||
self.writer.upsert_data(table_name, updates_to_write, base_id_field)
|
||
stats['updated'] += len(updates)
|
||
|
||
# Обрабатываем удаления
|
||
if not deletes.empty:
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='DELETE',
|
||
records_count=len(deletes)
|
||
)
|
||
delete_ids = deletes[base_id_field].tolist()
|
||
self.writer.delete_data(table_name, base_id_field, delete_ids)
|
||
stats['deleted'] += len(deletes)
|
||
|
||
stats['total'] += len(chunk)
|
||
|
||
migration_logger.info(f" Чанк: +{len(inserts)} вставок, ~{len(updates)} обновлений, -{len(deletes)} удалений")
|
||
|
||
if stats['total'] > 0:
|
||
migration_logger.info(f"{table_name}: +{stats['inserted']} вставок, ~{stats['updated']} обновлений, -{stats['deleted']} удалений")
|
||
else:
|
||
migration_logger.info(f"ℹ️ {table_name}: изменений нет")
|
||
|
||
except Exception as e:
|
||
error_msg = f"Ошибка при миграции {table_name} через Life: {e}"
|
||
migration_logger.error(error_msg)
|
||
migration_logger.error(e.args)
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='ERROR',
|
||
records_count=0,
|
||
status='ERROR',
|
||
error_message=str(e)[:500]
|
||
)
|
||
self.errors.append({
|
||
'table': table_name,
|
||
'error': error_msg,
|
||
'traceback': traceback.format_exc(),
|
||
'time': datetime.now()
|
||
})
|
||
raise
|
||
|
||
return stats
|
||
|
||
def _prepare_data_for_write(self, df: pd.DataFrame, table_name: str) -> pd.DataFrame:
|
||
"""Подготавливает данные из Life-таблицы для записи в основную таблицу"""
|
||
# Исключаем служебные поля
|
||
exclude_fields = {'x_Operation', 'x_DateTime', 'x_Seance', 'x_User'}
|
||
|
||
# Определяем, какие поля оставить
|
||
fields_to_keep = []
|
||
for col in df.columns:
|
||
if col not in exclude_fields and not col.endswith('LifeID'):
|
||
fields_to_keep.append(col)
|
||
|
||
result = df[fields_to_keep].copy()
|
||
|
||
# Убеждаемся, что нет дубликатов по ID
|
||
base_id_field = self._get_base_id_field(table_name)
|
||
result = result.drop_duplicates(subset=[base_id_field])
|
||
|
||
return result
|
||
|
||
def migrate_table(self, table_name: str, schedule_id: int, metadata_id: int, life_table_name: Optional[str], uses_life: bool = False, full_reload: bool = False) -> bool:
|
||
"""Миграция одной таблицы (поддерживает и ID, и Life)"""
|
||
migration_logger.table_start(table_name)
|
||
self.current_table = table_name
|
||
|
||
try:
|
||
# Получаем ID колонку для статистики
|
||
id_column = get_primary_key(table_name)
|
||
|
||
if uses_life and not full_reload and life_table_name:
|
||
# МИГРАЦИЯ ЧЕРЕЗ LIFE-ТАБЛИЦУ ПО ВРЕМЕНИ
|
||
last_sync = self.state.get_table_last_sync(metadata_id)
|
||
|
||
if last_sync:
|
||
stats = self.migrate_table_by_time(table_name, life_table_name, last_sync)
|
||
|
||
# Обновляем время синхронизации
|
||
self.state.update_table_sync_time(schedule_id)
|
||
|
||
# Обновляем статистику
|
||
if id_column:
|
||
self._update_table_statistics(table_name, id_column)
|
||
|
||
migration_logger.table_success(table_name, stats['total'])
|
||
return True
|
||
else:
|
||
# Если синхронизации не было - делаем полную загрузку
|
||
migration_logger.info(f"Первая синхронизация {table_name} - полная загрузка")
|
||
full_reload = True
|
||
|
||
if full_reload:
|
||
# ПОЛНАЯ ПЕРЕЗАГРУЗКА (по ID)
|
||
result = self._full_reload_by_id(table_name)
|
||
|
||
# Обновляем статистику после полной загрузки
|
||
if result and id_column:
|
||
self._update_table_statistics(table_name, id_column)
|
||
|
||
return result
|
||
else:
|
||
# ИНКРЕМЕНТАЛЬНАЯ ПО ID (для таблиц без Life)
|
||
result = self._incremental_by_id(table_name)
|
||
|
||
# Обновляем статистику после инкрементальной загрузки
|
||
if result and id_column:
|
||
self._update_table_statistics(table_name, id_column)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
error_msg = f"Критическая ошибка при обработке {table_name}: {e}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({
|
||
'table': table_name,
|
||
'error': error_msg,
|
||
'traceback': traceback.format_exc(),
|
||
'time': datetime.now()
|
||
})
|
||
return False
|
||
finally:
|
||
self.current_table = None
|
||
|
||
def _full_reload_by_id(self, table_name: str) -> bool:
|
||
"""Полная перезагрузка таблицы по ID"""
|
||
migration_logger.info(f"Полная загрузка {table_name} по ID")
|
||
|
||
try:
|
||
# Получаем ID колонку
|
||
id_column = get_primary_key(table_name)
|
||
if not id_column:
|
||
error_msg = f"Не могу найти ID колонку для {table_name}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()})
|
||
return False
|
||
|
||
# Получаем метаданные
|
||
foreign_keys = get_foreign_keys(table_name)
|
||
indexes = get_indexes(table_name)
|
||
|
||
# Загружаем данные чанками
|
||
first_chunk = True
|
||
total_rows = 0
|
||
|
||
for chunk in self.reader.read_by_id_chunked(table_name, id_column, None):
|
||
if first_chunk:
|
||
self.writer.create_table(table_name, chunk)
|
||
first_chunk = False
|
||
else:
|
||
self.writer.append_data(table_name, chunk)
|
||
|
||
total_rows += len(chunk)
|
||
|
||
if total_rows == 0:
|
||
migration_logger.warning(f"Таблица {table_name} пуста")
|
||
return True
|
||
|
||
# Создаем индексы
|
||
if indexes:
|
||
self.writer.create_indexes(table_name, indexes)
|
||
migration_logger.info(f"📇 Создано {len(indexes)} индексов")
|
||
|
||
# Сохраняем информацию о внешних ключах
|
||
if foreign_keys:
|
||
self.all_foreign_keys[table_name] = foreign_keys
|
||
migration_logger.info(f"🔗 Сохранено {len(foreign_keys)} внешних ключей")
|
||
|
||
# Обновляем last_id и время синхронизации
|
||
#max_id = self._get_max_id(table_name, id_column)
|
||
#self.state.update_last_id(table_name, max_id)
|
||
#self.state.update_table_sync_time(table_name)
|
||
#self.state.update_table_stats(table_name, total_rows)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"Ошибка при полной загрузке {table_name}: {e}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({
|
||
'table': table_name,
|
||
'error': error_msg,
|
||
'traceback': traceback.format_exc(),
|
||
'time': datetime.now()
|
||
})
|
||
return False
|
||
|
||
def _incremental_by_id(self, table_name: str, metadata) -> bool:
|
||
"""Инкрементальная загрузка по ID (для таблиц без Life)"""
|
||
migration_logger.info(f"Инкрементальная загрузка {table_name} по ID")
|
||
|
||
try:
|
||
id_column = get_primary_key(table_name)
|
||
if not id_column:
|
||
error_msg = f"Не могу найти ID колонку для {table_name}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({'table': table_name, 'error': error_msg, 'time': datetime.now()})
|
||
return False
|
||
|
||
foreign_keys = get_foreign_keys(table_name)
|
||
indexes = get_indexes(table_name)
|
||
|
||
# Проверяем новые колонки
|
||
new_columns = self.schema.detect_new_columns(table_name)
|
||
if new_columns:
|
||
self.schema.add_new_columns(table_name, new_columns)
|
||
|
||
# Получаем последний ID
|
||
last_id = self.state.get_last_id(table_name)
|
||
migration_logger.info(f"last_id из состояния: {last_id}")
|
||
|
||
if last_id is None:
|
||
last_id = get_max_id_from_postgres(table_name, id_column)
|
||
migration_logger.info(f"Последний ID в PG: {last_id}")
|
||
|
||
# Загружаем новые данные
|
||
total_loaded = 0
|
||
first_chunk = True
|
||
|
||
for chunk in self.reader.read_by_id_chunked(table_name, id_column, last_id):
|
||
if first_chunk:
|
||
# Проверяем структуру
|
||
pg_cols = {c['name'] for c in self.schema.get_postgres_columns(table_name)}
|
||
if not pg_cols.issubset(set(chunk.columns)):
|
||
missing = pg_cols - set(chunk.columns)
|
||
migration_logger.warning(f"В PG есть колонки, которых нет в чанке: {missing}")
|
||
first_chunk = False
|
||
|
||
self.writer.append_data(table_name, chunk)
|
||
total_loaded += len(chunk)
|
||
|
||
#if total_loaded > 0:
|
||
# # Обновляем последний ID и время синхронизации
|
||
# max_id = self._get_max_id(table_name, id_column)
|
||
# self.state.update_last_id(table_name, max_id)
|
||
# self.state.update_table_sync_time(table_name)
|
||
# self.state.update_table_stats(table_name, total_loaded)
|
||
|
||
# Сохраняем FK для создания позже
|
||
if foreign_keys:
|
||
if table_name not in self.all_foreign_keys:
|
||
self.all_foreign_keys[table_name] = []
|
||
self.all_foreign_keys[table_name].extend(foreign_keys)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"Ошибка при инкрементальной загрузке {table_name}: {e}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({
|
||
'table': table_name,
|
||
'error': error_msg,
|
||
'traceback': traceback.format_exc(),
|
||
'time': datetime.now()
|
||
})
|
||
return False
|
||
|
||
def _get_max_id(self, table_name: str, id_column: str) -> int:
|
||
"""Получает максимальный ID из источника"""
|
||
max_id_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}"
|
||
max_df = pd.read_sql_query(max_id_query, db_connector.src_engine)
|
||
return int(max_df.iloc[0]['max_id']) if not max_df.empty else 0
|
||
|
||
def create_all_foreign_keys(self):
|
||
"""Создать все внешние ключи после завершения миграции"""
|
||
if not self.all_foreign_keys:
|
||
migration_logger.info("Нет внешних ключей для создания")
|
||
return
|
||
|
||
migration_logger.info("="*60)
|
||
migration_logger.info("СОЗДАНИЕ ВНЕШНИХ КЛЮЧЕЙ")
|
||
migration_logger.info("="*60)
|
||
|
||
for table_name, foreign_keys in self.all_foreign_keys.items():
|
||
try:
|
||
existing = self.schema.check_foreign_keys_exist(table_name)
|
||
existing_names = {f['name'] for f in existing}
|
||
|
||
to_create = [fk for fk in foreign_keys if fk['name'] not in existing_names]
|
||
|
||
if to_create:
|
||
self.writer.create_foreign_keys(table_name, to_create)
|
||
else:
|
||
migration_logger.info(f"Все внешние ключи для {table_name} уже существуют")
|
||
|
||
except Exception as e:
|
||
error_msg = f"Ошибка создания FK для {table_name}: {e}"
|
||
migration_logger.error(error_msg)
|
||
self.errors.append({
|
||
'table': table_name,
|
||
'error': error_msg,
|
||
'traceback': traceback.format_exc(),
|
||
'time': datetime.now()
|
||
})
|
||
|
||
def run_migration(
|
||
self, table_name: str, schedule_id: int, metadata_id: int,
|
||
life_table_name: Optional[str] = None, uses_life: bool = False,
|
||
full_reload: bool = False, send_email: bool = True
|
||
):
|
||
"""Запуск миграции таблицы"""
|
||
self.is_running = True
|
||
self.start_time = datetime.now()
|
||
self.all_foreign_keys = {}
|
||
self.errors = []
|
||
|
||
last_replication = self.state.get_last_replication_time()
|
||
|
||
migration_logger.info("="*70)
|
||
migration_logger.info("НАЧАЛО МИГРАЦИИ")
|
||
migration_logger.info(f"Время старта: {self.start_time}")
|
||
if last_replication:
|
||
migration_logger.info(f"Последняя миграция: {last_replication}")
|
||
migration_logger.info(f"Таблица для обработки: {table_name}")
|
||
migration_logger.info(f"Режим: {'ПОЛНАЯ' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'}")
|
||
migration_logger.info("="*70)
|
||
|
||
if not self.is_running:
|
||
migration_logger.warning("Миграция остановлена пользователем")
|
||
return
|
||
|
||
migration_logger.info(f"Обработка таблицы {table_name}")
|
||
results = self.migrate_table(table_name, schedule_id, metadata_id, life_table_name, uses_life, full_reload)
|
||
|
||
# Создаем внешние ключи после всех таблиц
|
||
self.create_all_foreign_keys()
|
||
|
||
total_time = (datetime.now() - self.start_time).total_seconds()
|
||
stats = self.state.get_all_stats()
|
||
|
||
self._log_final_stats(results, stats, total_time)
|
||
|
||
# Отправляем уведомление
|
||
if send_email:
|
||
self._send_notification(results, stats, total_time)
|
||
|
||
self.is_running = False
|
||
return results
|
||
|
||
def _log_final_stats(self, has_migrated: bool, stats: dict, total_time: float):
|
||
"""Логирует финальную статистику"""
|
||
migration_logger.info("="*70)
|
||
migration_logger.info("ИТОГОВАЯ СТАТИСТИКА")
|
||
migration_logger.info("="*70)
|
||
migration_logger.info(f"Ошибок: {len(self.errors)}")
|
||
migration_logger.info(f"Всего строк в БД: {stats.get('total_rows', 0)}")
|
||
migration_logger.info(f"Общее время: {total_time:.1f}с")
|
||
migration_logger.info("="*70)
|
||
|
||
def _send_notification(self, has_migrated: bool, stats: dict, total_time: float):
|
||
"""Отправляет уведомление о результате"""
|
||
if self.errors:
|
||
error_body = self._build_error_email_body(has_migrated, stats, total_time)
|
||
email_sender.send_email(
|
||
subject=f"МИГРАЦИЯ С ОШИБКАМИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
|
||
body=error_body
|
||
)
|
||
# else:
|
||
# email_sender.send_success_notification(stats, total_time)
|
||
|
||
def _build_error_email_body(self, has_migrated: bool, stats: dict, total_time: float) -> str:
|
||
"""Строит тело письма с ошибками"""
|
||
body = f"""
|
||
МИГРАЦИЯ ЗАВЕРШЕНА С ОШИБКАМИ
|
||
{'='*60}
|
||
|
||
Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||
Длительность: {total_time:.1f} сек
|
||
|
||
СТАТИСТИКА:
|
||
{'='*40}
|
||
Ошибок: {len(self.errors)}
|
||
Всего строк: {stats.get('total_rows', 0)}
|
||
|
||
СПИСОК ОШИБОК:
|
||
{'='*40}
|
||
"""
|
||
for i, err in enumerate(self.errors, 1):
|
||
body += f"\n{i}. Таблица: {err.get('table', 'N/A')}\n"
|
||
body += f" Ошибка: {err['error']}\n"
|
||
body += f" Время: {err['time'].strftime('%H:%M:%S') if 'time' in err else 'N/A'}\n"
|
||
|
||
return body
|
||
|
||
def stop_migration(self):
|
||
self.is_running = False
|
||
migration_logger.warning("Миграция остановлена")
|
||
email_sender.send_email(
|
||
subject=f"МИГРАЦИЯ ОСТАНОВЛЕНА - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
|
||
body=f"Миграция была остановлена пользователем в {datetime.now().strftime('%H:%M:%S')}"
|
||
)
|
||
|
||
def get_status(self) -> dict:
|
||
if not self.is_running:
|
||
return {
|
||
'is_running': False,
|
||
'last_errors': len(self.errors),
|
||
'last_replication': self.state.get_last_replication_info()
|
||
}
|
||
|
||
elapsed = (datetime.now() - self.start_time).total_seconds() if self.start_time else 0
|
||
|
||
return {
|
||
'is_running': True,
|
||
'current_table': self.current_table,
|
||
'elapsed_seconds': elapsed,
|
||
'errors_count': len(self.errors)
|
||
}
|
||
|
||
def _update_table_statistics(self, table_name: str, id_column: str):
|
||
"""
|
||
Обновляет статистику таблицы на основе реальных данных в PostgreSQL
|
||
Вызывается сразу после миграции таблицы
|
||
"""
|
||
try:
|
||
migration_logger.info(f"Обновление статистики для {table_name}...")
|
||
|
||
# Получаем реальную статистику из PostgreSQL
|
||
dst_stats = self.reader.get_table_stats(table_name, id_column)
|
||
|
||
# Обновляем в метаданных
|
||
self.state.update_last_id(table_name, dst_stats['max_id'])
|
||
|
||
# Для total_rows нужно установить точное значение, а не добавлять
|
||
# Поэтому используем отдельный метод для установки
|
||
replication_state._set_table_total_rows(table_name, dst_stats['total_rows'])
|
||
|
||
migration_logger.info(f"Статистика обновлена: {dst_stats['total_rows']} строк, max_id={dst_stats['max_id']}")
|
||
|
||
# Логируем операцию
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='STATS_UPDATE',
|
||
records_count=dst_stats['total_rows'],
|
||
status='SUCCESS'
|
||
)
|
||
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка обновления статистики для {table_name}: {e}")
|
||
self.state.log_operation(
|
||
table_name=table_name,
|
||
operation='STATS_UPDATE',
|
||
records_count=0,
|
||
status='ERROR',
|
||
error_message=str(e)[:500]
|
||
)
|
||
|
||
migrator = DatabaseMigrator() |