v2026.06.3

This commit is contained in:
brusnitsyn
2026-06-14 22:51:46 +09:00
parent b5d1f61a82
commit 3fb2053705
6 changed files with 603 additions and 21 deletions

View File

@@ -856,6 +856,20 @@ class DatabaseMigrator:
else:
conflict_action = "DO NOTHING"
try:
target_types = {
col['name']: str(col['type'])
for col in inspect(self.dst_engine).get_columns(table_name)
}
except Exception:
target_types = {}
select_expr = ', '.join(
f'CAST({self.quote_identifier(col)} AS {target_types[col]})'
if col in target_types else self.quote_identifier(col)
for col in columns
)
try:
if self.table_exists(staging):
with self.dst_engine.connect() as conn:
@@ -869,7 +883,7 @@ class DatabaseMigrator:
self.write_dataframe_batch_without_sqlalchemy(chunk, staging)
sql = f"""
INSERT INTO {self.quote_identifier(table_name)} ({quoted_columns})
SELECT {quoted_columns}
SELECT {select_expr}
FROM {self.quote_identifier(staging)}
ON CONFLICT ({conflict_columns}) {conflict_action}
"""
@@ -1403,9 +1417,37 @@ class DatabaseMigrator:
self.logger.log_table_success(table_name, 0)
return True
if (
last_watermark['last_x_datetime'] is None
and target_exists
and table_config.primary_key
and table_config.life_table
):
pg_count = self._get_pg_row_count(pg_table)
if pg_count > 0:
self.logger.log_info(
f"Таблица {pg_table} содержит {pg_count} строк без watermark — "
f"автоопределение watermark из {table_config.life_table}"
)
detected = self._get_watermark_for_pg_data(table_config, find_min=True)
if detected['last_x_datetime'] is not None:
self.save_watermark(
pg_table,
detected['last_x_datetime'],
detected['last_sequence_value'],
pg_count,
'success',
)
last_watermark = detected
self.logger.log_info(f"Watermark установлен автоматически: {last_watermark}")
else:
self.logger.log_warning(
f"Не удалось определить watermark для {pg_table} из существующих данных, "
f"начинаем инкрементальную миграцию с нуля"
)
if (
table_config.initial_load_mode == 'full_then_incremental'
and not target_exists
and last_watermark['last_x_datetime'] is None
):
self.logger.log_info(
@@ -1641,11 +1683,11 @@ class DatabaseMigrator:
return '0x' + value.hex()
return "'" + str(value).replace("'", "''") + "'"
def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig) -> Dict[str, Any]:
def _get_watermark_for_pg_data(self, table_config: TableMigrationConfig, find_min: bool = False) -> Dict[str, Any]:
"""Поиск watermark в MSSQL Life_ для строк, уже имеющихся в PostgreSQL.
Читает PK из PG потоком (без загрузки всей таблицы в память), батчами
запрашивает MSSQL Life_ и находит максимальную (datetime, sequence) пару
запрашивает MSSQL Life_ и находит минимальную или максимальную (datetime, sequence) пару
среди событий, относящихся к уже реплицированным строкам.
"""
empty: Dict[str, Any] = {'last_x_datetime': None, 'last_sequence_value': None}
@@ -1656,9 +1698,10 @@ class DatabaseMigrator:
LOOKUP_BATCH = 500
order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} DESC"]
direction = 'ASC' if find_min else 'DESC'
order_parts = [f"{self.quote_mssql_identifier(table_config.datetime_column)} {direction}"]
if table_config.sequence_column:
order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} DESC")
order_parts.append(f"{self.quote_mssql_identifier(table_config.sequence_column)} {direction}")
order_clause = ', '.join(order_parts)
select_parts = [
@@ -1722,15 +1765,18 @@ class DatabaseMigrator:
batch_dt = row['max_dt']
batch_seq = row['max_seq'] if table_config.sequence_column else None
if (
is_better = (
max_datetime is None
or batch_dt > max_datetime
or (
batch_dt == max_datetime
and batch_seq is not None
and (max_sequence is None or batch_seq > max_sequence)
)
):
or (find_min and (
batch_dt < max_datetime
or (batch_dt == max_datetime and batch_seq is not None and (max_sequence is None or batch_seq < max_sequence))
))
or (not find_min and (
batch_dt > max_datetime
or (batch_dt == max_datetime and batch_seq is not None and (max_sequence is None or batch_seq > max_sequence))
))
)
if is_better:
max_datetime = batch_dt
max_sequence = batch_seq
@@ -1740,9 +1786,10 @@ class DatabaseMigrator:
f"(строки {total_rows - len(batch) + 1}{total_rows}): {e}"
)
direction_label = 'минимальный' if find_min else 'максимальный'
self.logger.log_info(
f"Обработано {total_rows} PK из {table_config.pg_table} "
f"в {batch_num} батчах, watermark: {max_datetime}"
f"в {batch_num} батчах, {direction_label} watermark: {max_datetime}"
)
return {'last_x_datetime': max_datetime, 'last_sequence_value': max_sequence}

View File

@@ -494,7 +494,7 @@ class MigrationJobQueue:
def _materialize_due_schedules(self):
now = datetime.now()
grace_cutoff = now - timedelta(seconds=Config.SCHEDULE_GRACE_SECONDS)
grace_cutoff = now - timedelta(seconds=max(Config.SCHEDULE_GRACE_SECONDS, Config.QUEUE_POLL_SECONDS))
with self._get_engine().connect() as conn:
due_rows = conn.execute(self._text(f"""
SELECT *