From 05da90aae1457ea49b5e4745e4446d6fcdedd0bb Mon Sep 17 00:00:00 2001 From: brusnitsyn Date: Thu, 18 Jun 2026 11:14:16 +0900 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=B1=D1=80=D0=B0=D0=BB=20=D0=B7=D0=B0?= =?UTF-8?q?=D0=B2=D0=B8=D1=81=D0=B8=D0=BC=D0=BE=D1=81=D1=82=D1=8C=20worker?= =?UTF-8?q?=20=D0=BE=D1=82=20api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api.py | 7 ++++ app/migrator.py | 13 ++++++-- app/queue.py | 85 +++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 89 insertions(+), 16 deletions(-) diff --git a/app/api.py b/app/api.py index eba6e3d..8f50976 100644 --- a/app/api.py +++ b/app/api.py @@ -176,6 +176,13 @@ def create_app(): job = migration_queue.enqueue(tables=[table_name], send_email=True) return {"status": "queued", "job": job} + @api.post("/migrations/schedules/{schedule_id}/run") + def run_schedule_now(schedule_id: str): + job = migration_queue.run_schedule_now(schedule_id) + if not job: + raise HTTPException(status_code=404, detail="Schedule not found") + return {"status": "queued", "job": job} + @api.post("/migrations/schedules") def create_schedule(request: ScheduleRequest): try: diff --git a/app/migrator.py b/app/migrator.py index 400dfec..0b49dc8 100644 --- a/app/migrator.py +++ b/app/migrator.py @@ -435,7 +435,12 @@ class DatabaseMigrator: """)) conn.commit() - def _send_webhooks(self, report: dict) -> None: + def _send_webhooks( + self, + report: dict, + schedule_id: Optional[str] = None, + job_id: Optional[str] = None, + ) -> None: """POST итогов миграции на все активные webhook-подписки.""" schema = self.quote_identifier(self.config.REPLICATOR_SCHEMA) table = f"{schema}.webhook_subscriptions" @@ -455,6 +460,8 @@ class DatabaseMigrator: failed = report.get('failed_tables', []) payload = json.dumps({ 'status': 'success' if not failed else ('partial_success' if summary.get('successful_tables') else 'failed'), + 'schedule_id': schedule_id, + 'job_id': job_id, 'started_at': str(self.logger.start_time), 'finished_at': datetime.now().isoformat(), 'tables': { @@ -1755,6 +1762,8 @@ class DatabaseMigrator: dry_run: Optional[bool] = None, read_limit: Optional[int] = None, force_full: bool = False, + schedule_id: Optional[str] = None, + job_id: Optional[str] = None, ): """Запуск полной миграции""" if dry_run is not None: @@ -1810,7 +1819,7 @@ class DatabaseMigrator: else: self.logger.log_info("Email отключен для этого запуска") - self._send_webhooks(report) + self._send_webhooks(report, schedule_id=schedule_id, job_id=job_id) return report diff --git a/app/queue.py b/app/queue.py index 3fa67b4..506ee2f 100644 --- a/app/queue.py +++ b/app/queue.py @@ -128,7 +128,6 @@ class MigrationJobQueue: delay_seconds: Optional[int] = None, schedule_id: Optional[str] = None, ) -> Dict[str, Any]: - self.start() self._ensure_schema() scheduled_at = self._resolve_run_at(run_at=run_at, delay_seconds=delay_seconds) job_id = str(uuid.uuid4()) @@ -161,7 +160,6 @@ class MigrationJobQueue: return self._job_from_row(row).to_dict() def list_jobs(self, limit: int = 100) -> List[Dict[str, Any]]: - self.start() self._ensure_schema() sql = self._text(f""" SELECT * @@ -174,7 +172,6 @@ class MigrationJobQueue: return [self._job_from_row(row).to_dict() for row in rows] def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: - self.start() self._ensure_schema() sql = self._text(f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id") with self._get_engine().connect() as conn: @@ -196,7 +193,6 @@ class MigrationJobQueue: catch_up_missed_runs: bool = False, initial_force_full: bool = False, ) -> Dict[str, Any]: - self.start() self._ensure_schema() schedule_id = str(uuid.uuid4()) created_at = datetime.now() @@ -252,7 +248,6 @@ class MigrationJobQueue: return self._schedule_from_row(row).to_dict() def list_schedules(self) -> List[Dict[str, Any]]: - self.start() self._ensure_schema() sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} ORDER BY next_run_at, created_at") with self._get_engine().connect() as conn: @@ -260,15 +255,65 @@ class MigrationJobQueue: return [self._schedule_from_row(row).to_dict() for row in rows] def get_schedule(self, schedule_id: str) -> Optional[Dict[str, Any]]: - self.start() self._ensure_schema() sql = self._text(f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id") with self._get_engine().connect() as conn: row = conn.execute(sql, {'schedule_id': schedule_id}).mappings().first() return self._schedule_from_row(row).to_dict() if row else None + def run_schedule_now(self, schedule_id: str) -> Optional[Dict[str, Any]]: + """Немедленный запуск расписания: ставит задание в очередь с параметрами расписания.""" + self._ensure_schema() + now = datetime.now() + + with self._get_engine().connect() as conn: + row = conn.execute(self._text( + f"SELECT * FROM {self._qualified_table(self.SCHEDULES_TABLE)} WHERE schedule_id = :schedule_id" + ), {'schedule_id': schedule_id}).mappings().first() + if row is None: + return None + + job_id = str(uuid.uuid4()) + tables_json = json.dumps(row['tables_json']) if row['tables_json'] is not None else None + force_full = bool(row.get('initial_force_full', False)) + + conn.execute(self._text(f""" + INSERT INTO {self._qualified_table(self.JOBS_TABLE)} + (job_id, schedule_id, created_at, run_at, tables_json, send_email, dry_run, read_limit, force_full, status) + VALUES + ( + :job_id, :schedule_id, :created_at, :run_at, CAST(:tables_json AS jsonb), + :send_email, :dry_run, :read_limit, :force_full, 'queued' + ) + """), { + 'job_id': job_id, + 'schedule_id': schedule_id, + 'created_at': now, + 'run_at': now, + 'tables_json': tables_json, + 'send_email': row['send_email'], + 'dry_run': row['dry_run'], + 'read_limit': row['read_limit'], + 'force_full': force_full, + }) + conn.execute(self._text(f""" + UPDATE {self._qualified_table(self.SCHEDULES_TABLE)} + SET updated_at = :now, + last_enqueued_at = :now, + last_job_id = :job_id + WHERE schedule_id = :schedule_id + """), {'now': now, 'job_id': job_id, 'schedule_id': schedule_id}) + new_job = conn.execute(self._text( + f"SELECT * FROM {self._qualified_table(self.JOBS_TABLE)} WHERE job_id = :job_id" + ), {'job_id': job_id}).mappings().first() + conn.commit() + + with self.condition: + self.condition.notify_all() + + return self._job_from_row(new_job).to_dict() + def get_status(self) -> Dict[str, Any]: - self.start() self._ensure_schema() with self._get_engine().connect() as conn: running_job = conn.execute(self._text(f""" @@ -387,17 +432,26 @@ class MigrationJobQueue: CREATE INDEX IF NOT EXISTS idx_{self.SCHEDULES_TABLE}_enabled_next_run_at ON {self._qualified_table(self.SCHEDULES_TABLE)} (enabled, next_run_at) """)) - conn.execute(self._text(f""" - UPDATE {self._qualified_table(self.JOBS_TABLE)} - SET status = 'failed', - finished_at = :now, - error = 'Прервано перезапуском воркера' - WHERE status = 'running' - """), {'now': datetime.now()}) conn.commit() self.schema_ready = True + def _reset_orphaned_running_jobs(self): + """Сброс зависших в 'running' заданий при старте воркера. + + Вызывается только из воркер-процесса (_worker_loop), чтобы API-процесс + не пометил выполняющуюся прямо сейчас миграцию как 'failed'. + """ + with self._get_engine().connect() as conn: + conn.execute(self._text(f""" + UPDATE {self._qualified_table(self.JOBS_TABLE)} + SET status = 'failed', + finished_at = :now, + error = 'Прервано перезапуском воркера' + WHERE status = 'running' + """), {'now': datetime.now()}) + conn.commit() + def _resolve_run_at( self, run_at: Optional[datetime] = None, @@ -475,6 +529,7 @@ class MigrationJobQueue: def _worker_loop(self): self._ensure_schema() + self._reset_orphaned_running_jobs() while True: try: @@ -605,6 +660,8 @@ class MigrationJobQueue: dry_run=job.dry_run, read_limit=job.read_limit, force_full=job.force_full, + schedule_id=job.schedule_id, + job_id=job.job_id, ) migrator.cleanup_old_logs(days_to_keep=7) final_status = 'completed'