Files
syncio/app/queue.py
brusnitsyn b5d1f61a82 v2026.06
2026-06-10 16:53:03 +09:00

753 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import threading
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, time as dt_time
from typing import Any, Dict, List, Optional
from .config import Config
@dataclass
class MigrationJob:
    """Serializable representation of a migration job."""

    job_id: str
    created_at: datetime
    run_at: datetime
    tables: Optional[List[str]]
    send_email: bool
    dry_run: Optional[bool]
    read_limit: Optional[int]
    force_full: bool
    status: str
    started_at: Optional[datetime]
    finished_at: Optional[datetime]
    report: Optional[Dict[str, Any]]
    error: Optional[str]
    queue_sequence: int
    schedule_id: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the job as a plain dict with datetimes as ISO-8601 strings.

        ``started_at`` and ``finished_at`` are rendered as ``None`` when unset;
        ``tables`` and ``report`` are passed through by reference.
        """
        # Optional timestamps are resolved up front so the mapping below stays flat.
        started_iso = self.started_at.isoformat() if self.started_at else None
        finished_iso = self.finished_at.isoformat() if self.finished_at else None
        return dict(
            job_id=self.job_id,
            created_at=self.created_at.isoformat(),
            run_at=self.run_at.isoformat(),
            tables=self.tables,
            send_email=self.send_email,
            dry_run=self.dry_run,
            read_limit=self.read_limit,
            force_full=self.force_full,
            status=self.status,
            started_at=started_iso,
            finished_at=finished_iso,
            report=self.report,
            error=self.error,
            queue_sequence=self.queue_sequence,
            schedule_id=self.schedule_id,
        )
@dataclass
class MigrationSchedule:
    """Serializable representation of a migration schedule."""

    schedule_id: str
    created_at: datetime
    updated_at: datetime
    name: Optional[str]
    schedule_type: str
    enabled: bool
    catch_up_missed_runs: bool
    initial_force_full: bool
    tables: Optional[List[str]]
    send_email: bool
    dry_run: Optional[bool]
    read_limit: Optional[int]
    interval_seconds: Optional[int]
    daily_time: Optional[str]
    start_at: Optional[datetime]
    next_run_at: datetime
    last_enqueued_at: Optional[datetime]
    last_job_id: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the schedule as a plain dict with datetimes as ISO-8601 strings.

        ``start_at`` and ``last_enqueued_at`` are rendered as ``None`` when
        unset; ``tables`` is passed through by reference.
        """
        # Optional timestamps are resolved up front so the mapping below stays flat.
        start_iso = self.start_at.isoformat() if self.start_at else None
        last_enqueued_iso = (
            self.last_enqueued_at.isoformat() if self.last_enqueued_at else None
        )
        return dict(
            schedule_id=self.schedule_id,
            created_at=self.created_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
            name=self.name,
            schedule_type=self.schedule_type,
            enabled=self.enabled,
            catch_up_missed_runs=self.catch_up_missed_runs,
            initial_force_full=self.initial_force_full,
            tables=self.tables,
            send_email=self.send_email,
            dry_run=self.dry_run,
            read_limit=self.read_limit,
            interval_seconds=self.interval_seconds,
            daily_time=self.daily_time,
            start_at=start_iso,
            next_run_at=self.next_run_at.isoformat(),
            last_enqueued_at=last_enqueued_iso,
            last_job_id=self.last_job_id,
        )
class MigrationJobQueue:
    """Persistent migration job queue and scheduler backed by PostgreSQL.

    Jobs and schedules are stored in two tables inside
    ``Config.REPLICATOR_SCHEMA``. One daemon thread per instance turns due
    schedules into queued jobs, claims due jobs one at a time and runs them
    through ``DatabaseMigrator``. Timestamps are naive datetimes produced by
    ``datetime.now()`` and stored in ``timestamp`` columns.
    """

    JOBS_TABLE = 'migration_jobs'
    SCHEDULES_TABLE = 'migration_schedules'

    def __init__(self):
        """Set up in-memory state only; no thread or DB connection is created here."""
        self.lock = threading.Lock()
        # The condition shares ``lock``; it is used to wake the worker early.
        self.condition = threading.Condition(self.lock)
        self.engine = None
        self.schema_ready = False
        self.worker_started = False
        self.worker = None

    def start(self):
        """Start the daemon worker thread on first call; later calls are no-ops."""
        with self.condition:
            if self.worker_started:
                return
            self.worker_started = True
            self.worker = threading.Thread(target=self._worker_loop, daemon=True)
            self.worker.start()
            self.condition.notify_all()

    def enqueue(
        self,
        tables: Optional[List[str]] = None,
        send_email: bool = True,
        dry_run: Optional[bool] = None,
        read_limit: Optional[int] = None,
        force_full: bool = False,
        run_at: Optional[datetime] = None,
        delay_seconds: Optional[int] = None,
        schedule_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Insert a new job with status ``'queued'`` and wake the worker.

        Args:
            tables: Table names stored with the job; ``None`` is stored as NULL.
            send_email: Whether a notification is sent after the job runs.
            dry_run: Passed through to the migrator when the job executes.
            read_limit: Passed through to the migrator when the job executes.
            force_full: Passed through to the migrator when the job executes.
            run_at: Earliest time the job may start (mutually exclusive with
                ``delay_seconds``).
            delay_seconds: Delay from now until the job may start.
            schedule_id: Identifier of the schedule that produced the job, if any.

        Returns:
            The inserted job as produced by ``MigrationJob.to_dict()``.

        Raises:
            ValueError: If both ``run_at`` and ``delay_seconds`` are given.
        """
        self.start()
        self._ensure_schema()
        scheduled_at = self._resolve_run_at(run_at=run_at, delay_seconds=delay_seconds)
        job_id = str(uuid.uuid4())
        created_at = datetime.now()
        sql = self._text(f"""
            INSERT INTO {self._qualified_table(self.JOBS_TABLE)}
            (job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status)
            VALUES
            (:job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb), :send_email, :dry_run, :read_limit, :force_full, 'queued')
            RETURNING *
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {
                'job_id': job_id,
                'schedule_id': schedule_id,
                'created_at': created_at,
                'run_at': scheduled_at,
                'tables_json': json.dumps(tables) if tables is not None else None,
                'send_email': send_email,
                'dry_run': dry_run,
                'read_limit': read_limit,
                'force_full': force_full,
            }).mappings().first()
            conn.commit()
        # Wake the worker so a job that is already due does not wait for the poll timeout.
        with self.condition:
            self.condition.notify_all()
        return self._job_from_row(row).to_dict()

    def list_jobs(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` jobs, newest first by ``queue_sequence``."""
        self.start()
        self._ensure_schema()
        sql = self._text(f"""
            SELECT *
            FROM {self._qualified_table(self.JOBS_TABLE)}
            ORDER BY queue_sequence DESC
            LIMIT :limit
        """)
        with self._get_engine().connect() as conn:
            rows = conn.execute(sql, {'limit': limit}).mappings().all()
        return [self._job_from_row(row).to_dict() for row in rows]

    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Return the job with the given id as a dict, or ``None`` if it does not exist."""
        self.start()
        self._ensure_schema()
        sql = self._text(
            f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id"
        )
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'job_id': job_id}).mappings().first()
        return self._job_from_row(row).to_dict() if row else None

    def create_schedule(
        self,
        schedule_type: str,
        tables: Optional[List[str]] = None,
        send_email: bool = True,
        dry_run: Optional[bool] = None,
        read_limit: Optional[int] = None,
        interval_seconds: Optional[int] = None,
        daily_time: Optional[str] = None,
        start_at: Optional[datetime] = None,
        name: Optional[str] = None,
        enabled: bool = True,
        catch_up_missed_runs: bool = False,
        initial_force_full: bool = False,
    ) -> Dict[str, Any]:
        """Persist a new schedule and wake the worker.

        Args:
            schedule_type: ``'interval'`` or ``'daily'``.
            tables: Table names copied onto every job the schedule enqueues.
            send_email: Copied onto every job the schedule enqueues.
            dry_run: Copied onto every job the schedule enqueues.
            read_limit: Copied onto every job the schedule enqueues.
            interval_seconds: Period for ``'interval'`` schedules (must be > 0).
            daily_time: ``HH:MM`` or ``HH:MM:SS`` for ``'daily'`` schedules.
            start_at: Optional earliest time of the first run.
            name: Optional display name.
            enabled: Disabled schedules are stored but never materialized.
            catch_up_missed_runs: Whether a run that is later than the grace
                period still enqueues a job.
            initial_force_full: Whether the first enqueued job gets
                ``force_full=True``; the flag is reset once such a job is enqueued.

        Returns:
            The inserted schedule as produced by ``MigrationSchedule.to_dict()``.

        Raises:
            ValueError: If the schedule parameters are invalid for ``schedule_type``.
        """
        self.start()
        self._ensure_schema()
        schedule_id = str(uuid.uuid4())
        created_at = datetime.now()
        normalized_start_at = self._normalize_datetime(start_at)
        next_run_at = self._resolve_next_schedule_run_at(
            schedule_type=schedule_type,
            interval_seconds=interval_seconds,
            daily_time=daily_time,
            start_at=normalized_start_at,
            reference=created_at,
        )
        sql = self._text(f"""
            INSERT INTO {self._qualified_table(self.SCHEDULES_TABLE)}
            (
                schedule_id, created_at, updated_at, name, schedule_type, enabled,
                catch_up_missed_runs, initial_force_full, tables_json, send_email, dry_run, read_limit, interval_seconds,
                daily_time, start_at, next_run_at
            )
            VALUES
            (
                :schedule_id, :created_at, :updated_at, :name, :schedule_type, :enabled,
                :catch_up_missed_runs, :initial_force_full,
                CAST(:tables_json AS jsonb), :send_email, :dry_run, :read_limit, :interval_seconds,
                :daily_time, :start_at, :next_run_at
            )
            RETURNING *
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {
                'schedule_id': schedule_id,
                'created_at': created_at,
                'updated_at': created_at,
                'name': name,
                'schedule_type': schedule_type,
                'enabled': enabled,
                'catch_up_missed_runs': catch_up_missed_runs,
                'initial_force_full': initial_force_full,
                'tables_json': json.dumps(tables) if tables is not None else None,
                'send_email': send_email,
                'dry_run': dry_run,
                'read_limit': read_limit,
                'interval_seconds': interval_seconds,
                'daily_time': daily_time,
                'start_at': normalized_start_at,
                'next_run_at': next_run_at,
            }).mappings().first()
            conn.commit()
        with self.condition:
            self.condition.notify_all()
        return self._schedule_from_row(row).to_dict()

    def list_schedules(self) -> List[Dict[str, Any]]:
        """Return all schedules ordered by ``next_run_at`` and then ``created_at``."""
        self.start()
        self._ensure_schema()
        sql = self._text(
            f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} ORDER BY next_run_at, created_at"
        )
        with self._get_engine().connect() as conn:
            rows = conn.execute(sql).mappings().all()
        return [self._schedule_from_row(row).to_dict() for row in rows]

    def get_schedule(self, schedule_id: str) -> Optional[Dict[str, Any]]:
        """Return the schedule with the given id as a dict, or ``None`` if it does not exist."""
        self.start()
        self._ensure_schema()
        sql = self._text(
            f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id"
        )
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'schedule_id': schedule_id}).mappings().first()
        return self._schedule_from_row(row).to_dict() if row else None

    def get_status(self) -> Dict[str, Any]:
        """Return a summary: most recently started running job, queued count, enabled schedules."""
        self.start()
        self._ensure_schema()
        with self._get_engine().connect() as conn:
            running_job = conn.execute(self._text(f"""
                SELECT *
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'running'
                ORDER BY started_at DESC
                LIMIT 1
            """)).mappings().first()
            queued_jobs = conn.execute(self._text(f"""
                SELECT COUNT(*)
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'queued'
            """)).scalar()
            schedules = conn.execute(self._text(f"""
                SELECT COUNT(*)
                FROM {self._qualified_table(self.SCHEDULES_TABLE)}
                WHERE enabled = TRUE
            """)).scalar()
        return {
            'running': running_job is not None,
            'running_job': self._job_from_row(running_job).to_dict() if running_job else None,
            'queued_jobs': int(queued_jobs or 0),
            'enabled_schedules': int(schedules or 0),
        }

    def _get_engine(self):
        """Return the SQLAlchemy engine, creating it on first use."""
        if self.engine is None:
            # Imported lazily so the module can be imported without SQLAlchemy being loaded.
            from sqlalchemy import create_engine
            self.engine = create_engine(Config.POSTGRES_CONNECTION_STRING)
        return self.engine

    def _text(self, sql: str):
        """Wrap a SQL string in ``sqlalchemy.text``."""
        from sqlalchemy import text
        return text(sql)

    def _quote_identifier(self, identifier: str) -> str:
        """Return ``identifier`` double-quoted, with embedded double quotes doubled."""
        return '"' + identifier.replace('"', '""') + '"'

    def _qualified_table(self, table_name: str) -> str:
        """Return ``"schema"."table"`` using ``Config.REPLICATOR_SCHEMA`` as the schema."""
        return f'{self._quote_identifier(Config.REPLICATOR_SCHEMA)}.{self._quote_identifier(table_name)}'

    def _ensure_schema(self):
        """Create the schema, tables, columns and indexes once per instance.

        On the first successful run this also marks every job still in status
        ``'running'`` as ``'failed'``.
        NOTE(review): that reset is not scoped to this process; if several
        processes share the tables it would also fail jobs that another
        process is running — confirm the single-process assumption.
        """
        if self.schema_ready:
            return
        with self.lock:
            # Checked again under the lock: another thread may have finished the setup meanwhile.
            if self.schema_ready:
                return
            with self._get_engine().connect() as conn:
                conn.execute(self._text(
                    f'CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(Config.REPLICATOR_SCHEMA)}'
                ))
                conn.execute(self._text(f"""
                    CREATE TABLE IF NOT EXISTS {self._qualified_table(self.JOBS_TABLE)} (
                        job_id text PRIMARY KEY,
                        schedule_id text NULL,
                        created_at timestamp NOT NULL,
                        run_at timestamp NOT NULL,
                        tables_json jsonb NULL,
                        send_email boolean NOT NULL DEFAULT TRUE,
                        dry_run boolean NULL,
                        read_limit integer NULL,
                        force_full boolean NOT NULL DEFAULT FALSE,
                        status text NOT NULL,
                        started_at timestamp NULL,
                        finished_at timestamp NULL,
                        report_json jsonb NULL,
                        error text NULL,
                        queue_sequence bigint GENERATED ALWAYS AS IDENTITY
                    )
                """))
                conn.execute(self._text(f"""
                    CREATE INDEX IF NOT EXISTS idx_{self.JOBS_TABLE}_status_run_at
                    ON {self._qualified_table(self.JOBS_TABLE)} (status, run_at, queue_sequence)
                """))
                conn.execute(self._text(f"""
                    CREATE TABLE IF NOT EXISTS {self._qualified_table(self.SCHEDULES_TABLE)} (
                        schedule_id text PRIMARY KEY,
                        created_at timestamp NOT NULL,
                        updated_at timestamp NOT NULL,
                        name text NULL,
                        schedule_type text NOT NULL,
                        enabled boolean NOT NULL DEFAULT TRUE,
                        catch_up_missed_runs boolean NOT NULL DEFAULT FALSE,
                        initial_force_full boolean NOT NULL DEFAULT FALSE,
                        tables_json jsonb NULL,
                        send_email boolean NOT NULL DEFAULT TRUE,
                        dry_run boolean NULL,
                        read_limit integer NULL,
                        interval_seconds integer NULL,
                        daily_time text NULL,
                        start_at timestamp NULL,
                        next_run_at timestamp NOT NULL,
                        last_enqueued_at timestamp NULL,
                        last_job_id text NULL
                    )
                """))
                # The ALTERs below add columns to tables created before those columns existed.
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.SCHEDULES_TABLE)}
                    ADD COLUMN IF NOT EXISTS catch_up_missed_runs boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.SCHEDULES_TABLE)}
                    ADD COLUMN IF NOT EXISTS initial_force_full boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.JOBS_TABLE)}
                    ADD COLUMN IF NOT EXISTS force_full boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    CREATE INDEX IF NOT EXISTS idx_{self.SCHEDULES_TABLE}_enabled_next_run_at
                    ON {self._qualified_table(self.SCHEDULES_TABLE)} (enabled, next_run_at)
                """))
                # Jobs left in 'running' cannot still be executing in this instance.
                conn.execute(self._text(f"""
                    UPDATE {self._qualified_table(self.JOBS_TABLE)}
                    SET status = 'failed',
                        finished_at = :now,
                        error = 'Прервано перезапуском воркера'
                    WHERE status = 'running'
                """), {'now': datetime.now()})
                conn.commit()
            self.schema_ready = True

    def _resolve_run_at(
        self,
        run_at: Optional[datetime] = None,
        delay_seconds: Optional[int] = None,
    ) -> datetime:
        """Return the naive datetime at which a job becomes due.

        ``delay_seconds`` is relative to now, ``run_at`` is normalized to a
        naive datetime, and with neither given the job is due immediately.

        Raises:
            ValueError: If both ``run_at`` and ``delay_seconds`` are given.
        """
        if run_at and delay_seconds is not None:
            raise ValueError("Specify either run_at or delay_seconds")
        if delay_seconds is not None:
            return datetime.now() + timedelta(seconds=delay_seconds)
        if run_at is None:
            return datetime.now()
        return self._normalize_datetime(run_at)

    def _resolve_next_schedule_run_at(
        self,
        schedule_type: str,
        interval_seconds: Optional[int],
        daily_time: Optional[str],
        start_at: Optional[datetime],
        reference: datetime,
    ) -> datetime:
        """Compute the first ``next_run_at`` for a new schedule.

        For ``'interval'``: a ``start_at`` later than ``reference`` is used
        as-is, otherwise the result is ``reference + interval_seconds``.
        For ``'daily'``: the result is the first occurrence of ``daily_time``
        that is strictly after ``reference`` and not before ``start_at``.

        Raises:
            ValueError: On an unknown ``schedule_type`` or on missing/invalid
                parameters for the chosen type.
        """
        normalized_reference = self._normalize_datetime(reference)
        normalized_start_at = self._normalize_datetime(start_at) if start_at else None
        starts_in_future = (
            normalized_start_at is not None and normalized_start_at > normalized_reference
        )

        if schedule_type == 'interval':
            if not interval_seconds or interval_seconds <= 0:
                raise ValueError("interval_seconds must be greater than 0 for interval schedule")
            if starts_in_future:
                return normalized_start_at
            return normalized_reference + timedelta(seconds=interval_seconds)

        if schedule_type == 'daily':
            if not daily_time:
                raise ValueError("daily_time is required for daily schedule")
            parsed_time = self._parse_daily_time(daily_time)
            one_day = timedelta(days=1)
            if starts_in_future:
                # The first run must not precede start_at: if the daily time has
                # already passed on the start date, use the following day.
                candidate = datetime.combine(normalized_start_at.date(), parsed_time)
                if candidate < normalized_start_at:
                    candidate += one_day
                return candidate
            # start_at is absent or already in the past: anchor on the reference
            # date so the result is always in the future.
            candidate = datetime.combine(normalized_reference.date(), parsed_time)
            if candidate <= normalized_reference:
                candidate += one_day
            return candidate

        raise ValueError("schedule_type must be one of: interval, daily")

    def _next_schedule_run_from_row(self, row: Dict[str, Any], reference: datetime) -> datetime:
        """Compute the next run of an existing schedule row, relative to ``reference``.

        Raises:
            ValueError: If the row's ``schedule_type`` is not supported.
        """
        schedule_type = row['schedule_type']
        if schedule_type == 'interval':
            return reference + timedelta(seconds=int(row['interval_seconds']))
        if schedule_type == 'daily':
            parsed_time = self._parse_daily_time(row['daily_time'])
            candidate = datetime.combine(reference.date(), parsed_time)
            if candidate <= reference:
                candidate += timedelta(days=1)
            return candidate
        raise ValueError(f"Unsupported schedule_type: {schedule_type}")

    def _parse_daily_time(self, raw_value: str) -> dt_time:
        """Parse ``HH:MM`` or ``HH:MM:SS`` into a ``datetime.time``.

        Raises:
            ValueError: If the value has the wrong number of parts, a part is
                not an integer, or a component is out of range.
        """
        parts = raw_value.split(':')
        if len(parts) not in (2, 3):
            raise ValueError("daily_time must be HH:MM or HH:MM:SS")
        try:
            hour = int(parts[0])
            minute = int(parts[1])
            second = int(parts[2]) if len(parts) == 3 else 0
            return dt_time(hour=hour, minute=minute, second=second)
        except ValueError as exc:
            # Report the expected format instead of the low-level int()/time() message.
            raise ValueError("daily_time must be HH:MM or HH:MM:SS") from exc

    def _normalize_datetime(self, value: Optional[datetime]) -> Optional[datetime]:
        """Return ``value`` as a naive datetime; aware values are converted to local time first."""
        if value is None:
            return None
        if value.tzinfo is not None:
            return value.astimezone().replace(tzinfo=None)
        return value

    def _worker_loop(self):
        """Run forever: materialize due schedules, then execute due jobs one by one.

        When no job is due the thread waits on the condition for up to
        ``Config.QUEUE_POLL_SECONDS`` or until it is notified.
        """
        while True:
            try:
                # Inside the try so that a database outage at startup is retried
                # instead of terminating the worker thread permanently.
                self._ensure_schema()
                self._materialize_due_schedules()
                job_row = self._claim_next_due_job()
                if job_row:
                    self._execute_job(self._job_from_row(job_row))
                    # Look for the next due job immediately, without waiting.
                    continue
            except Exception:
                import logging
                logging.getLogger(__name__).exception("Необработанная ошибка в _worker_loop")
                # Short pause so a persistent failure does not spin the loop.
                time.sleep(1.0)
                continue
            with self.condition:
                self.condition.wait(timeout=Config.QUEUE_POLL_SECONDS)

    def _materialize_due_schedules(self):
        """Enqueue a job for every enabled schedule that is due and advance it.

        A run is considered missed when ``next_run_at`` is older than
        ``Config.SCHEDULE_GRACE_SECONDS``; missed runs enqueue a job only when
        the schedule has ``catch_up_missed_runs`` set. ``next_run_at`` is
        advanced in either case.
        """
        now = datetime.now()
        grace_cutoff = now - timedelta(seconds=Config.SCHEDULE_GRACE_SECONDS)
        with self._get_engine().connect() as conn:
            # FOR UPDATE keeps the selected rows locked until the commit below.
            due_rows = conn.execute(self._text(f"""
                SELECT *
                FROM {self._qualified_table(self.SCHEDULES_TABLE)}
                WHERE enabled = TRUE
                  AND next_run_at <= :now
                ORDER BY next_run_at, created_at
                FOR UPDATE
            """), {'now': now}).mappings().all()
            for row in due_rows:
                missed_run = row['next_run_at'] < grace_cutoff
                should_enqueue = bool(not missed_run or row['catch_up_missed_runs'])
                next_run_at = self._next_schedule_run_from_row(row, now)
                job_id = row['last_job_id']
                last_enqueued_at = row['last_enqueued_at']
                force_full = bool(row.get('initial_force_full', False))
                if should_enqueue:
                    job_id = str(uuid.uuid4())
                    last_enqueued_at = now
                    tables_payload = row['tables_json']
                    # A driver may hand jsonb back as raw JSON text; encoding that
                    # again would store a JSON string instead of the array.
                    if tables_payload is not None and not isinstance(tables_payload, str):
                        tables_payload = json.dumps(tables_payload)
                    conn.execute(self._text(f"""
                        INSERT INTO {self._qualified_table(self.JOBS_TABLE)}
                        (job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status)
                        VALUES
                        (
                            :job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb),
                            :send_email, :dry_run, :read_limit, :force_full, 'queued'
                        )
                    """), {
                        'job_id': job_id,
                        'schedule_id': row['schedule_id'],
                        'created_at': now,
                        'run_at': now,
                        'tables_json': tables_payload,
                        'send_email': row['send_email'],
                        'dry_run': row['dry_run'],
                        'read_limit': row['read_limit'],
                        'force_full': force_full,
                    })
                conn.execute(self._text(f"""
                    UPDATE {self._qualified_table(self.SCHEDULES_TABLE)}
                    SET updated_at = :updated_at,
                        last_enqueued_at = :last_enqueued_at,
                        last_job_id = :last_job_id,
                        initial_force_full = CASE
                            WHEN :reset_initial_force_full THEN FALSE
                            ELSE initial_force_full
                        END,
                        next_run_at = :next_run_at
                    WHERE schedule_id = :schedule_id
                """), {
                    'updated_at': now,
                    'last_enqueued_at': last_enqueued_at,
                    'last_job_id': job_id,
                    # The force-full flag is consumed only when a job was actually enqueued with it.
                    'reset_initial_force_full': should_enqueue and force_full,
                    'next_run_at': next_run_at,
                    'schedule_id': row['schedule_id'],
                })
            conn.commit()
        if due_rows:
            with self.condition:
                self.condition.notify_all()

    def _claim_next_due_job(self) -> Optional[Dict[str, Any]]:
        """Atomically switch the oldest due queued job to ``'running'`` and return its row.

        Returns ``None`` when no queued job is due. Rows locked by another
        transaction are skipped (``FOR UPDATE SKIP LOCKED``).
        """
        now = datetime.now()
        sql = self._text(f"""
            WITH next_job AS (
                SELECT job_id
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'queued'
                  AND run_at <= :now
                ORDER BY run_at, queue_sequence
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE {self._qualified_table(self.JOBS_TABLE)} job
            SET status = 'running',
                started_at = :now,
                finished_at = NULL,
                error = NULL,
                report_json = NULL
            FROM next_job
            WHERE job.job_id = next_job.job_id
            RETURNING job.*
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'now': now}).mappings().first()
            conn.commit()
        return row

    def _execute_job(self, job: 'MigrationJob'):
        """Run one claimed job through ``DatabaseMigrator`` and persist the outcome.

        The migrator is called with ``send_email=False``; the notification is
        sent separately afterwards when ``job.send_email`` is set. The job row
        is always finalized, falling back to ``_force_fail_job`` when the
        regular update fails. Exceptions are logged, not raised.
        """
        import logging
        logger = logging.getLogger(__name__)
        migrator = None
        final_status = 'failed'
        final_report = None
        final_error = 'Неизвестная ошибка'
        try:
            from .migrator import DatabaseMigrator
            config = Config()
            migrator = DatabaseMigrator(config)
            report = migrator.run_migration(
                table_names=job.tables,
                send_email=False,
                dry_run=job.dry_run,
                read_limit=job.read_limit,
                force_full=job.force_full,
            )
            migrator.cleanup_old_logs(days_to_keep=7)
            final_status = 'completed'
            final_report = report
            final_error = None
            if job.send_email:
                migrator.send_notification(report)
        except Exception as exc:
            final_error = str(exc)
            logger.exception("Ошибка выполнения job %s", job.job_id)
            if job.send_email and migrator is not None:
                try:
                    migrator.send_failure_notification(
                        error=str(exc),
                        table_names=job.tables,
                        job_id=job.job_id,
                    )
                except Exception:
                    # Best effort: a failed notification must not block finalization,
                    # but it should be visible in the log.
                    logger.exception(
                        "Не удалось отправить уведомление об ошибке для job %s", job.job_id
                    )
        finally:
            if migrator is not None:
                try:
                    migrator.logger.close()
                except Exception:
                    logger.warning(
                        "Не удалось закрыть логгер мигратора для job %s",
                        job.job_id,
                        exc_info=True,
                    )
            try:
                self._finish_job(
                    job.job_id,
                    status=final_status,
                    report=final_report,
                    error=final_error,
                )
            except Exception:
                logger.exception(
                    "Не удалось финализировать job %s, принудительный сброс", job.job_id
                )
                self._force_fail_job(job.job_id, 'Ошибка финализации job')

    def _force_fail_job(self, job_id: str, error: str):
        """Mark a job as failed through a throw-away engine that bypasses the pool.

        Used when the regular finalization failed. Errors are logged and not
        raised.
        """
        try:
            from sqlalchemy import create_engine
            from sqlalchemy.pool import NullPool
            engine = create_engine(Config.POSTGRES_CONNECTION_STRING, poolclass=NullPool)
            try:
                with engine.connect() as conn:
                    conn.execute(self._text(f"""
                        UPDATE {self._qualified_table(self.JOBS_TABLE)}
                        SET status = 'failed',
                            finished_at = :now,
                            error = :error
                        WHERE job_id = :job_id
                    """), {'now': datetime.now(), 'error': error, 'job_id': job_id})
                    conn.commit()
            finally:
                engine.dispose()
        except Exception:
            import logging
            logging.getLogger(__name__).exception(
                "Критическая ошибка: не удалось аварийно завершить job %s", job_id
            )

    def _finish_job(
        self,
        job_id: str,
        status: str,
        report: Optional[Dict[str, Any]],
        error: Optional[str],
    ):
        """Store the final status, report and error of a job and stamp ``finished_at``."""
        # default=str: a report value that is not JSON-native (e.g. a datetime)
        # must not turn a finished job into a finalization failure.
        report_json = json.dumps(report, default=str) if report is not None else None
        with self._get_engine().connect() as conn:
            conn.execute(self._text(f"""
                UPDATE {self._qualified_table(self.JOBS_TABLE)}
                SET status = :status,
                    finished_at = :finished_at,
                    report_json = CAST(:report_json AS jsonb),
                    error = :error
                WHERE job_id = :job_id
            """), {
                'status': status,
                'finished_at': datetime.now(),
                'report_json': report_json,
                'error': error,
                'job_id': job_id,
            })
            conn.commit()

    def _job_from_row(self, row: Optional[Dict[str, Any]]) -> Optional['MigrationJob']:
        """Build a ``MigrationJob`` from a jobs-table row; ``None`` maps to ``None``.

        JSON columns that arrive as text are decoded; already decoded values
        are used as-is.
        """
        if row is None:
            return None
        tables = row['tables_json']
        if isinstance(tables, str):
            tables = json.loads(tables)
        report = row['report_json']
        if isinstance(report, str):
            report = json.loads(report)
        return MigrationJob(
            job_id=row['job_id'],
            created_at=row['created_at'],
            run_at=row['run_at'],
            tables=tables,
            send_email=row['send_email'],
            dry_run=row['dry_run'],
            read_limit=row['read_limit'],
            force_full=row.get('force_full', False),
            status=row['status'],
            started_at=row['started_at'],
            finished_at=row['finished_at'],
            report=report,
            error=row['error'],
            queue_sequence=row['queue_sequence'],
            schedule_id=row['schedule_id'],
        )

    def _schedule_from_row(self, row: Optional[Dict[str, Any]]) -> Optional['MigrationSchedule']:
        """Build a ``MigrationSchedule`` from a schedules-table row; ``None`` maps to ``None``.

        A ``tables_json`` value that arrives as text is decoded; an already
        decoded value is used as-is.
        """
        if row is None:
            return None
        tables = row['tables_json']
        if isinstance(tables, str):
            tables = json.loads(tables)
        return MigrationSchedule(
            schedule_id=row['schedule_id'],
            created_at=row['created_at'],
            updated_at=row['updated_at'],
            name=row['name'],
            schedule_type=row['schedule_type'],
            enabled=row['enabled'],
            catch_up_missed_runs=row['catch_up_missed_runs'],
            initial_force_full=row.get('initial_force_full', False),
            tables=tables,
            send_email=row['send_email'],
            dry_run=row['dry_run'],
            read_limit=row['read_limit'],
            interval_seconds=row['interval_seconds'],
            daily_time=row['daily_time'],
            start_at=row['start_at'],
            next_run_at=row['next_run_at'],
            last_enqueued_at=row['last_enqueued_at'],
            last_job_id=row['last_job_id'],
        )
# Module-level MigrationJobQueue instance. Constructing it only initializes
# in-memory state; the worker thread and the DB engine are created on first use.
migration_queue = MigrationJobQueue()