import pandas as pd
|
||
from sqlalchemy import text, inspect
|
||
from typing import List, Dict, Any, Optional
|
||
from app.core.database import db_connector
|
||
from app.core.logging import migration_logger
|
||
|
||
|
||
class DataWriter:
    """Writes migrated data into the destination PostgreSQL database."""

    def table_exists(self, table_name: str) -> bool:
        """Return True if *table_name* exists in the destination database.

        Inspection failures are logged and reported as "table missing".
        """
        try:
            inspector = inspect(db_connector.dst_engine)
            return inspector.has_table(table_name)
        except Exception as e:
            migration_logger.error(f"Ошибка проверки таблицы {table_name}: {e}")
            return False

    def create_table(self, table_name: str, df: pd.DataFrame) -> None:
        """Create (or re-create) *table_name* from the DataFrame schema.

        The PostgreSQL table name is lowercased.  A pre-existing table is
        dropped with CASCADE first so dependent objects cannot block the
        subsequent ``to_sql(if_exists='replace')``.

        Raises:
            Exception: re-raised after logging if creation fails.
        """
        try:
            if self.table_exists(table_name.lower()):
                migration_logger.info(f"Таблица {table_name} существует, удаляем...")
                with db_connector.dst_connection() as conn:
                    conn.execute(text(f'DROP TABLE IF EXISTS "{table_name.lower()}" CASCADE'))
                    conn.commit()

            df.to_sql(
                table_name.lower(),
                db_connector.dst_engine,
                if_exists='replace',
                index=False,
                chunksize=10000
            )
            migration_logger.info(f"Таблица {table_name} создана, {len(df)} строк")

        except Exception as e:
            migration_logger.error(f"Ошибка создания таблицы {table_name}: {e}")
            raise

    def append_data(self, table_name: str, df: pd.DataFrame) -> None:
        """Append *df* to an existing table with FK checks suspended.

        ``SET session_replication_role = 'replica'`` disables trigger/FK
        enforcement; the role is restored before the single commit.  SET is
        transactional in PostgreSQL, so a failed insert rolls the role
        change back together with the data.

        Raises:
            Exception: re-raised after logging if the insert fails.
        """
        try:
            if df.empty:
                return

            # Disable FK enforcement for the duration of the bulk insert.
            with db_connector.dst_engine.connect() as conn:
                conn.execute(text("SET session_replication_role = 'replica';"))

                df.to_sql(
                    table_name.lower(),
                    conn,
                    if_exists='append',
                    index=False,
                    chunksize=10000,
                    method='multi'
                )

                conn.execute(text("SET session_replication_role = 'origin';"))
                conn.commit()

            migration_logger.info(f"Добавлено {len(df)} строк в {table_name}")

        except Exception as e:
            migration_logger.error(f"Ошибка добавления данных в {table_name}: {e}")
            raise

    def create_indexes(self, table_name: str, indexes: List[Dict[str, Any]]) -> None:
        """Create the given indexes on *table_name* (best effort).

        Each entry needs ``name`` and ``columns``; ``unique`` is optional.
        Failures are logged per index and do not abort the loop.
        """
        if not indexes:
            return

        pg_table = table_name.lower()

        for index_info in indexes:
            try:
                index_name = f"idx_{pg_table}_{index_info['name'].lower()}"
                # Strip anything that is not identifier-safe.
                index_name = ''.join(c for c in index_name if c.isalnum() or c == '_')

                columns = ', '.join([f'"{col}"' for col in index_info['columns']])
                unique_str = 'UNIQUE ' if index_info.get('unique') else ''

                sql = f'CREATE {unique_str}INDEX IF NOT EXISTS "{index_name}" ON "{pg_table}" ({columns})'

                with db_connector.dst_connection() as conn:
                    conn.execute(text(sql))
                    conn.commit()

                migration_logger.info(f"Создан индекс: {index_name}")

            except Exception as e:
                # BUGFIX: the exception used to be passed as a %-style logging
                # argument with no placeholder in the message, so it was never
                # rendered; include it in the message instead.
                migration_logger.error(f"Ошибка создания индекса {index_info.get('name')}: {e}")

    def create_foreign_keys(self, table_name: str, foreign_keys: List[Dict[str, str]]) -> None:
        """Add FK constraints to *table_name* (best effort).

        Each entry needs ``parent_column``, ``referenced_table`` and
        ``referenced_column``.  A missing referenced table is logged and
        skipped; other failures are logged per key and do not abort the loop.
        """
        if not foreign_keys:
            return

        pg_table = table_name.lower()

        for fk_info in foreign_keys:
            try:
                referenced_table = fk_info['referenced_table'].lower()

                # Skip the constraint if the referenced table is absent.
                with db_connector.dst_connection() as conn:
                    check_table_sql = """
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_schema = 'public'
                            AND table_name = :table_name
                        );
                    """
                    result = conn.execute(text(check_table_sql), {"table_name": referenced_table})
                    table_exists = result.scalar()

                if not table_exists:
                    migration_logger.error(f"Таблица '{referenced_table}' не существует. Пропускаем создание внешнего ключа")
                    continue

                fk_name = f"fk_{pg_table}_{fk_info['parent_column'].lower()}"
                # Strip anything that is not identifier-safe.
                fk_name = ''.join(c for c in fk_name if c.isalnum() or c == '_')

                create_sql = f"""
                    ALTER TABLE "{pg_table}"
                    ADD CONSTRAINT "{fk_name}"
                    FOREIGN KEY ("{fk_info['parent_column']}")
                    REFERENCES "{referenced_table}" ("{fk_info['referenced_column']}")
                """

                with db_connector.dst_connection() as conn:
                    conn.execute(text(create_sql))
                    conn.commit()

                migration_logger.info(f"Создан внешний ключ: {fk_name}")

            except Exception as e:
                # BUGFIX: same logging defect as in create_indexes — the
                # exception was silently dropped; include it in the message.
                migration_logger.error(f"Ошибка создания внешнего ключа {fk_info.get('name', 'unknown')}: {e}")

    def analyze_table(self, table_name: str) -> None:
        """Run ANALYZE on *table_name* to refresh planner statistics."""
        try:
            with db_connector.dst_connection() as conn:
                conn.execute(text(f'ANALYZE "{table_name.lower()}"'))
                conn.commit()
            migration_logger.info(f"ANALYZE выполнен для {table_name}")
        except Exception as e:
            migration_logger.error(f"Ошибка ANALYZE для {table_name}: {e}")

    def upsert_data(self, table_name: str, df: pd.DataFrame, id_column: str) -> None:
        """Insert new rows and update existing ones, keyed on *id_column*.

        Rows whose id already exists are updated column-by-column (None/NaN
        values are skipped so they never overwrite existing data); the
        remaining rows are bulk-inserted.  Everything runs in a single
        transaction.

        Args:
            table_name: target table (lowercased before use)
            df: data to upsert
            id_column: column used for conflict detection
        """
        if df.empty:
            return

        table_name = table_name.lower()

        with db_connector.dst_engine.begin() as conn:
            # Fetch the ids already present in the target table.
            # BUGFIX: table/column identifiers are now quoted, consistent
            # with the rest of this module.
            existing_ids = pd.read_sql_query(f'SELECT "{id_column}" FROM "{table_name}"', conn)
            existing_set = set(existing_ids[id_column].tolist())

            # Split the incoming frame into inserts and updates.
            new_records = df[~df[id_column].isin(existing_set)]
            update_records = df[df[id_column].isin(existing_set)]
            # BUGFIX: debug print() replaced with the migration logger.
            migration_logger.info(f"new_records: {len(new_records)}, update_records: {len(update_records)}")

            if not new_records.empty:
                new_records.to_sql(table_name, conn, if_exists='append', index=False, method='multi')
                migration_logger.info(f" Вставлено {len(new_records)} записей")

            if not update_records.empty:
                # BUGFIX: debug print() replaced with the migration logger.
                migration_logger.info(f"Обновляем {len(update_records)} записей")

                # Row-by-row parameterized UPDATEs; None/NaN values are
                # filtered out so they do not clobber existing columns.
                for record in update_records.to_dict('records'):
                    id_value = record[id_column]
                    update_dict = {k: v for k, v in record.items() if k != id_column and v is not None and not pd.isna(v)}

                    if update_dict:
                        set_clause = ', '.join([f'"{k}" = :{k}' for k in update_dict.keys()])
                        update_query = text(f'UPDATE "{table_name}" SET {set_clause} WHERE "{id_column}" = :id_value')
                        conn.execute(update_query, {**update_dict, 'id_value': id_value})

                migration_logger.info(f" Обновлено {len(update_records)} записей")

    def delete_data(self, table_name: str, id_column: str, ids: List[Any]) -> None:
        """Delete rows from *table_name* whose *id_column* is in *ids*.

        IDs are processed in chunks of 1000 to keep statements small;
        multi-id chunks use ``= ANY(:ids)``, which psycopg2 adapts to a
        PostgreSQL array.
        """
        if not ids:
            return

        table_name = table_name.lower()
        total_deleted = 0

        # Chunk the id list to avoid oversized statements / parameter sets.
        chunk_size = 1000
        for i in range(0, len(ids), chunk_size):
            chunk_ids = ids[i:i + chunk_size]

            # BUGFIX: table identifier quoted (consistent with the rest of
            # the module); the per-row debug print() was removed.
            if len(chunk_ids) == 1:
                delete_query = text(f'DELETE FROM "{table_name}" WHERE "{id_column}" = :id')
                params = {"id": chunk_ids[0]}
            else:
                delete_query = text(f'DELETE FROM "{table_name}" WHERE "{id_column}" = ANY(:ids)')
                params = {"ids": chunk_ids}

            with db_connector.dst_engine.begin() as conn:
                result = conn.execute(delete_query, params)
                total_deleted += result.rowcount

        migration_logger.info(f" Всего удалено: {total_deleted}")
|
||
|
||
data_writer = DataWriter() |