Files
2026-03-29 23:24:15 +09:00

382 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dlt
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime
from sqlalchemy import text
import logging
import json
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
from database import DatabaseManager, PostgresSessionLocal
from models import ReplicationJob, ReplicationStatus, ChangeLog, DataSource
logger = logging.getLogger(__name__)
class ReplicationService:
"""Сервис репликации данных из различных источников в целевую БД с поддержкой трансформаций"""
def __init__(self):
self.session = PostgresSessionLocal()
def create_replication_job(self, table_id: int, table_name: str) -> ReplicationJob:
"""Создать запись о задаче репликации"""
job = ReplicationJob(
table_id=table_id,
table_name=table_name,
status=ReplicationStatus.PENDING,
started_at=datetime.utcnow()
)
self.session.add(job)
self.session.commit()
return job
def update_job_status(self, job_id: int, status: ReplicationStatus,
rows_processed: int = 0, error_message: str = None):
"""Обновить статус задачи репликации"""
try:
job = self.session.query(ReplicationJob).filter(ReplicationJob.id == job_id).first()
if job:
job.status = status
job.rows_processed = rows_processed
job.completed_at = datetime.utcnow()
if error_message:
job.error_message = error_message
self.session.commit()
except Exception as e:
logger.error(f"Error updating job status: {e}")
self.session.rollback()
def _apply_column_mapping(self, rows: List[Dict[str, Any]], column_mapping: Optional[Dict[str, str]]) -> List[Dict[str, Any]]:
"""Применить переименование столбцов"""
if not column_mapping:
return rows
mapped_rows = []
for row in rows:
new_row = {}
for old_name, value in row.items():
new_name = column_mapping.get(old_name, old_name)
new_row[new_name] = value
mapped_rows.append(new_row)
logger.debug(f"Applied column mapping: {column_mapping}")
return mapped_rows
def _filter_excluded_fields(self, rows: List[Dict[str, Any]], excluded_fields: Optional[List[str]]) -> List[Dict[str, Any]]:
"""Исключить сервисные поля"""
if not excluded_fields:
return rows
filtered_rows = []
for row in rows:
filtered_row = {k: v for k, v in row.items() if k not in excluded_fields}
filtered_rows.append(filtered_row)
logger.debug(f"Excluded fields: {excluded_fields}")
return filtered_rows
def replicate_table(
self,
table_name: str,
source_id: int,
source_schema: str = None,
target_schema: str = "public",
target_id: int = None,
target_table_name: str = None,
column_mapping: Optional[Dict[str, str]] = None,
life_excluded_fields: Optional[List[str]] = None
) -> Tuple[bool, int]:
"""
Реплицировать таблицу из источника в целевую БД
Args:
table_name: Имя таблицы в источнике
source_id: ID источника данных
source_schema: Схема в источнике (если None, используется default_schema)
target_schema: Схема в целевой БД
target_id: ID целевой БД (если None, используется default)
target_table_name: Переименование таблицы при копировании (если None, используется table_name)
column_mapping: Переименование столбцов {"source_col": "target_col"}
life_excluded_fields: Поля для исключения (для Life таблиц)
Returns:
Кортеж (успешность, количество строк)
"""
try:
# Получить информацию об источнике
data_source = DatabaseManager.get_data_source(source_id)
if not data_source:
raise ValueError(f"DataSource with ID {source_id} not found")
# Использовать имя целевой таблицы или имя исходной таблицы
final_table_name = target_table_name or table_name
# Использовать default_schema если не указана
if source_schema is None:
source_schema = data_source.default_schema
logger.info(
f"Starting replication for table {table_name} -> {final_table_name} "
f"from source {data_source.name} (ID: {source_id}) "
f"to target DB (ID: {target_id or 'default'})"
)
# Определить тип целевой БД для выбора стратегии репликации
target_db = DatabaseManager.get_target_database(target_id)
# Использовать DLT для PostgreSQL (включая default PostgreSQL)
is_postgres_target = False
if target_db:
is_postgres_target = target_db.db_type.value == 'pgsql'
else:
is_postgres_target = True
if is_postgres_target:
# Использовать DLT sql_database источник для PostgreSQL
from config import settings
from dlt.destinations import postgres
from dlt.sources.sql_database import sql_table
# Создаем PostgreSQL destination с кредами из конфига
postgres_destination = postgres(
credentials={
"drivername": "postgresql",
"username": settings.postgres_username,
"password": settings.postgres_password,
"host": settings.postgres_host,
"port": settings.postgres_port,
"database": settings.postgres_database
}
)
pipeline = dlt.pipeline(
pipeline_name=f"replication_{data_source.source_type.value}_{table_name}",
destination=postgres_destination,
dataset_name=target_schema,
full_refresh=False
)
# Использовать sql_table источник для чтения данных из источника
source_table = sql_table(
credentials=DatabaseManager._build_connection_string(data_source),
table=table_name,
schema=source_schema,
chunk_size=5000,
detect_precision_hints=True,
write_disposition="append" # Дополнять таблицу, а не заменять
)
# Переименовать таблицу если нужно
if final_table_name != table_name:
source_table.apply_hints(table_name=final_table_name)
load_info = pipeline.run(source_table)
# Получить количество загруженных строк из load_info
row_count = 0
if load_info and hasattr(load_info, 'packages'):
for package in load_info.packages:
if hasattr(package, 'jobs'):
for job_name, jobs in package.jobs.items():
for job in jobs:
if hasattr(job, 'rows') and job.rows:
row_count += job.rows
logger.info(
f"Successfully replicated table {table_name} "
f"(Source: {data_source.name}) to {final_table_name}. Rows: {row_count}"
)
return True, row_count
else:
# Для других БД используем прямую вставку через SQLAlchemy
source_engine = DatabaseManager.get_source_engine(source_id)
target_engine = DatabaseManager.get_target_engine(target_id)
# Читать данные из источника
with source_engine.connect() as connection:
query = f"SELECT * FROM {source_schema}.{table_name}"
result = connection.execute(text(query))
columns = [desc[0] for desc in result.keys()]
rows = []
row_count = 0
for row in result:
row_dict = dict(zip(columns, row))
rows.append(row_dict)
row_count += 1
if rows:
# Применить фильтры и трансформации
rows = self._filter_excluded_fields(rows, life_excluded_fields)
rows = self._apply_column_mapping(rows, column_mapping)
# Вставить строки через SQLAlchemy
self._insert_rows(target_engine, rows, final_table_name, target_schema)
logger.info(
f"Successfully replicated {row_count} rows from {table_name} "
f"(Source: {data_source.name}) to {final_table_name}"
)
return True, row_count
else:
logger.warning(f"No data found in table {table_name} from {data_source.name}")
return True, 0
except Exception as e:
logger.error(f"Error replicating table {table_name} from source {source_id}: {e}", exc_info=True)
return False, 0
def _insert_rows(self, engine, rows: List[Dict[str, Any]], table_name: str, schema: str):
"""Вставить строки в целевую БД через SQLAlchemy"""
if not rows:
return
try:
with engine.begin() as connection:
# Получить имена столбцов из первой строки
columns = list(rows[0].keys())
# Создать INSERT statement с именованными параметрами
col_names = ", ".join(columns)
placeholders = ", ".join([f":{col}" for col in columns])
query = f"INSERT INTO {schema}.{table_name} ({col_names}) VALUES ({placeholders})"
# Вставить все строки, используя executemany с именованными параметрами
# Фильтруем строки для согласованности столбцов
filtered_rows = []
for row in rows:
filtered_row = {col: row.get(col) for col in columns}
filtered_rows.append(filtered_row)
connection.execute(text(query), filtered_rows)
logger.debug(f"Inserted {len(rows)} rows into {schema}.{table_name}")
except Exception as e:
logger.error(f"Error inserting rows into {schema}.{table_name}: {e}")
raise
def process_life_table_changes(
self,
table_name: str,
source_id: int,
use_life_table: bool = False,
life_excluded_fields: Optional[List[str]] = None,
target_id: int = None,
target_table_name: str = None,
column_mapping: Optional[Dict[str, str]] = None
) -> Dict[str, int]:
"""
Обработать изменения из Life таблицы
Args:
table_name: Имя оригинальной таблицы
source_id: ID источника данных
use_life_table: Обрабатывать ли Life таблицу (если False, вернуть пусто)
life_excluded_fields: Поля для исключения при обработке
target_id: ID целевой БД
target_table_name: Переименование таблицы
column_mapping: Маппинг столбцов
Returns:
Словарь с количеством обработанных операций
"""
operations = {"INSERT": 0, "UPDATE": 0, "DELETE": 0}
# Если Life таблица отключена, вернуть пусто
if not use_life_table:
logger.info(f"Life table processing disabled for {table_name}")
return operations
life_table_name = DatabaseManager.get_life_table_name(table_name)
try:
# Получить информацию об источнике
data_source = DatabaseManager.get_data_source(source_id)
if not data_source:
raise ValueError(f"DataSource with ID {source_id} not found")
# Проверить существует ли Life таблица
if not DatabaseManager.check_life_table_exists(table_name, source_id):
logger.warning(f"Life table {life_table_name} does not exist for {table_name}")
return operations
# Использовать целевое имя таблицы или оригинальное
final_table_name = target_table_name or table_name
# Получить engine для источника и целевой БД
source_engine = DatabaseManager.get_source_engine(source_id)
target_engine = DatabaseManager.get_target_engine(target_id)
with source_engine.connect() as connection:
# Получить необработанные изменения
query = f"""
SELECT * FROM {life_table_name}
WHERE NOT ISNULL(IsProcessed, 0)
ORDER BY ChangeTime ASC
"""
result = connection.execute(text(query))
columns = [desc[0] for desc in result.keys()]
for row in result:
row_dict = dict(zip(columns, row))
operation = row_dict.get('Operation', 'INSERT')
try:
# Применить фильтры к данным
filtered_row = self._filter_excluded_fields([row_dict], life_excluded_fields)[0]
mapped_row = self._apply_column_mapping([filtered_row], column_mapping)[0]
self._apply_change(
table_name=final_table_name,
operation=operation,
row_data=mapped_row,
source_id=source_id,
target_engine=target_engine
)
operations[operation] += 1
except Exception as e:
logger.error(f"Error applying change: {e}")
logger.info(f"Life table processing completed: {operations} for {table_name}")
return operations
except Exception as e:
logger.error(f"Error processing Life table changes for {table_name}: {e}")
return operations
def _apply_change(
self,
table_name: str,
operation: str,
row_data: dict,
source_id: int,
target_engine=None
):
"""Применить изменение к целевой таблице"""
try:
# Логировать изменение
change_log = ChangeLog(
table_id=source_id,
table_name=table_name,
operation=operation,
change_data=json.dumps(row_data),
change_timestamp=datetime.utcnow()
)
self.session.add(change_log)
self.session.commit()
logger.debug(f"Applied {operation} change to {table_name}")
except Exception as e:
logger.error(f"Error logging change: {e}")
self.session.rollback()
def close(self):
"""Закрыть сессию"""
if self.session:
self.session.close()