# Replication service: copies tables from configured data sources into the target DB.
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import dlt
from sqlalchemy import text

try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

from database import DatabaseManager, PostgresSessionLocal
from models import ReplicationJob, ReplicationStatus, ChangeLog, DataSource
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ReplicationService:
    """Replicates data from heterogeneous sources into the target DB,
    with support for column renames and service-field exclusion."""

    def __init__(self):
        # Dedicated session for bookkeeping (jobs / change log); released via close().
        self.session = PostgresSessionLocal()
||
def create_replication_job(self, table_id: int, table_name: str) -> ReplicationJob:
|
||
"""Создать запись о задаче репликации"""
|
||
job = ReplicationJob(
|
||
table_id=table_id,
|
||
table_name=table_name,
|
||
status=ReplicationStatus.PENDING,
|
||
started_at=datetime.utcnow()
|
||
)
|
||
self.session.add(job)
|
||
self.session.commit()
|
||
return job
|
||
|
||
def update_job_status(self, job_id: int, status: ReplicationStatus,
|
||
rows_processed: int = 0, error_message: str = None):
|
||
"""Обновить статус задачи репликации"""
|
||
try:
|
||
job = self.session.query(ReplicationJob).filter(ReplicationJob.id == job_id).first()
|
||
if job:
|
||
job.status = status
|
||
job.rows_processed = rows_processed
|
||
job.completed_at = datetime.utcnow()
|
||
if error_message:
|
||
job.error_message = error_message
|
||
self.session.commit()
|
||
except Exception as e:
|
||
logger.error(f"Error updating job status: {e}")
|
||
self.session.rollback()
|
||
|
||
def _apply_column_mapping(self, rows: List[Dict[str, Any]], column_mapping: Optional[Dict[str, str]]) -> List[Dict[str, Any]]:
|
||
"""Применить переименование столбцов"""
|
||
if not column_mapping:
|
||
return rows
|
||
|
||
mapped_rows = []
|
||
for row in rows:
|
||
new_row = {}
|
||
for old_name, value in row.items():
|
||
new_name = column_mapping.get(old_name, old_name)
|
||
new_row[new_name] = value
|
||
mapped_rows.append(new_row)
|
||
|
||
logger.debug(f"Applied column mapping: {column_mapping}")
|
||
return mapped_rows
|
||
|
||
def _filter_excluded_fields(self, rows: List[Dict[str, Any]], excluded_fields: Optional[List[str]]) -> List[Dict[str, Any]]:
|
||
"""Исключить сервисные поля"""
|
||
if not excluded_fields:
|
||
return rows
|
||
|
||
filtered_rows = []
|
||
for row in rows:
|
||
filtered_row = {k: v for k, v in row.items() if k not in excluded_fields}
|
||
filtered_rows.append(filtered_row)
|
||
|
||
logger.debug(f"Excluded fields: {excluded_fields}")
|
||
return filtered_rows
|
||
|
||
def replicate_table(
|
||
self,
|
||
table_name: str,
|
||
source_id: int,
|
||
source_schema: str = None,
|
||
target_schema: str = "public",
|
||
target_id: int = None,
|
||
target_table_name: str = None,
|
||
column_mapping: Optional[Dict[str, str]] = None,
|
||
life_excluded_fields: Optional[List[str]] = None
|
||
) -> Tuple[bool, int]:
|
||
"""
|
||
Реплицировать таблицу из источника в целевую БД
|
||
|
||
Args:
|
||
table_name: Имя таблицы в источнике
|
||
source_id: ID источника данных
|
||
source_schema: Схема в источнике (если None, используется default_schema)
|
||
target_schema: Схема в целевой БД
|
||
target_id: ID целевой БД (если None, используется default)
|
||
target_table_name: Переименование таблицы при копировании (если None, используется table_name)
|
||
column_mapping: Переименование столбцов {"source_col": "target_col"}
|
||
life_excluded_fields: Поля для исключения (для Life таблиц)
|
||
|
||
Returns:
|
||
Кортеж (успешность, количество строк)
|
||
"""
|
||
try:
|
||
# Получить информацию об источнике
|
||
data_source = DatabaseManager.get_data_source(source_id)
|
||
if not data_source:
|
||
raise ValueError(f"DataSource with ID {source_id} not found")
|
||
|
||
# Использовать имя целевой таблицы или имя исходной таблицы
|
||
final_table_name = target_table_name or table_name
|
||
|
||
# Использовать default_schema если не указана
|
||
if source_schema is None:
|
||
source_schema = data_source.default_schema
|
||
|
||
logger.info(
|
||
f"Starting replication for table {table_name} -> {final_table_name} "
|
||
f"from source {data_source.name} (ID: {source_id}) "
|
||
f"to target DB (ID: {target_id or 'default'})"
|
||
)
|
||
|
||
# Определить тип целевой БД для выбора стратегии репликации
|
||
target_db = DatabaseManager.get_target_database(target_id)
|
||
|
||
# Использовать DLT для PostgreSQL (включая default PostgreSQL)
|
||
is_postgres_target = False
|
||
if target_db:
|
||
is_postgres_target = target_db.db_type.value == 'pgsql'
|
||
else:
|
||
is_postgres_target = True
|
||
|
||
if is_postgres_target:
|
||
# Использовать DLT sql_database источник для PostgreSQL
|
||
from config import settings
|
||
from dlt.destinations import postgres
|
||
from dlt.sources.sql_database import sql_table
|
||
|
||
# Создаем PostgreSQL destination с кредами из конфига
|
||
postgres_destination = postgres(
|
||
credentials={
|
||
"drivername": "postgresql",
|
||
"username": settings.postgres_username,
|
||
"password": settings.postgres_password,
|
||
"host": settings.postgres_host,
|
||
"port": settings.postgres_port,
|
||
"database": settings.postgres_database
|
||
}
|
||
)
|
||
|
||
pipeline = dlt.pipeline(
|
||
pipeline_name=f"replication_{data_source.source_type.value}_{table_name}",
|
||
destination=postgres_destination,
|
||
dataset_name=target_schema,
|
||
full_refresh=False
|
||
)
|
||
|
||
# Использовать sql_table источник для чтения данных из источника
|
||
source_table = sql_table(
|
||
credentials=DatabaseManager._build_connection_string(data_source),
|
||
table=table_name,
|
||
schema=source_schema,
|
||
chunk_size=5000,
|
||
detect_precision_hints=True,
|
||
write_disposition="append" # Дополнять таблицу, а не заменять
|
||
)
|
||
|
||
# Переименовать таблицу если нужно
|
||
if final_table_name != table_name:
|
||
source_table.apply_hints(table_name=final_table_name)
|
||
|
||
load_info = pipeline.run(source_table)
|
||
|
||
# Получить количество загруженных строк из load_info
|
||
row_count = 0
|
||
if load_info and hasattr(load_info, 'packages'):
|
||
for package in load_info.packages:
|
||
if hasattr(package, 'jobs'):
|
||
for job_name, jobs in package.jobs.items():
|
||
for job in jobs:
|
||
if hasattr(job, 'rows') and job.rows:
|
||
row_count += job.rows
|
||
|
||
logger.info(
|
||
f"Successfully replicated table {table_name} "
|
||
f"(Source: {data_source.name}) to {final_table_name}. Rows: {row_count}"
|
||
)
|
||
return True, row_count
|
||
else:
|
||
# Для других БД используем прямую вставку через SQLAlchemy
|
||
source_engine = DatabaseManager.get_source_engine(source_id)
|
||
target_engine = DatabaseManager.get_target_engine(target_id)
|
||
|
||
# Читать данные из источника
|
||
with source_engine.connect() as connection:
|
||
query = f"SELECT * FROM {source_schema}.{table_name}"
|
||
result = connection.execute(text(query))
|
||
columns = [desc[0] for desc in result.keys()]
|
||
|
||
rows = []
|
||
row_count = 0
|
||
for row in result:
|
||
row_dict = dict(zip(columns, row))
|
||
rows.append(row_dict)
|
||
row_count += 1
|
||
|
||
if rows:
|
||
# Применить фильтры и трансформации
|
||
rows = self._filter_excluded_fields(rows, life_excluded_fields)
|
||
rows = self._apply_column_mapping(rows, column_mapping)
|
||
|
||
# Вставить строки через SQLAlchemy
|
||
self._insert_rows(target_engine, rows, final_table_name, target_schema)
|
||
|
||
logger.info(
|
||
f"Successfully replicated {row_count} rows from {table_name} "
|
||
f"(Source: {data_source.name}) to {final_table_name}"
|
||
)
|
||
return True, row_count
|
||
else:
|
||
logger.warning(f"No data found in table {table_name} from {data_source.name}")
|
||
return True, 0
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error replicating table {table_name} from source {source_id}: {e}", exc_info=True)
|
||
return False, 0
|
||
|
||
def _insert_rows(self, engine, rows: List[Dict[str, Any]], table_name: str, schema: str):
|
||
"""Вставить строки в целевую БД через SQLAlchemy"""
|
||
if not rows:
|
||
return
|
||
|
||
try:
|
||
with engine.begin() as connection:
|
||
# Получить имена столбцов из первой строки
|
||
columns = list(rows[0].keys())
|
||
|
||
# Создать INSERT statement с именованными параметрами
|
||
col_names = ", ".join(columns)
|
||
placeholders = ", ".join([f":{col}" for col in columns])
|
||
query = f"INSERT INTO {schema}.{table_name} ({col_names}) VALUES ({placeholders})"
|
||
|
||
# Вставить все строки, используя executemany с именованными параметрами
|
||
# Фильтруем строки для согласованности столбцов
|
||
filtered_rows = []
|
||
for row in rows:
|
||
filtered_row = {col: row.get(col) for col in columns}
|
||
filtered_rows.append(filtered_row)
|
||
|
||
connection.execute(text(query), filtered_rows)
|
||
|
||
logger.debug(f"Inserted {len(rows)} rows into {schema}.{table_name}")
|
||
except Exception as e:
|
||
logger.error(f"Error inserting rows into {schema}.{table_name}: {e}")
|
||
raise
|
||
|
||
def process_life_table_changes(
|
||
self,
|
||
table_name: str,
|
||
source_id: int,
|
||
use_life_table: bool = False,
|
||
life_excluded_fields: Optional[List[str]] = None,
|
||
target_id: int = None,
|
||
target_table_name: str = None,
|
||
column_mapping: Optional[Dict[str, str]] = None
|
||
) -> Dict[str, int]:
|
||
"""
|
||
Обработать изменения из Life таблицы
|
||
|
||
Args:
|
||
table_name: Имя оригинальной таблицы
|
||
source_id: ID источника данных
|
||
use_life_table: Обрабатывать ли Life таблицу (если False, вернуть пусто)
|
||
life_excluded_fields: Поля для исключения при обработке
|
||
target_id: ID целевой БД
|
||
target_table_name: Переименование таблицы
|
||
column_mapping: Маппинг столбцов
|
||
|
||
Returns:
|
||
Словарь с количеством обработанных операций
|
||
"""
|
||
operations = {"INSERT": 0, "UPDATE": 0, "DELETE": 0}
|
||
|
||
# Если Life таблица отключена, вернуть пусто
|
||
if not use_life_table:
|
||
logger.info(f"Life table processing disabled for {table_name}")
|
||
return operations
|
||
|
||
life_table_name = DatabaseManager.get_life_table_name(table_name)
|
||
|
||
try:
|
||
# Получить информацию об источнике
|
||
data_source = DatabaseManager.get_data_source(source_id)
|
||
if not data_source:
|
||
raise ValueError(f"DataSource with ID {source_id} not found")
|
||
|
||
# Проверить существует ли Life таблица
|
||
if not DatabaseManager.check_life_table_exists(table_name, source_id):
|
||
logger.warning(f"Life table {life_table_name} does not exist for {table_name}")
|
||
return operations
|
||
|
||
# Использовать целевое имя таблицы или оригинальное
|
||
final_table_name = target_table_name or table_name
|
||
|
||
# Получить engine для источника и целевой БД
|
||
source_engine = DatabaseManager.get_source_engine(source_id)
|
||
target_engine = DatabaseManager.get_target_engine(target_id)
|
||
|
||
with source_engine.connect() as connection:
|
||
# Получить необработанные изменения
|
||
query = f"""
|
||
SELECT * FROM {life_table_name}
|
||
WHERE NOT ISNULL(IsProcessed, 0)
|
||
ORDER BY ChangeTime ASC
|
||
"""
|
||
|
||
result = connection.execute(text(query))
|
||
columns = [desc[0] for desc in result.keys()]
|
||
|
||
for row in result:
|
||
row_dict = dict(zip(columns, row))
|
||
operation = row_dict.get('Operation', 'INSERT')
|
||
|
||
try:
|
||
# Применить фильтры к данным
|
||
filtered_row = self._filter_excluded_fields([row_dict], life_excluded_fields)[0]
|
||
mapped_row = self._apply_column_mapping([filtered_row], column_mapping)[0]
|
||
|
||
self._apply_change(
|
||
table_name=final_table_name,
|
||
operation=operation,
|
||
row_data=mapped_row,
|
||
source_id=source_id,
|
||
target_engine=target_engine
|
||
)
|
||
operations[operation] += 1
|
||
except Exception as e:
|
||
logger.error(f"Error applying change: {e}")
|
||
|
||
logger.info(f"Life table processing completed: {operations} for {table_name}")
|
||
return operations
|
||
except Exception as e:
|
||
logger.error(f"Error processing Life table changes for {table_name}: {e}")
|
||
return operations
|
||
|
||
def _apply_change(
|
||
self,
|
||
table_name: str,
|
||
operation: str,
|
||
row_data: dict,
|
||
source_id: int,
|
||
target_engine=None
|
||
):
|
||
"""Применить изменение к целевой таблице"""
|
||
try:
|
||
# Логировать изменение
|
||
change_log = ChangeLog(
|
||
table_id=source_id,
|
||
table_name=table_name,
|
||
operation=operation,
|
||
change_data=json.dumps(row_data),
|
||
change_timestamp=datetime.utcnow()
|
||
)
|
||
self.session.add(change_log)
|
||
self.session.commit()
|
||
|
||
logger.debug(f"Applied {operation} change to {table_name}")
|
||
except Exception as e:
|
||
logger.error(f"Error logging change: {e}")
|
||
self.session.rollback()
|
||
|
||
def close(self):
|
||
"""Закрыть сессию"""
|
||
if self.session:
|
||
self.session.close()
|