import json
from dataclasses import asdict
from typing import List, Optional

from .config import Config, TableMigrationConfig


class TableConfigRepository:
    """PostgreSQL-backed store for the replicator's per-table configuration.

    All identifiers (schema, table, index) are routed through
    ``_quote_identifier`` so arbitrary configured names are safe to embed
    in DDL/DML text.
    """

    def __init__(self, config: Config, engine):
        # config: application Config with REPLICATOR_SCHEMA / TABLE_CONFIGS_TABLE
        #         and DEFAULT_TABLE_MIGRATIONS bootstrap entries.
        # engine: a SQLAlchemy Engine pointing at the replicator database.
        self.config = config
        self.engine = engine

    def _text(self, sql: str):
        """Wrap raw SQL in a SQLAlchemy ``text()`` clause (lazy import)."""
        # Imported lazily so merely importing this module does not require
        # SQLAlchemy to be installed.
        from sqlalchemy import text
        return text(sql)

    def _quote_identifier(self, identifier: str) -> str:
        """Return *identifier* double-quoted per PostgreSQL rules.

        Embedded double quotes are escaped by doubling them.
        """
        return '"' + identifier.replace('"', '""') + '"'

    def _qualified_table(self) -> str:
        """Fully-qualified, quoted ``schema.table`` name of the config table."""
        return (
            f'{self._quote_identifier(self.config.REPLICATOR_SCHEMA)}.'
            f'{self._quote_identifier(self.config.TABLE_CONFIGS_TABLE)}'
        )

    def ensure_table(self):
        """Create the service schema, config table, and index if missing.

        Idempotent: every statement uses ``IF NOT EXISTS``.
        """
        with self.engine.connect() as conn:
            conn.execute(self._text(
                f'CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(self.config.REPLICATOR_SCHEMA)}'
            ))
            conn.execute(self._text(f"""
                CREATE TABLE IF NOT EXISTS {self._qualified_table()} (
                    source_table text PRIMARY KEY,
                    target_table text NULL,
                    mode text NOT NULL,
                    initial_load_mode text NOT NULL,
                    life_table text NULL,
                    datetime_column text NOT NULL,
                    sequence_column text NULL,
                    order_columns_json jsonb NOT NULL DEFAULT '[]'::jsonb,
                    operation_column text NULL,
                    delete_operations_json jsonb NOT NULL DEFAULT '["d"]'::jsonb,
                    upsert_operations_json jsonb NOT NULL DEFAULT '["i","u"]'::jsonb,
                    primary_key_json jsonb NOT NULL DEFAULT '[]'::jsonb,
                    exclude_columns_json jsonb NOT NULL DEFAULT '[]'::jsonb,
                    timescale boolean NOT NULL DEFAULT FALSE,
                    timescale_time_column text NULL,
                    enabled boolean NOT NULL DEFAULT TRUE,
                    created_at timestamp NOT NULL DEFAULT now(),
                    updated_at timestamp NOT NULL DEFAULT now()
                )
            """))
            # FIX: the index name was previously interpolated unquoted; a
            # configured table name with uppercase or special characters
            # would break the statement. Quote it like every other identifier.
            index_name = self._quote_identifier(
                f'idx_{self.config.TABLE_CONFIGS_TABLE}_enabled'
            )
            conn.execute(self._text(f"""
                CREATE INDEX IF NOT EXISTS {index_name}
                ON {self._qualified_table()} (enabled, source_table)
            """))
            conn.commit()

    def seed_defaults_if_empty(self):
        """Bootstrap the config table from the built-in defaults.

        No-op when the table already contains any rows, so user edits are
        never overwritten.
        """
        self.ensure_table()
        with self.engine.connect() as conn:
            count = conn.execute(self._text(
                f'SELECT COUNT(*) FROM {self._qualified_table()}'
            )).scalar()
            if count and int(count) > 0:
                return
            for table_config in self.config.DEFAULT_TABLE_MIGRATIONS:
                payload = asdict(table_config)
                # List-valued fields are serialized to JSON text and cast to
                # jsonb server-side.
                conn.execute(self._text(f"""
                    INSERT INTO {self._qualified_table()} (
                        source_table, target_table, mode, initial_load_mode, life_table,
                        datetime_column, sequence_column, order_columns_json, operation_column,
                        delete_operations_json, upsert_operations_json, primary_key_json,
                        exclude_columns_json, timescale, timescale_time_column, enabled
                    ) VALUES (
                        :source_table, :target_table, :mode, :initial_load_mode, :life_table,
                        :datetime_column, :sequence_column, CAST(:order_columns_json AS jsonb), :operation_column,
                        CAST(:delete_operations_json AS jsonb), CAST(:upsert_operations_json AS jsonb), CAST(:primary_key_json AS jsonb),
                        CAST(:exclude_columns_json AS jsonb), :timescale, :timescale_time_column, :enabled
                    )
                """), {
                    'source_table': payload['source_table'],
                    'target_table': payload['target_table'],
                    'mode': payload['mode'],
                    'initial_load_mode': payload['initial_load_mode'],
                    'life_table': payload['life_table'],
                    'datetime_column': payload['datetime_column'],
                    'sequence_column': payload['sequence_column'],
                    'order_columns_json': json.dumps(payload['order_columns']),
                    'operation_column': payload['operation_column'],
                    'delete_operations_json': json.dumps(payload['delete_operations']),
                    'upsert_operations_json': json.dumps(payload['upsert_operations']),
                    'primary_key_json': json.dumps(payload['primary_key']),
                    'exclude_columns_json': json.dumps(payload['exclude_columns']),
                    'timescale': payload['timescale'],
                    'timescale_time_column': payload['timescale_time_column'],
                    'enabled': payload['enabled'],
                })
            conn.commit()

    def load_configs(self, enabled_only: bool = True, seed_defaults: bool = True) -> List[TableMigrationConfig]:
        """Read table configurations from the database.

        :param enabled_only: when True, return only rows with ``enabled = TRUE``.
        :param seed_defaults: when True, bootstrap defaults into an empty table first.
        :return: configs ordered by ``source_table``.
        """
        # FIX: previously ensure_table() ran unconditionally and then again
        # inside seed_defaults_if_empty(); ensure the table exactly once.
        if seed_defaults:
            self.seed_defaults_if_empty()  # also ensures the table exists
        else:
            self.ensure_table()
        where_clause = 'WHERE enabled = TRUE' if enabled_only else ''
        sql = self._text(f"""
            SELECT * FROM {self._qualified_table()}
            {where_clause}
            ORDER BY source_table
        """)
        with self.engine.connect() as conn:
            rows = conn.execute(sql).mappings().all()
        return [self._row_to_config(row) for row in rows]

    def get_config(self, table_name: str) -> Optional[TableMigrationConfig]:
        """Look up one enabled config by source_table or target_table.

        Matching is case-insensitive; ``target_table`` falls back to
        ``source_table`` when NULL. Returns None when nothing matches.
        """
        self.ensure_table()
        sql = self._text(f"""
            SELECT * FROM {self._qualified_table()}
            WHERE enabled = TRUE
              AND (
                lower(source_table) = :table_name
                OR lower(COALESCE(target_table, source_table)) = :table_name
              )
            LIMIT 1
        """)
        with self.engine.connect() as conn:
            row = conn.execute(sql, {'table_name': table_name.lower()}).mappings().first()
        return self._row_to_config(row) if row else None

    def _row_to_config(self, row) -> TableMigrationConfig:
        """Map one DB row (a mapping) to a TableMigrationConfig dataclass."""
        return TableMigrationConfig(
            source_table=row['source_table'],
            target_table=row['target_table'],
            mode=row['mode'],
            initial_load_mode=row['initial_load_mode'],
            life_table=row['life_table'],
            datetime_column=row['datetime_column'],
            sequence_column=row['sequence_column'],
            order_columns=self._json_field(row['order_columns_json']),
            operation_column=row['operation_column'],
            delete_operations=self._json_field(row['delete_operations_json']),
            upsert_operations=self._json_field(row['upsert_operations_json']),
            primary_key=self._json_field(row['primary_key_json']),
            exclude_columns=self._json_field(row['exclude_columns_json']),
            timescale=row['timescale'],
            timescale_time_column=row['timescale_time_column'],
            enabled=row['enabled'],
        )

    def _json_field(self, value) -> List[str]:
        """Normalize a jsonb column value to a list.

        Drivers may return jsonb as already-decoded Python objects or as raw
        JSON text; NULL becomes an empty list.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return json.loads(value)
        return list(value)