import pandas as pd
from sqlalchemy import text, inspect
from typing import List, Dict, Any

from app.core.database import db_connector
from app.core.logging import migration_logger


class DataWriter:
    """Writes data to PostgreSQL."""

    def table_exists(self, table_name: str) -> bool:
        """Check whether a table exists."""
        try:
            inspector = inspect(db_connector.dst_engine)
            return inspector.has_table(table_name)
        except Exception as e:
            migration_logger.error(f"Failed to check table {table_name}: {e}")
            return False

    def create_table(self, table_name: str, df: pd.DataFrame):
        """Create a table from a DataFrame, dropping any existing one first."""
        try:
            if self.table_exists(table_name.lower()):
                migration_logger.info(f"Table {table_name} already exists, dropping it...")
                with db_connector.dst_connection() as conn:
                    conn.execute(text(f'DROP TABLE IF EXISTS "{table_name.lower()}" CASCADE'))
                    conn.commit()

            df.to_sql(
                table_name.lower(),
                db_connector.dst_engine,
                if_exists='replace',
                index=False,
                chunksize=10000
            )
            migration_logger.info(f"Table {table_name} created, {len(df)} rows")
        except Exception as e:
            migration_logger.error(f"Failed to create table {table_name}: {e}")
            raise

    def append_data(self, table_name: str, df: pd.DataFrame):
        """Append data, ignoring duplicates."""
        try:
            if df.empty:
                return

            # Disable foreign key checks for the duration of the insert
            with db_connector.dst_engine.connect() as conn:
                conn.execute(text("SET session_replication_role = 'replica';"))
                df.to_sql(
                    table_name.lower(),
                    conn,
                    if_exists='append',
                    index=False,
                    chunksize=10000,
                    method='multi'
                )
                conn.execute(text("SET session_replication_role = 'origin';"))
                conn.commit()

            migration_logger.info(f"Appended {len(df)} rows to {table_name}")
        except Exception as e:
            migration_logger.error(f"Failed to append data to {table_name}: {e}")
            raise

    def create_indexes(self, table_name: str, indexes: List[Dict[str, Any]]):
        """Create indexes."""
        if not indexes:
            return

        pg_table = table_name.lower()
        for index_info in indexes:
            try:
                # Sanitize the index name: keep only alphanumerics and underscores
                index_name = f"idx_{pg_table}_{index_info['name'].lower()}"
                index_name = ''.join(c for c in index_name if c.isalnum() or c == '_')

                columns = ', '.join(f'"{col}"' for col in index_info['columns'])
                unique_str = 'UNIQUE ' if index_info.get('unique') else ''
                sql = f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" ON "{pg_table}" ({columns})'

                with db_connector.dst_connection() as conn:
                    conn.execute(text(sql))
                    conn.commit()
                migration_logger.info(f"Created index: {index_name}")
            except Exception as e:
                migration_logger.error(f"Failed to create index {index_info.get('name')}: {e}")

    def create_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]):
        """Create foreign keys."""
        if not foreign_keys:
            return

        pg_table = table_name.lower()
        for fk_info in foreign_keys:
            try:
                referenced_table = fk_info['referenced_table'].lower()

                # Make sure the referenced table exists before adding the constraint
                with db_connector.dst_connection() as conn:
                    check_table_sql = """
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_schema = 'public'
                              AND table_name = :table_name
                        );
                    """
                    result = conn.execute(text(check_table_sql), {"table_name": referenced_table})
                    table_exists = result.scalar()

                if not table_exists:
                    migration_logger.error(
                        f"Table '{referenced_table}' does not exist. Skipping foreign key creation"
                    )
                    continue

                # Sanitize the constraint name: keep only alphanumerics and underscores
                fk_name = f"fk_{pg_table}_{fk_info['parent_column'].lower()}"
                fk_name = ''.join(c for c in fk_name if c.isalnum() or c == '_')

                create_sql = f"""
                    ALTER TABLE "{pg_table}"
                    ADD CONSTRAINT "{fk_name}"
                    FOREIGN KEY ("{fk_info['parent_column']}")
                    REFERENCES "{referenced_table}" ("{fk_info['referenced_column']}")
                """
                with db_connector.dst_connection() as conn:
                    conn.execute(text(create_sql))
                    conn.commit()
                migration_logger.info(f"Created foreign key: {fk_name}")
            except Exception as e:
                migration_logger.error(
                    f"Failed to create foreign key {fk_info.get('name', 'unknown')}: {e}"
                )

    def analyze_table(self, table_name: str):
        """Run ANALYZE on the table."""
        try:
            with db_connector.dst_connection() as conn:
                conn.execute(text(f'ANALYZE "{table_name.lower()}"'))
                conn.commit()
            migration_logger.info(f"ANALYZE completed for {table_name}")
        except Exception as e:
            migration_logger.error(f"ANALYZE failed for {table_name}: {e}")

    def upsert_data(self, table_name: str, df: pd.DataFrame, id_column: str):
        """
        Insert or update data in PostgreSQL.

        Args:
            table_name: Table name
            df: DataFrame with the data
            id_column: ID column used to detect conflicts
        """
        if df.empty:
            return

        table_name = table_name.lower()

        # Open a transactional connection
        with db_connector.dst_engine.begin() as conn:
            # Fetch the IDs already present in the table
            existing_ids_query = f'SELECT "{id_column}" FROM {table_name}'
            existing_ids = pd.read_sql_query(existing_ids_query, conn)
            existing_set = set(existing_ids[id_column].tolist())

            # Split the incoming rows into new and existing ones
            new_records = df[~df[id_column].isin(existing_set)]
            update_records = df[df[id_column].isin(existing_set)]
            migration_logger.info(
                f"new_records: {len(new_records)}, update_records: {len(update_records)}"
            )

            # Insert the new rows
            if not new_records.empty:
                new_records.to_sql(table_name, conn, if_exists='append', index=False, method='multi')
                migration_logger.info(f"Inserted {len(new_records)} records")

            # Update the existing rows
            if not update_records.empty:
                migration_logger.info(f"Updating {len(update_records)} records")

                # Convert the DataFrame to a list of dicts for per-row updates
                records = update_records.to_dict('records')

                # Update each record, skipping the ID column and null values
                for record in records:
                    id_value = record[id_column]
                    update_dict = {
                        k: v for k, v in record.items()
                        if k != id_column and v is not None and not pd.isna(v)
                    }
                    if update_dict:
                        set_clause = ', '.join(f'"{k}" = :{k}' for k in update_dict)
                        update_query = text(
                            f'UPDATE {table_name} SET {set_clause} WHERE "{id_column}" = :id_value'
                        )
                        params = {**update_dict, 'id_value': id_value}
                        conn.execute(update_query, params)

                migration_logger.info(f"Updated {len(update_records)} records")

    def delete_data(self, table_name: str, id_column: str, ids: List[Any]):
        """Delete rows from PostgreSQL by a list of IDs."""
        if not ids:
            return

        table_name = table_name.lower()
        total_deleted = 0

        # Process IDs in chunks of 1000 to keep query size bounded
        chunk_size = 1000
        for i in range(0, len(ids), chunk_size):
            chunk_ids = ids[i:i + chunk_size]

            if len(chunk_ids) == 1:
                # Single ID: plain equality comparison
                delete_query = text(f'DELETE FROM {table_name} WHERE "{id_column}" = :id')
                params = {"id": chunk_ids[0]}
            else:
                # Multiple IDs: ANY(array) is more efficient in PostgreSQL
                delete_query = text(f'DELETE FROM {table_name} WHERE "{id_column}" = ANY(:ids)')
                params = {"ids": chunk_ids}

            with db_connector.dst_engine.begin() as conn:
                result = conn.execute(delete_query, params)
                total_deleted += result.rowcount

        migration_logger.info(f"Total deleted: {total_deleted}")


data_writer = DataWriter()
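

# A minimal usage sketch, not part of the module's API: the table name "users"
# and its columns are made up for illustration, and running this for real
# assumes db_connector is configured with a dst_engine pointing at a reachable
# PostgreSQL instance. It shows the intended call order during a migration:
# create the table, add indexes, refresh planner statistics, then keep the
# table in sync incrementally with upserts and deletes.
if __name__ == "__main__":
    sample = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})

    data_writer.create_table("users", sample)
    data_writer.create_indexes("users", [{"name": "id", "columns": ["id"], "unique": True}])
    data_writer.analyze_table("users")

    # Incremental sync: upsert changed rows, then delete rows removed upstream
    data_writer.upsert_data("users", sample, id_column="id")
    data_writer.delete_data("users", "id", [2])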