import dlt from typing import List, Dict, Any, Tuple, Optional from datetime import datetime from sqlalchemy import text import logging import json try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False from database import DatabaseManager, PostgresSessionLocal from models import ReplicationJob, ReplicationStatus, ChangeLog, DataSource logger = logging.getLogger(__name__) class ReplicationService: """Сервис репликации данных из различных источников в целевую БД с поддержкой трансформаций""" def __init__(self): self.session = PostgresSessionLocal() def create_replication_job(self, table_id: int, table_name: str) -> ReplicationJob: """Создать запись о задаче репликации""" job = ReplicationJob( table_id=table_id, table_name=table_name, status=ReplicationStatus.PENDING, started_at=datetime.utcnow() ) self.session.add(job) self.session.commit() return job def update_job_status(self, job_id: int, status: ReplicationStatus, rows_processed: int = 0, error_message: str = None): """Обновить статус задачи репликации""" try: job = self.session.query(ReplicationJob).filter(ReplicationJob.id == job_id).first() if job: job.status = status job.rows_processed = rows_processed job.completed_at = datetime.utcnow() if error_message: job.error_message = error_message self.session.commit() except Exception as e: logger.error(f"Error updating job status: {e}") self.session.rollback() def _apply_column_mapping(self, rows: List[Dict[str, Any]], column_mapping: Optional[Dict[str, str]]) -> List[Dict[str, Any]]: """Применить переименование столбцов""" if not column_mapping: return rows mapped_rows = [] for row in rows: new_row = {} for old_name, value in row.items(): new_name = column_mapping.get(old_name, old_name) new_row[new_name] = value mapped_rows.append(new_row) logger.debug(f"Applied column mapping: {column_mapping}") return mapped_rows def _filter_excluded_fields(self, rows: List[Dict[str, Any]], excluded_fields: Optional[List[str]]) -> List[Dict[str, Any]]: """Исключить сервисные поля""" if not excluded_fields: return rows filtered_rows = [] for row in rows: filtered_row = {k: v for k, v in row.items() if k not in excluded_fields} filtered_rows.append(filtered_row) logger.debug(f"Excluded fields: {excluded_fields}") return filtered_rows def replicate_table( self, table_name: str, source_id: int, source_schema: str = None, target_schema: str = "public", target_id: int = None, target_table_name: str = None, column_mapping: Optional[Dict[str, str]] = None, life_excluded_fields: Optional[List[str]] = None ) -> Tuple[bool, int]: """ Реплицировать таблицу из источника в целевую БД Args: table_name: Имя таблицы в источнике source_id: ID источника данных source_schema: Схема в источнике (если None, используется default_schema) target_schema: Схема в целевой БД target_id: ID целевой БД (если None, используется default) target_table_name: Переименование таблицы при копировании (если None, используется table_name) column_mapping: Переименование столбцов {"source_col": "target_col"} life_excluded_fields: Поля для исключения (для Life таблиц) Returns: Кортеж (успешность, количество строк) """ try: # Получить информацию об источнике data_source = DatabaseManager.get_data_source(source_id) if not data_source: raise ValueError(f"DataSource with ID {source_id} not found") # Использовать имя целевой таблицы или имя исходной таблицы final_table_name = target_table_name or table_name # Использовать default_schema если не указана if source_schema is None: source_schema = data_source.default_schema logger.info( f"Starting replication for table {table_name} -> {final_table_name} " f"from source {data_source.name} (ID: {source_id}) " f"to target DB (ID: {target_id or 'default'})" ) # Определить тип целевой БД для выбора стратегии репликации target_db = DatabaseManager.get_target_database(target_id) # Использовать DLT для PostgreSQL (включая default PostgreSQL) is_postgres_target = False if target_db: is_postgres_target = target_db.db_type.value == 'pgsql' else: is_postgres_target = True if is_postgres_target: # Использовать DLT sql_database источник для PostgreSQL from config import settings from dlt.destinations import postgres from dlt.sources.sql_database import sql_table # Создаем PostgreSQL destination с кредами из конфига postgres_destination = postgres( credentials={ "drivername": "postgresql", "username": settings.postgres_username, "password": settings.postgres_password, "host": settings.postgres_host, "port": settings.postgres_port, "database": settings.postgres_database } ) pipeline = dlt.pipeline( pipeline_name=f"replication_{data_source.source_type.value}_{table_name}", destination=postgres_destination, dataset_name=target_schema, full_refresh=False ) # Использовать sql_table источник для чтения данных из источника source_table = sql_table( credentials=DatabaseManager._build_connection_string(data_source), table=table_name, schema=source_schema, chunk_size=5000, detect_precision_hints=True, write_disposition="append" # Дополнять таблицу, а не заменять ) # Переименовать таблицу если нужно if final_table_name != table_name: source_table.apply_hints(table_name=final_table_name) load_info = pipeline.run(source_table) # Получить количество загруженных строк из load_info row_count = 0 if load_info and hasattr(load_info, 'packages'): for package in load_info.packages: if hasattr(package, 'jobs'): for job_name, jobs in package.jobs.items(): for job in jobs: if hasattr(job, 'rows') and job.rows: row_count += job.rows logger.info( f"Successfully replicated table {table_name} " f"(Source: {data_source.name}) to {final_table_name}. Rows: {row_count}" ) return True, row_count else: # Для других БД используем прямую вставку через SQLAlchemy source_engine = DatabaseManager.get_source_engine(source_id) target_engine = DatabaseManager.get_target_engine(target_id) # Читать данные из источника with source_engine.connect() as connection: query = f"SELECT * FROM {source_schema}.{table_name}" result = connection.execute(text(query)) columns = [desc[0] for desc in result.keys()] rows = [] row_count = 0 for row in result: row_dict = dict(zip(columns, row)) rows.append(row_dict) row_count += 1 if rows: # Применить фильтры и трансформации rows = self._filter_excluded_fields(rows, life_excluded_fields) rows = self._apply_column_mapping(rows, column_mapping) # Вставить строки через SQLAlchemy self._insert_rows(target_engine, rows, final_table_name, target_schema) logger.info( f"Successfully replicated {row_count} rows from {table_name} " f"(Source: {data_source.name}) to {final_table_name}" ) return True, row_count else: logger.warning(f"No data found in table {table_name} from {data_source.name}") return True, 0 except Exception as e: logger.error(f"Error replicating table {table_name} from source {source_id}: {e}", exc_info=True) return False, 0 def _insert_rows(self, engine, rows: List[Dict[str, Any]], table_name: str, schema: str): """Вставить строки в целевую БД через SQLAlchemy""" if not rows: return try: with engine.begin() as connection: # Получить имена столбцов из первой строки columns = list(rows[0].keys()) # Создать INSERT statement с именованными параметрами col_names = ", ".join(columns) placeholders = ", ".join([f":{col}" for col in columns]) query = f"INSERT INTO {schema}.{table_name} ({col_names}) VALUES ({placeholders})" # Вставить все строки, используя executemany с именованными параметрами # Фильтруем строки для согласованности столбцов filtered_rows = [] for row in rows: filtered_row = {col: row.get(col) for col in columns} filtered_rows.append(filtered_row) connection.execute(text(query), filtered_rows) logger.debug(f"Inserted {len(rows)} rows into {schema}.{table_name}") except Exception as e: logger.error(f"Error inserting rows into {schema}.{table_name}: {e}") raise def process_life_table_changes( self, table_name: str, source_id: int, use_life_table: bool = False, life_excluded_fields: Optional[List[str]] = None, target_id: int = None, target_table_name: str = None, column_mapping: Optional[Dict[str, str]] = None ) -> Dict[str, int]: """ Обработать изменения из Life таблицы Args: table_name: Имя оригинальной таблицы source_id: ID источника данных use_life_table: Обрабатывать ли Life таблицу (если False, вернуть пусто) life_excluded_fields: Поля для исключения при обработке target_id: ID целевой БД target_table_name: Переименование таблицы column_mapping: Маппинг столбцов Returns: Словарь с количеством обработанных операций """ operations = {"INSERT": 0, "UPDATE": 0, "DELETE": 0} # Если Life таблица отключена, вернуть пусто if not use_life_table: logger.info(f"Life table processing disabled for {table_name}") return operations life_table_name = DatabaseManager.get_life_table_name(table_name) try: # Получить информацию об источнике data_source = DatabaseManager.get_data_source(source_id) if not data_source: raise ValueError(f"DataSource with ID {source_id} not found") # Проверить существует ли Life таблица if not DatabaseManager.check_life_table_exists(table_name, source_id): logger.warning(f"Life table {life_table_name} does not exist for {table_name}") return operations # Использовать целевое имя таблицы или оригинальное final_table_name = target_table_name or table_name # Получить engine для источника и целевой БД source_engine = DatabaseManager.get_source_engine(source_id) target_engine = DatabaseManager.get_target_engine(target_id) with source_engine.connect() as connection: # Получить необработанные изменения query = f""" SELECT * FROM {life_table_name} WHERE NOT ISNULL(IsProcessed, 0) ORDER BY ChangeTime ASC """ result = connection.execute(text(query)) columns = [desc[0] for desc in result.keys()] for row in result: row_dict = dict(zip(columns, row)) operation = row_dict.get('Operation', 'INSERT') try: # Применить фильтры к данным filtered_row = self._filter_excluded_fields([row_dict], life_excluded_fields)[0] mapped_row = self._apply_column_mapping([filtered_row], column_mapping)[0] self._apply_change( table_name=final_table_name, operation=operation, row_data=mapped_row, source_id=source_id, target_engine=target_engine ) operations[operation] += 1 except Exception as e: logger.error(f"Error applying change: {e}") logger.info(f"Life table processing completed: {operations} for {table_name}") return operations except Exception as e: logger.error(f"Error processing Life table changes for {table_name}: {e}") return operations def _apply_change( self, table_name: str, operation: str, row_data: dict, source_id: int, target_engine=None ): """Применить изменение к целевой таблице""" try: # Логировать изменение change_log = ChangeLog( table_id=source_id, table_name=table_name, operation=operation, change_data=json.dumps(row_data), change_timestamp=datetime.utcnow() ) self.session.add(change_log) self.session.commit() logger.debug(f"Applied {operation} change to {table_name}") except Exception as e: logger.error(f"Error logging change: {e}") self.session.rollback() def close(self): """Закрыть сессию""" if self.session: self.session.close()