Убрал зависимость worker от api

This commit is contained in:
brusnitsyn
2026-06-18 11:14:16 +09:00
parent 6ef045ca88
commit 05da90aae1
3 changed files with 89 additions and 16 deletions

View File

@@ -176,6 +176,13 @@ def create_app():
job = migration_queue.enqueue(tables=[table_name], send_email=True)
return {"status": "queued", "job": job}
@api.post("/migrations/schedules/{schedule_id}/run")
def run_schedule_now(schedule_id: str):
job = migration_queue.run_schedule_now(schedule_id)
if not job:
raise HTTPException(status_code=404, detail="Schedule not found")
return {"status": "queued", "job": job}
@api.post("/migrations/schedules")
def create_schedule(request: ScheduleRequest):
try:

View File

@@ -435,7 +435,12 @@ class DatabaseMigrator:
"""))
conn.commit()
def _send_webhooks(self, report: dict) -> None:
def _send_webhooks(
self,
report: dict,
schedule_id: Optional[str] = None,
job_id: Optional[str] = None,
) -> None:
"""POST итогов миграции на все активные webhook-подписки."""
schema = self.quote_identifier(self.config.REPLICATOR_SCHEMA)
table = f"{schema}.webhook_subscriptions"
@@ -455,6 +460,8 @@ class DatabaseMigrator:
failed = report.get('failed_tables', [])
payload = json.dumps({
'status': 'success' if not failed else ('partial_success' if summary.get('successful_tables') else 'failed'),
'schedule_id': schedule_id,
'job_id': job_id,
'started_at': str(self.logger.start_time),
'finished_at': datetime.now().isoformat(),
'tables': {
@@ -1755,6 +1762,8 @@ class DatabaseMigrator:
dry_run: Optional[bool] = None,
read_limit: Optional[int] = None,
force_full: bool = False,
schedule_id: Optional[str] = None,
job_id: Optional[str] = None,
):
"""Запуск полной миграции"""
if dry_run is not None:
@@ -1810,7 +1819,7 @@ class DatabaseMigrator:
else:
self.logger.log_info("Email отключен для этого запуска")
self._send_webhooks(report)
self._send_webhooks(report, schedule_id=schedule_id, job_id=job_id)
return report

View File

@@ -128,7 +128,6 @@ class MigrationJobQueue:
delay_seconds: Optional[int] = None,
schedule_id: Optional[str] = None,
) -> Dict[str, Any]:
self.start()
self._ensure_schema()
scheduled_at = self._resolve_run_at(run_at=run_at, delay_seconds=delay_seconds)
job_id = str(uuid.uuid4())
@@ -161,7 +160,6 @@ class MigrationJobQueue:
return self._job_from_row(row).to_dict()
def list_jobs(self, limit: int = 100) -> List[Dict[str, Any]]:
self.start()
self._ensure_schema()
sql = self._text(f"""
SELECT *
@@ -174,7 +172,6 @@ class MigrationJobQueue:
return [self._job_from_row(row).to_dict() for row in rows]
def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
self.start()
self._ensure_schema()
sql = self._text(f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id")
with self._get_engine().connect() as conn:
@@ -196,7 +193,6 @@ class MigrationJobQueue:
catch_up_missed_runs: bool = False,
initial_force_full: bool = False,
) -> Dict[str, Any]:
self.start()
self._ensure_schema()
schedule_id = str(uuid.uuid4())
created_at = datetime.now()
@@ -252,7 +248,6 @@ class MigrationJobQueue:
return self._schedule_from_row(row).to_dict()
def list_schedules(self) -> List[Dict[str, Any]]:
self.start()
self._ensure_schema()
sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} ORDER BY next_run_at, created_at")
with self._get_engine().connect() as conn:
@@ -260,15 +255,65 @@ class MigrationJobQueue:
return [self._schedule_from_row(row).to_dict() for row in rows]
def get_schedule(self, schedule_id: str) -> Optional[Dict[str, Any]]:
self.start()
self._ensure_schema()
sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id")
with self._get_engine().connect() as conn:
row = conn.execute(sql, {'schedule_id': schedule_id}).mappings().first()
return self._schedule_from_row(row).to_dict() if row else None
def run_schedule_now(self, schedule_id: str) -> Optional[Dict[str, Any]]:
"""Немедленный запуск расписания: ставит задание в очередь с параметрами расписания."""
self._ensure_schema()
now = datetime.now()
with self._get_engine().connect() as conn:
row = conn.execute(self._text(
f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id"
), {'schedule_id': schedule_id}).mappings().first()
if row is None:
return None
job_id = str(uuid.uuid4())
tables_json = json.dumps(row['tables_json']) if row['tables_json'] is not None else None
force_full = bool(row.get('initial_force_full', False))
conn.execute(self._text(f"""
INSERT INTO {self._qualified_table(self.JOBS_TABLE)}
(job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status)
VALUES
(
:job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb),
:send_email, :dry_run, :read_limit, :force_full, 'queued'
)
"""), {
'job_id': job_id,
'schedule_id': schedule_id,
'created_at': now,
'run_at': now,
'tables_json': tables_json,
'send_email': row['send_email'],
'dry_run': row['dry_run'],
'read_limit': row['read_limit'],
'force_full': force_full,
})
conn.execute(self._text(f"""
UPDATE {self._qualified_table(self.SCHEDULES_TABLE)}
SET updated_at = :now,
last_enqueued_at = :now,
last_job_id = :job_id
WHERE schedule_id = :schedule_id
"""), {'now': now, 'job_id': job_id, 'schedule_id': schedule_id})
new_job = conn.execute(self._text(
f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id"
), {'job_id': job_id}).mappings().first()
conn.commit()
with self.condition:
self.condition.notify_all()
return self._job_from_row(new_job).to_dict()
def get_status(self) -> Dict[str, Any]:
self.start()
self._ensure_schema()
with self._get_engine().connect() as conn:
running_job = conn.execute(self._text(f"""
@@ -387,6 +432,17 @@ class MigrationJobQueue:
CREATE INDEX IF NOT EXISTS idx_{self.SCHEDULES_TABLE}_enabled_next_run_at
ON {self._qualified_table(self.SCHEDULES_TABLE)} (enabled, next_run_at)
"""))
conn.commit()
self.schema_ready = True
def _reset_orphaned_running_jobs(self):
"""Сброс зависших в 'running' заданий при старте воркера.
Вызывается только из воркер-процесса (_worker_loop), чтобы API-процесс
не пометил выполняющуюся прямо сейчас миграцию как 'failed'.
"""
with self._get_engine().connect() as conn:
conn.execute(self._text(f"""
UPDATE {self._qualified_table(self.JOBS_TABLE)}
SET status = 'failed',
@@ -396,8 +452,6 @@ class MigrationJobQueue:
"""), {'now': datetime.now()})
conn.commit()
self.schema_ready = True
def _resolve_run_at(
self,
run_at: Optional[datetime] = None,
@@ -475,6 +529,7 @@ class MigrationJobQueue:
def _worker_loop(self):
self._ensure_schema()
self._reset_orphaned_running_jobs()
while True:
try:
@@ -605,6 +660,8 @@ class MigrationJobQueue:
dry_run=job.dry_run,
read_limit=job.read_limit,
force_full=job.force_full,
schedule_id=job.schedule_id,
job_id=job.job_id,
)
migrator.cleanup_old_logs(days_to_keep=7)
final_status = 'completed'