# syncio/app/queue.py
import json
import threading
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, time as dt_time
from typing import Any, Dict, List, Optional
from .config import Config


@dataclass
class MigrationJob:
    """Serializable representation of a migration job."""
    job_id: str
    created_at: datetime
    run_at: datetime
    tables: Optional[List[str]]
    send_email: bool
    dry_run: Optional[bool]
    read_limit: Optional[int]
    force_full: bool
    status: str
    started_at: Optional[datetime]
    finished_at: Optional[datetime]
    report: Optional[Dict[str, Any]]
    error: Optional[str]
    queue_sequence: int
    schedule_id: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        return {
            'job_id': self.job_id,
            'created_at': self.created_at.isoformat(),
            'run_at': self.run_at.isoformat(),
            'tables': self.tables,
            'send_email': self.send_email,
            'dry_run': self.dry_run,
            'read_limit': self.read_limit,
            'force_full': self.force_full,
            'status': self.status,
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'finished_at': self.finished_at.isoformat() if self.finished_at else None,
            'report': self.report,
            'error': self.error,
            'queue_sequence': self.queue_sequence,
            'schedule_id': self.schedule_id,
        }

@dataclass
class MigrationSchedule:
    """Serializable representation of a schedule."""
    schedule_id: str
    created_at: datetime
    updated_at: datetime
    name: Optional[str]
    schedule_type: str
    enabled: bool
    catch_up_missed_runs: bool
    initial_force_full: bool
    tables: Optional[List[str]]
    send_email: bool
    dry_run: Optional[bool]
    read_limit: Optional[int]
    interval_seconds: Optional[int]
    daily_time: Optional[str]
    start_at: Optional[datetime]
    next_run_at: datetime
    last_enqueued_at: Optional[datetime]
    last_job_id: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        return {
            'schedule_id': self.schedule_id,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat(),
            'name': self.name,
            'schedule_type': self.schedule_type,
            'enabled': self.enabled,
            'catch_up_missed_runs': self.catch_up_missed_runs,
            'initial_force_full': self.initial_force_full,
            'tables': self.tables,
            'send_email': self.send_email,
            'dry_run': self.dry_run,
            'read_limit': self.read_limit,
            'interval_seconds': self.interval_seconds,
            'daily_time': self.daily_time,
            'start_at': self.start_at.isoformat() if self.start_at else None,
            'next_run_at': self.next_run_at.isoformat(),
            'last_enqueued_at': self.last_enqueued_at.isoformat() if self.last_enqueued_at else None,
            'last_job_id': self.last_job_id,
        }

class MigrationJobQueue:
    """Persistent migration queue and scheduler on top of PostgreSQL."""

    JOBS_TABLE = 'migration_jobs'
    SCHEDULES_TABLE = 'migration_schedules'

    def __init__(self):
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.engine = None
        self.schema_ready = False
        self.worker_started = False
        self.worker = None

    def start(self):
        """Lazily start the worker thread."""
        with self.condition:
            if self.worker_started:
                return
            self.worker_started = True
            self.worker = threading.Thread(target=self._worker_loop, daemon=True)
            self.worker.start()
            self.condition.notify_all()
    def enqueue(
        self,
        tables: Optional[List[str]] = None,
        send_email: bool = True,
        dry_run: Optional[bool] = None,
        read_limit: Optional[int] = None,
        force_full: bool = False,
        run_at: Optional[datetime] = None,
        delay_seconds: Optional[int] = None,
        schedule_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        self.start()
        self._ensure_schema()
        scheduled_at = self._resolve_run_at(run_at=run_at, delay_seconds=delay_seconds)
        job_id = str(uuid.uuid4())
        created_at = datetime.now()
        sql = self._text(f"""
            INSERT INTO {self._qualified_table(self.JOBS_TABLE)}
                (job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status)
            VALUES
                (:job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb), :send_email, :dry_run, :read_limit, :force_full, 'queued')
            RETURNING *
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {
                'job_id': job_id,
                'schedule_id': schedule_id,
                'created_at': created_at,
                'run_at': scheduled_at,
                'tables_json': json.dumps(tables) if tables is not None else None,
                'send_email': send_email,
                'dry_run': dry_run,
                'read_limit': read_limit,
                'force_full': force_full,
            }).mappings().first()
            conn.commit()
        with self.condition:
            self.condition.notify_all()
        return self._job_from_row(row).to_dict()
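
    # Illustrative call (table names here are placeholders): enqueue a dry run
    # for two tables, starting one minute from now:
    #   migration_queue.enqueue(tables=['users', 'orders'], dry_run=True,
    #                           delay_seconds=60)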
    def list_jobs(self, limit: int = 100) -> List[Dict[str, Any]]:
        self.start()
        self._ensure_schema()
        sql = self._text(f"""
            SELECT *
            FROM {self._qualified_table(self.JOBS_TABLE)}
            ORDER BY queue_sequence DESC
            LIMIT :limit
        """)
        with self._get_engine().connect() as conn:
            rows = conn.execute(sql, {'limit': limit}).mappings().all()
        return [self._job_from_row(row).to_dict() for row in rows]

    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        self.start()
        self._ensure_schema()
        sql = self._text(f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id")
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'job_id': job_id}).mappings().first()
        return self._job_from_row(row).to_dict() if row else None
    def create_schedule(
        self,
        schedule_type: str,
        tables: Optional[List[str]] = None,
        send_email: bool = True,
        dry_run: Optional[bool] = None,
        read_limit: Optional[int] = None,
        interval_seconds: Optional[int] = None,
        daily_time: Optional[str] = None,
        start_at: Optional[datetime] = None,
        name: Optional[str] = None,
        enabled: bool = True,
        catch_up_missed_runs: bool = False,
        initial_force_full: bool = False,
    ) -> Dict[str, Any]:
        self.start()
        self._ensure_schema()
        schedule_id = str(uuid.uuid4())
        created_at = datetime.now()
        normalized_start_at = self._normalize_datetime(start_at)
        next_run_at = self._resolve_next_schedule_run_at(
            schedule_type=schedule_type,
            interval_seconds=interval_seconds,
            daily_time=daily_time,
            start_at=normalized_start_at,
            reference=created_at,
        )
        sql = self._text(f"""
            INSERT INTO {self._qualified_table(self.SCHEDULES_TABLE)}
                (
                    schedule_id, created_at, updated_at, name, schedule_type, enabled,
                    catch_up_missed_runs, initial_force_full, tables_json, send_email, dry_run, read_limit, interval_seconds,
                    daily_time, start_at, next_run_at
                )
            VALUES
                (
                    :schedule_id, :created_at, :updated_at, :name, :schedule_type, :enabled,
                    :catch_up_missed_runs, :initial_force_full,
                    CAST(:tables_json AS jsonb), :send_email, :dry_run, :read_limit, :interval_seconds,
                    :daily_time, :start_at, :next_run_at
                )
            RETURNING *
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {
                'schedule_id': schedule_id,
                'created_at': created_at,
                'updated_at': created_at,
                'name': name,
                'schedule_type': schedule_type,
                'enabled': enabled,
                'catch_up_missed_runs': catch_up_missed_runs,
                'initial_force_full': initial_force_full,
                'tables_json': json.dumps(tables) if tables is not None else None,
                'send_email': send_email,
                'dry_run': dry_run,
                'read_limit': read_limit,
                'interval_seconds': interval_seconds,
                'daily_time': daily_time,
                'start_at': normalized_start_at,
                'next_run_at': next_run_at,
            }).mappings().first()
            conn.commit()
        with self.condition:
            self.condition.notify_all()
        return self._schedule_from_row(row).to_dict()
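
    # Illustrative call: run daily at 02:30 local time, without e-mail, and
    # catch up runs missed while the service was down:
    #   migration_queue.create_schedule(schedule_type='daily', daily_time='02:30',
    #                                   send_email=False, catch_up_missed_runs=True)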
    def list_schedules(self) -> List[Dict[str, Any]]:
        self.start()
        self._ensure_schema()
        sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} ORDER BY next_run_at, created_at")
        with self._get_engine().connect() as conn:
            rows = conn.execute(sql).mappings().all()
        return [self._schedule_from_row(row).to_dict() for row in rows]

    def get_schedule(self, schedule_id: str) -> Optional[Dict[str, Any]]:
        self.start()
        self._ensure_schema()
        sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id")
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'schedule_id': schedule_id}).mappings().first()
        return self._schedule_from_row(row).to_dict() if row else None
    def get_status(self) -> Dict[str, Any]:
        self.start()
        self._ensure_schema()
        with self._get_engine().connect() as conn:
            running_job = conn.execute(self._text(f"""
                SELECT *
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'running'
                ORDER BY started_at DESC
                LIMIT 1
            """)).mappings().first()
            queued_jobs = conn.execute(self._text(f"""
                SELECT COUNT(*)
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'queued'
            """)).scalar()
            schedules = conn.execute(self._text(f"""
                SELECT COUNT(*)
                FROM {self._qualified_table(self.SCHEDULES_TABLE)}
                WHERE enabled = TRUE
            """)).scalar()
        return {
            'running': running_job is not None,
            'running_job': self._job_from_row(running_job).to_dict() if running_job else None,
            'queued_jobs': int(queued_jobs or 0),
            'enabled_schedules': int(schedules or 0),
        }
    def _get_engine(self):
        if self.engine is None:
            from sqlalchemy import create_engine
            self.engine = create_engine(Config.POSTGRES_CONNECTION_STRING)
        return self.engine

    def _text(self, sql: str):
        from sqlalchemy import text
        return text(sql)

    def _quote_identifier(self, identifier: str) -> str:
        return '"' + identifier.replace('"', '""') + '"'

    def _qualified_table(self, table_name: str) -> str:
        return f'{self._quote_identifier(Config.REPLICATOR_SCHEMA)}.{self._quote_identifier(table_name)}'
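
    # e.g. _quote_identifier('my"schema') returns '"my""schema"': embedded double
    # quotes are doubled, per the SQL rule for quoted identifiers, so schema and
    # table names can be interpolated into the DDL/DML strings above safely.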
    def _ensure_schema(self):
        if self.schema_ready:
            return
        with self.lock:
            # Double-checked locking: only the first caller pays for the DDL.
            if self.schema_ready:
                return
            with self._get_engine().connect() as conn:
                conn.execute(self._text(
                    f'CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(Config.REPLICATOR_SCHEMA)}'
                ))
                conn.execute(self._text(f"""
                    CREATE TABLE IF NOT EXISTS {self._qualified_table(self.JOBS_TABLE)} (
                        job_id text PRIMARY KEY,
                        schedule_id text NULL,
                        created_at timestamp NOT NULL,
                        run_at timestamp NOT NULL,
                        tables_json jsonb NULL,
                        send_email boolean NOT NULL DEFAULT TRUE,
                        dry_run boolean NULL,
                        read_limit integer NULL,
                        force_full boolean NOT NULL DEFAULT FALSE,
                        status text NOT NULL,
                        started_at timestamp NULL,
                        finished_at timestamp NULL,
                        report_json jsonb NULL,
                        error text NULL,
                        queue_sequence bigint GENERATED ALWAYS AS IDENTITY
                    )
                """))
                conn.execute(self._text(f"""
                    CREATE INDEX IF NOT EXISTS idx_{self.JOBS_TABLE}_status_run_at
                    ON {self._qualified_table(self.JOBS_TABLE)} (status, run_at, queue_sequence)
                """))
                conn.execute(self._text(f"""
                    CREATE TABLE IF NOT EXISTS {self._qualified_table(self.SCHEDULES_TABLE)} (
                        schedule_id text PRIMARY KEY,
                        created_at timestamp NOT NULL,
                        updated_at timestamp NOT NULL,
                        name text NULL,
                        schedule_type text NOT NULL,
                        enabled boolean NOT NULL DEFAULT TRUE,
                        catch_up_missed_runs boolean NOT NULL DEFAULT FALSE,
                        initial_force_full boolean NOT NULL DEFAULT FALSE,
                        tables_json jsonb NULL,
                        send_email boolean NOT NULL DEFAULT TRUE,
                        dry_run boolean NULL,
                        read_limit integer NULL,
                        interval_seconds integer NULL,
                        daily_time text NULL,
                        start_at timestamp NULL,
                        next_run_at timestamp NOT NULL,
                        last_enqueued_at timestamp NULL,
                        last_job_id text NULL
                    )
                """))
                # Idempotent in-place migrations for tables created by older
                # versions that predate these columns.
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.SCHEDULES_TABLE)}
                    ADD COLUMN IF NOT EXISTS catch_up_missed_runs boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.SCHEDULES_TABLE)}
                    ADD COLUMN IF NOT EXISTS initial_force_full boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    ALTER TABLE {self._qualified_table(self.JOBS_TABLE)}
                    ADD COLUMN IF NOT EXISTS force_full boolean NOT NULL DEFAULT FALSE
                """))
                conn.execute(self._text(f"""
                    CREATE INDEX IF NOT EXISTS idx_{self.SCHEDULES_TABLE}_enabled_next_run_at
                    ON {self._qualified_table(self.SCHEDULES_TABLE)} (enabled, next_run_at)
                """))
                conn.commit()
            self.schema_ready = True
    def _resolve_run_at(
        self,
        run_at: Optional[datetime] = None,
        delay_seconds: Optional[int] = None,
    ) -> datetime:
        if run_at and delay_seconds is not None:
            raise ValueError("Specify either run_at or delay_seconds")
        if delay_seconds is not None:
            return datetime.now() + timedelta(seconds=delay_seconds)
        if run_at is None:
            return datetime.now()
        return self._normalize_datetime(run_at)
    def _resolve_next_schedule_run_at(
        self,
        schedule_type: str,
        interval_seconds: Optional[int],
        daily_time: Optional[str],
        start_at: Optional[datetime],
        reference: datetime,
    ) -> datetime:
        normalized_reference = self._normalize_datetime(reference)
        normalized_start_at = self._normalize_datetime(start_at) if start_at else None
        baseline = normalized_start_at or normalized_reference
        if schedule_type == 'interval':
            if not interval_seconds or interval_seconds <= 0:
                raise ValueError("interval_seconds must be greater than 0 for interval schedule")
            return baseline if baseline > normalized_reference else normalized_reference + timedelta(seconds=interval_seconds)
        if schedule_type == 'daily':
            if not daily_time:
                raise ValueError("daily_time is required for daily schedule")
            parsed_time = self._parse_daily_time(daily_time)
            candidate_date = baseline.date()
            candidate = datetime.combine(candidate_date, parsed_time)
            if normalized_start_at and candidate < normalized_start_at:
                candidate = datetime.combine(normalized_start_at.date(), parsed_time)
            if candidate <= normalized_reference:
                candidate = candidate + timedelta(days=1)
            return candidate
        raise ValueError("schedule_type must be one of: interval, daily")
    def _next_schedule_run_from_row(self, row: Dict[str, Any], reference: datetime) -> datetime:
        schedule_type = row['schedule_type']
        if schedule_type == 'interval':
            return reference + timedelta(seconds=int(row['interval_seconds']))
        if schedule_type == 'daily':
            parsed_time = self._parse_daily_time(row['daily_time'])
            candidate = datetime.combine(reference.date(), parsed_time)
            if candidate <= reference:
                candidate += timedelta(days=1)
            return candidate
        raise ValueError(f"Unsupported schedule_type: {schedule_type}")
    def _parse_daily_time(self, raw_value: str) -> dt_time:
        parts = raw_value.split(':')
        if len(parts) not in (2, 3):
            raise ValueError("daily_time must be HH:MM or HH:MM:SS")
        hour = int(parts[0])
        minute = int(parts[1])
        second = int(parts[2]) if len(parts) == 3 else 0
        return dt_time(hour=hour, minute=minute, second=second)
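
    # e.g. _parse_daily_time('02:30') -> time(2, 30) and
    # _parse_daily_time('02:30:15') -> time(2, 30, 15); malformed values raise
    # ValueError from either the length check or int().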
    def _normalize_datetime(self, value: Optional[datetime]) -> Optional[datetime]:
        if value is None:
            return None
        if value.tzinfo is not None:
            # Columns are naive timestamps compared against datetime.now(), so
            # convert aware values to local time and drop the tzinfo.
            return value.astimezone().replace(tzinfo=None)
        return value
    def _worker_loop(self):
        self._ensure_schema()
        while True:
            try:
                self._materialize_due_schedules()
                job_row = self._claim_next_due_job()
                if job_row:
                    self._execute_job(self._job_from_row(job_row))
                    continue
            except Exception:
                # Transient failures (e.g. the database being briefly unreachable)
                # must not kill the daemon thread; back off and retry.
                time.sleep(1.0)
                continue
            # Nothing due: sleep until the next poll, or until enqueue() /
            # create_schedule() signals new work via the condition variable.
            with self.condition:
                self.condition.wait(timeout=Config.QUEUE_POLL_SECONDS)
    def _materialize_due_schedules(self):
        now = datetime.now()
        grace_cutoff = now - timedelta(seconds=Config.SCHEDULE_GRACE_SECONDS)
        with self._get_engine().connect() as conn:
            due_rows = conn.execute(self._text(f"""
                SELECT *
                FROM {self._qualified_table(self.SCHEDULES_TABLE)}
                WHERE enabled = TRUE
                  AND next_run_at <= :now
                ORDER BY next_run_at, created_at
                FOR UPDATE
            """), {'now': now}).mappings().all()
            for row in due_rows:
                # A run is "missed" if it is older than the grace window; missed
                # runs only produce a job when catch_up_missed_runs is set.
                missed_run = row['next_run_at'] < grace_cutoff
                next_run_at = self._next_schedule_run_from_row(row, now)
                job_id = row['last_job_id']
                force_full = bool(row.get('initial_force_full', False))
                if not missed_run or row['catch_up_missed_runs']:
                    job_id = str(uuid.uuid4())
                    conn.execute(self._text(f"""
                        INSERT INTO {self._qualified_table(self.JOBS_TABLE)}
                            (job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status)
                        VALUES
                            (
                                :job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb),
                                :send_email, :dry_run, :read_limit, :force_full, 'queued'
                            )
                    """), {
                        'job_id': job_id,
                        'schedule_id': row['schedule_id'],
                        'created_at': now,
                        'run_at': now,
                        'tables_json': json.dumps(row['tables_json']) if row['tables_json'] is not None else None,
                        'send_email': row['send_email'],
                        'dry_run': row['dry_run'],
                        'read_limit': row['read_limit'],
                        'force_full': force_full,
                    })
                conn.execute(self._text(f"""
                    UPDATE {self._qualified_table(self.SCHEDULES_TABLE)}
                    SET updated_at = :updated_at,
                        last_enqueued_at = :last_enqueued_at,
                        last_job_id = :last_job_id,
                        initial_force_full = CASE
                            WHEN :reset_initial_force_full THEN FALSE
                            ELSE initial_force_full
                        END,
                        next_run_at = :next_run_at
                    WHERE schedule_id = :schedule_id
                """), {
                    'updated_at': now,
                    'last_enqueued_at': now if (not missed_run or row['catch_up_missed_runs']) else row['last_enqueued_at'],
                    'last_job_id': job_id,
                    'reset_initial_force_full': (not missed_run or row['catch_up_missed_runs']) and force_full,
                    'next_run_at': next_run_at,
                    'schedule_id': row['schedule_id'],
                })
            conn.commit()
        if due_rows:
            with self.condition:
                self.condition.notify_all()
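
    # The claim below is a single UPDATE over a locking CTE: FOR UPDATE SKIP
    # LOCKED means that if several workers poll at once, each due job is handed
    # to exactly one of them, and rows already locked by another worker are
    # skipped rather than blocking the poll.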
    def _claim_next_due_job(self) -> Optional[Dict[str, Any]]:
        now = datetime.now()
        sql = self._text(f"""
            WITH next_job AS (
                SELECT job_id
                FROM {self._qualified_table(self.JOBS_TABLE)}
                WHERE status = 'queued'
                  AND run_at <= :now
                ORDER BY run_at, queue_sequence
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE {self._qualified_table(self.JOBS_TABLE)} job
            SET status = 'running',
                started_at = :now,
                finished_at = NULL,
                error = NULL,
                report_json = NULL
            FROM next_job
            WHERE job.job_id = next_job.job_id
            RETURNING job.*
        """)
        with self._get_engine().connect() as conn:
            row = conn.execute(sql, {'now': now}).mappings().first()
            conn.commit()
        return row
    def _execute_job(self, job: MigrationJob):
        migrator = None
        try:
            from .migrator import DatabaseMigrator
            config = Config()
            migrator = DatabaseMigrator(config)
            report = migrator.run_migration(
                table_names=job.tables,
                send_email=False,  # notification is sent below, after the status is recorded
                dry_run=job.dry_run,
                read_limit=job.read_limit,
                force_full=job.force_full,
            )
            migrator.cleanup_old_logs(days_to_keep=7)
            self._finish_job(job.job_id, status='completed', report=report, error=None)
            if job.send_email:
                migrator.send_notification(report)
        except Exception as exc:
            self._finish_job(job.job_id, status='failed', report=None, error=str(exc))
            if job.send_email and migrator is not None:
                try:
                    migrator.send_failure_notification(
                        error=str(exc),
                        table_names=job.tables,
                        job_id=job.job_id,
                    )
                except Exception:
                    pass
    def _finish_job(
        self,
        job_id: str,
        status: str,
        report: Optional[Dict[str, Any]],
        error: Optional[str],
    ):
        with self._get_engine().connect() as conn:
            conn.execute(self._text(f"""
                UPDATE {self._qualified_table(self.JOBS_TABLE)}
                SET status = :status,
                    finished_at = :finished_at,
                    report_json = CAST(:report_json AS jsonb),
                    error = :error
                WHERE job_id = :job_id
            """), {
                'status': status,
                'finished_at': datetime.now(),
                'report_json': json.dumps(report) if report is not None else None,
                'error': error,
                'job_id': job_id,
            })
            conn.commit()
    def _job_from_row(self, row: Optional[Dict[str, Any]]) -> Optional[MigrationJob]:
        if row is None:
            return None
        tables = row['tables_json']
        if isinstance(tables, str):
            tables = json.loads(tables)
        report = row['report_json']
        if isinstance(report, str):
            report = json.loads(report)
        return MigrationJob(
            job_id=row['job_id'],
            created_at=row['created_at'],
            run_at=row['run_at'],
            tables=tables,
            send_email=row['send_email'],
            dry_run=row['dry_run'],
            read_limit=row['read_limit'],
            force_full=row.get('force_full', False),
            status=row['status'],
            started_at=row['started_at'],
            finished_at=row['finished_at'],
            report=report,
            error=row['error'],
            queue_sequence=row['queue_sequence'],
            schedule_id=row['schedule_id'],
        )
    def _schedule_from_row(self, row: Optional[Dict[str, Any]]) -> Optional[MigrationSchedule]:
        if row is None:
            return None
        tables = row['tables_json']
        if isinstance(tables, str):
            tables = json.loads(tables)
        return MigrationSchedule(
            schedule_id=row['schedule_id'],
            created_at=row['created_at'],
            updated_at=row['updated_at'],
            name=row['name'],
            schedule_type=row['schedule_type'],
            enabled=row['enabled'],
            catch_up_missed_runs=row['catch_up_missed_runs'],
            initial_force_full=row.get('initial_force_full', False),
            tables=tables,
            send_email=row['send_email'],
            dry_run=row['dry_run'],
            read_limit=row['read_limit'],
            interval_seconds=row['interval_seconds'],
            daily_time=row['daily_time'],
            start_at=row['start_at'],
            next_run_at=row['next_run_at'],
            last_enqueued_at=row['last_enqueued_at'],
            last_job_id=row['last_job_id'],
        )

migration_queue = MigrationJobQueue()
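
# Minimal usage sketch (illustrative; the import path assumes the package layout
# syncio/app/queue.py, and Config must supply POSTGRES_CONNECTION_STRING,
# REPLICATOR_SCHEMA, QUEUE_POLL_SECONDS and SCHEDULE_GRACE_SECONDS, as the code
# above requires):
#
#   from syncio.app.queue import migration_queue
#
#   job = migration_queue.enqueue(tables=['users'], delay_seconds=60)
#   print(job['status'])          # 'queued'
#   migration_queue.create_schedule(schedule_type='interval', interval_seconds=3600)
#   print(migration_queue.get_status()['queued_jobs'])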