184 lines
8.7 KiB
Python
184 lines
8.7 KiB
Python
import pandas as pd
|
||
from typing import Any, Iterator, List, Optional, Generator
|
||
|
||
from sqlalchemy import text
|
||
from app.core.database import db_connector
|
||
from app.core.logging import migration_logger
|
||
from app.core.config import settings
|
||
|
||
|
||
class DataReader:
|
||
"""Чтение данных по ID с поддержкой чанков"""
|
||
|
||
def read_by_id_chunked(self, table_name: str, id_column: str, last_id: Optional[int]) -> Generator[pd.DataFrame, None, None]:
|
||
"""
|
||
Чтение записей порциями (чанками) для больших таблиц
|
||
Возвращает генератор DataFrame'ов
|
||
"""
|
||
try:
|
||
if last_id is None:
|
||
migration_logger.info(f"Первая загрузка {table_name}")
|
||
# Получаем минимальный ID
|
||
min_max_query = f"SELECT MIN({id_column}) as min_id, MAX({id_column}) as max_id, COUNT(*) as total FROM {table_name}"
|
||
stats_df = pd.read_sql_query(min_max_query, db_connector.src_engine)
|
||
|
||
if stats_df.empty or stats_df.iloc[0]['total'] == 0:
|
||
migration_logger.warning(f"Таблица {table_name} пуста")
|
||
return
|
||
|
||
min_id = stats_df.iloc[0]['min_id'] or 0
|
||
max_id = stats_df.iloc[0]['max_id']
|
||
total = stats_df.iloc[0]['total']
|
||
|
||
migration_logger.info(f"Статистика {table_name}: ID от {min_id} до {max_id}, всего {total} строк")
|
||
|
||
# Загружаем порциями
|
||
current_id = min_id
|
||
chunk_num = 0
|
||
|
||
while current_id <= max_id:
|
||
chunk_num += 1
|
||
chunk_query = f"""
|
||
SELECT * FROM {table_name}
|
||
WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE}
|
||
ORDER BY {id_column} ASC
|
||
"""
|
||
|
||
chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine)
|
||
|
||
if not chunk_df.empty:
|
||
migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (ID: {current_id} - {current_id + settings.CHUNK_SIZE})")
|
||
yield chunk_df
|
||
|
||
current_id += settings.CHUNK_SIZE
|
||
|
||
if chunk_num % settings.BATCH_SIZE == 0:
|
||
migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк")
|
||
|
||
# Небольшая задержка для снижения нагрузки
|
||
if chunk_num % 10 == 0:
|
||
import time
|
||
time.sleep(0.1)
|
||
|
||
else:
|
||
# Инкрементальная загрузка - тоже порциями
|
||
migration_logger.info(f"📖 Инкрементальная загрузка {table_name} с ID > {last_id}")
|
||
|
||
# Получаем максимальный ID для оценки прогресса
|
||
max_query = f"SELECT MAX({id_column}) as max_id FROM {table_name}"
|
||
max_df = pd.read_sql_query(max_query, db_connector.src_engine)
|
||
max_id = max_df.iloc[0]['max_id'] if not max_df.empty else last_id
|
||
|
||
current_id = last_id + 1
|
||
chunk_num = 0
|
||
total_loaded = 0
|
||
|
||
while current_id <= max_id:
|
||
chunk_num += 1
|
||
chunk_query = f"""
|
||
SELECT * FROM {table_name}
|
||
WHERE {id_column} >= {current_id} AND {id_column} < {current_id + settings.CHUNK_SIZE}
|
||
ORDER BY {id_column} ASC
|
||
"""
|
||
|
||
chunk_df = pd.read_sql_query(chunk_query, db_connector.src_engine)
|
||
|
||
if not chunk_df.empty:
|
||
yield chunk_df
|
||
total_loaded += len(chunk_df)
|
||
migration_logger.info(f" Чанк {chunk_num}: загружено {len(chunk_df)} строк (всего {total_loaded})")
|
||
|
||
current_id += settings.CHUNK_SIZE
|
||
|
||
if chunk_num % settings.BATCH_SIZE == 0:
|
||
migration_logger.info(f" Прогресс: обработано {chunk_num} чанков, загружено {total_loaded} строк")
|
||
|
||
# Небольшая задержка
|
||
if chunk_num % 10 == 0:
|
||
import time
|
||
time.sleep(0.1)
|
||
|
||
migration_logger.info(f"Инкрементальная загрузка завершена, всего {total_loaded} новых строк")
|
||
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка чтения {table_name}: {e}")
|
||
raise
|
||
|
||
def prepare_query_for_pymssql(self, query: str) -> str:
|
||
"""Конвертирует ? плейсхолдеры в %s для pymssql"""
|
||
return query.replace('?', '%s')
|
||
|
||
def read_custom_query_chunked(self, query: str, params: Optional[tuple] = None,
|
||
chunksize: int = 5000) -> Iterator[pd.DataFrame]:
|
||
"""
|
||
Читает данные по произвольному SQL-запросу пачками.
|
||
|
||
Args:
|
||
query: SQL-запрос с плейсхолдерами (?)
|
||
params: Параметры для запроса (список)
|
||
chunksize: Размер пачки
|
||
|
||
Returns:
|
||
Iterator[pd.DataFrame]: Итератор по пачкам данных
|
||
"""
|
||
migration_logger.debug(f"Executing custom query: {query[:200]}...")
|
||
migration_logger.debug(f"Params: {params}")
|
||
|
||
# Используем pandas для чанкованного чтения
|
||
if params:
|
||
preparedQuery = self.prepare_query_for_pymssql(query)
|
||
return pd.read_sql_query(preparedQuery, db_connector.src_engine, params=params, chunksize=chunksize)
|
||
else:
|
||
return pd.read_sql_query(query, db_connector.src_engine, chunksize=chunksize)
|
||
|
||
def get_row_count(self, table_name: str) -> int:
|
||
"""Получить количество строк в таблице"""
|
||
try:
|
||
query = f"SELECT COUNT(*) as cnt FROM {table_name}"
|
||
df = pd.read_sql_query(query, db_connector.src_engine)
|
||
return int(df.iloc[0]['cnt']) if not df.empty else 0
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка подсчета строк в {table_name}: {e}")
|
||
return 0
|
||
|
||
def get_last_id(self, table_name: str, id_column: str) -> int:
|
||
"""
|
||
Получает максимальный ID из таблицы
|
||
"""
|
||
try:
|
||
table_name = table_name.lower()
|
||
query = text(f'SELECT MAX("{id_column}") as max_id FROM {table_name}')
|
||
with db_connector.dst_engine as conn:
|
||
result = conn.execute(query).scalar()
|
||
return int(result) if result is not None else None
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка получения MAX ID для {table_name}: {e}")
|
||
return None
|
||
|
||
def get_table_stats(self, table_name: str, id_column: str) -> dict:
|
||
"""
|
||
Получает статистику по таблице назначения (PostgreSQL)
|
||
"""
|
||
try:
|
||
table_name = table_name.lower()
|
||
query = text(f"""
|
||
SELECT
|
||
COUNT(*) as total_rows,
|
||
MIN("{id_column}") as min_id,
|
||
MAX("{id_column}") as max_id
|
||
FROM {table_name}
|
||
""")
|
||
|
||
with db_connector.dst_engine.connect() as conn:
|
||
result = conn.execute(query).first()
|
||
|
||
return {
|
||
'total_rows': result[0] or 0,
|
||
'min_id': int(result[1]) if result[1] is not None else None,
|
||
'max_id': int(result[2]) if result[2] is not None else None
|
||
}
|
||
except Exception as e:
|
||
migration_logger.error(f"Ошибка получения статистики для {table_name} в PG: {e}")
|
||
return {'total_rows': 0, 'min_id': None, 'max_id': None}
|
||
|
||
data_reader = DataReader() |