# replicator/app/services/data_reader.py
# Snapshot retrieved 2026-03-08 20:21:15 +09:00 (184 lines, 8.7 KiB, Python).
# NOTE: the repository viewer flagged "ambiguous Unicode characters" — this
# refers to the Cyrillic text in docstrings and log messages, which is
# intentional; the banner text itself has been converted to this comment so
# the file remains valid Python.
import time
from typing import Any, Iterator, List, Optional, Generator

import pandas as pd
from sqlalchemy import text

from app.core.config import settings
from app.core.database import db_connector
from app.core.logging import migration_logger
class DataReader:
    """Read source-table data by ID with chunked (batched) support.

    Source reads go through ``db_connector.src_engine`` (MSSQL via pymssql,
    judging by :meth:`prepare_query_for_pymssql`); destination-side statistics
    go through ``db_connector.dst_engine`` (PostgreSQL, per the quoting style).

    WARNING: table and column names are interpolated into SQL with f-strings
    throughout. They must come from trusted configuration only — passing
    user-controlled values here is an SQL-injection vector.
    """

    def read_by_id_chunked(self, table_name: str, id_column: str, last_id: Optional[int]) -> Generator[pd.DataFrame, None, None]:
        """
        Read rows in chunks for large tables; yields non-empty DataFrames.

        Args:
            table_name: Source table name (trusted config value — see class note).
            id_column: Numeric ID column used for range pagination.
            last_id: ``None`` for a full initial load; otherwise only rows with
                ID > ``last_id`` are read (incremental load).

        Yields:
            pd.DataFrame chunks covering up to ``settings.CHUNK_SIZE`` ID values
            each (empty ranges are skipped).

        Raises:
            Re-raises any read error after logging it.
        """
        try:
            if last_id is None:
                migration_logger.info(f"Первая загрузка {table_name}")
                # Fetch the ID range and row count up front so we can paginate
                # by ID ranges instead of OFFSET (cheap on large tables).
                min_max_query = f"SELECT MIN({id_column}) as min_id, MAX({id_column}) as max_id, COUNT(*) as total FROM {table_name}"
                stats_df = pd.read_sql_query(min_max_query, db_connector.src_engine)
                if stats_df.empty or stats_df.iloc[0]['total'] == 0:
                    migration_logger.warning(f"Таблица {table_name} пуста")
                    return
                min_id = stats_df.iloc[0]['min_id'] or 0
                max_id = stats_df.iloc[0]['max_id']
                total = stats_df.iloc[0]['total']
                migration_logger.info(f"Статистика {table_name}: ID от {min_id} до {max_id}, всего {total} строк")
                current_id = min_id
                chunk_num = 0
                # FIX: total_loaded was never initialized in this branch, so the
                # progress log below raised NameError on the first BATCH_SIZE-th chunk.
                total_loaded = 0
                while current_id <= max_id:
                    chunk_num += 1
                    # Half-open range [current_id, current_id + CHUNK_SIZE):
                    # each ID is read exactly once, gaps in IDs are tolerated.
                    chunk_query = f"""
                    SELECT * FROM {table_name}
                    WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE}
                    ORDER BY {id_column} ASC
                    """
                    chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine)
                    if not chunk_df.empty:
                        total_loaded += len(chunk_df)
                        migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (ID: {current_id} - {current_id + settings.CHUNK_SIZE})")
                        yield chunk_df
                    current_id += settings.CHUNK_SIZE
                    if chunk_num % settings.BATCH_SIZE == 0:
                        migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк")
                    # Small pause every 10 chunks to reduce load on the source DB.
                    if chunk_num % 10 == 0:
                        time.sleep(0.1)
            else:
                # Incremental load — also chunked.
                migration_logger.info(f"📖 Инкрементальная загрузка {table_name} с ID > {last_id}")
                # Fetch the max ID for progress estimation / loop bound.
                max_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}"
                max_df = pd.read_sql_query(max_query, db_connector.src_engine)
                max_id = max_df.iloc[0]['max_id'] if not max_df.empty else last_id
                # FIX: MAX() on an empty table returns NULL (NaN in pandas);
                # comparing current_id <= NaN/None would misbehave or raise.
                if max_id is None or pd.isna(max_id):
                    max_id = last_id
                current_id = last_id + 1
                chunk_num = 0
                total_loaded = 0
                while current_id <= max_id:
                    chunk_num += 1
                    chunk_query = f"""
                    SELECT * FROM {table_name}
                    WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE}
                    ORDER BY {id_column} ASC
                    """
                    chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine)
                    if not chunk_df.empty:
                        yield chunk_df
                        total_loaded += len(chunk_df)
                        migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (всего {total_loaded})")
                    current_id += settings.CHUNK_SIZE
                    if chunk_num % settings.BATCH_SIZE == 0:
                        migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк")
                    # Small pause every 10 chunks to reduce load on the source DB.
                    if chunk_num % 10 == 0:
                        time.sleep(0.1)
                migration_logger.info(f"Инкрементальная загрузка завершена, всего {total_loaded} новых строк")
        except Exception as e:
            migration_logger.error(f"Ошибка чтения {table_name}: {e}")
            raise

    def prepare_query_for_pymssql(self, query: str) -> str:
        """Convert ``?`` placeholders to ``%s`` for pymssql's 'format' paramstyle.

        NOTE(review): this is a naive text replace — a literal ``?`` inside a
        string constant in the query would also be rewritten. Acceptable as long
        as queries are authored with that in mind.
        """
        return query.replace('?', '%s')

    def read_custom_query_chunked(self, query: str, params: Optional[tuple] = None,
                                  chunksize: int = 5000) -> Iterator[pd.DataFrame]:
        """
        Read data for an arbitrary SQL query in batches.

        Args:
            query: SQL query with ``?`` placeholders.
            params: Query parameters (tuple), or None for a parameterless query.
            chunksize: Batch size per yielded DataFrame.

        Returns:
            Iterator[pd.DataFrame]: lazy iterator over result batches.
        """
        migration_logger.debug(f"Executing custom query: {query[:200]}...")
        migration_logger.debug(f"Params: {params}")
        # Placeholder conversion is only needed when parameters are supplied.
        if params:
            prepared_query = self.prepare_query_for_pymssql(query)
            return pd.read_sql_query(prepared_query, db_connector.src_engine, params=params, chunksize=chunksize)
        else:
            return pd.read_sql_query(query, db_connector.src_engine, chunksize=chunksize)

    def get_row_count(self, table_name: str) -> int:
        """Return the row count of a source table, or 0 on any error (best-effort)."""
        try:
            query = f"SELECT COUNT(*) as cnt FROM {table_name}"
            df = pd.read_sql_query(query, db_connector.src_engine)
            return int(df.iloc[0]['cnt']) if not df.empty else 0
        except Exception as e:
            migration_logger.error(f"Ошибка подсчета строк в {table_name}: {e}")
            return 0

    def get_last_id(self, table_name: str, id_column: str) -> Optional[int]:
        """
        Return MAX(id_column) from the destination table, or None if the table
        is empty or the query fails (best-effort, mirrors get_row_count).
        """
        try:
            # Destination is PostgreSQL: lower-case table name, quoted column.
            table_name = table_name.lower()
            query = text(f'SELECT MAX("{id_column}") as max_id FROM {table_name}')
            # FIX: the engine itself was used as a context manager
            # (`with db_connector.dst_engine as conn:`); a Connection must be
            # obtained via .connect(), as done in get_table_stats.
            with db_connector.dst_engine.connect() as conn:
                result = conn.execute(query).scalar()
            return int(result) if result is not None else None
        except Exception as e:
            migration_logger.error(f"Ошибка получения MAX ID для {table_name}: {e}")
            return None

    def get_table_stats(self, table_name: str, id_column: str) -> dict:
        """
        Return destination-table (PostgreSQL) statistics as a dict with keys
        ``total_rows`` (int), ``min_id`` and ``max_id`` (int or None).
        Falls back to zeros/Nones on any error.
        """
        try:
            table_name = table_name.lower()
            query = text(f"""
                SELECT
                    COUNT(*) as total_rows,
                    MIN("{id_column}") as min_id,
                    MAX("{id_column}") as max_id
                FROM {table_name}
            """)
            with db_connector.dst_engine.connect() as conn:
                result = conn.execute(query).first()
            return {
                'total_rows': result[0] or 0,
                'min_id': int(result[1]) if result[1] is not None else None,
                'max_id': int(result[2]) if result[2] is not None else None
            }
        except Exception as e:
            migration_logger.error(f"Ошибка получения статистики для {table_name} в PG: {e}")
            return {'total_rows': 0, 'min_id': None, 'max_id': None}
data_reader = DataReader()