diff --git a/app/api.py b/app/api.py index 524653c..eba6e3d 100644 --- a/app/api.py +++ b/app/api.py @@ -10,6 +10,8 @@ except ImportError: HTTPException = None BaseModel = object +from sqlalchemy import text + from .config import Config from .queue import migration_queue from .table_config_repository import TableConfigRepository @@ -45,6 +47,16 @@ class ScheduleRequest(BaseModel): initial_force_full: bool = False +class WebhookCreate(BaseModel): + url: str + secret: Optional[str] = None + label: Optional[str] = None + + +class WebhookToggle(BaseModel): + active: bool + + def create_app(): """Создание FastAPI приложения.""" if FastAPI is None: @@ -57,12 +69,29 @@ def create_app(): from sqlalchemy import create_engine config = Config() - _shared['engine'] = create_engine( + engine = create_engine( config.POSTGRES_CONNECTION_STRING, pool_pre_ping=True, pool_recycle=1800, ) + _shared['engine'] = engine _shared['config'] = config + + schema = f'"{config.REPLICATOR_SCHEMA}"' + with engine.connect() as conn: + conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS {schema}')) + conn.execute(text(f""" + CREATE TABLE IF NOT EXISTS {schema}.webhook_subscriptions ( + id SERIAL PRIMARY KEY, + url TEXT NOT NULL, + secret TEXT, + label TEXT, + active BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMP NOT NULL DEFAULT now() + ) + """)) + conn.commit() + if Config.START_API_WORKER: migration_queue.start() @@ -169,6 +198,62 @@ def create_app(): return {"status": "scheduled", "schedule": schedule} + # ----------------------------------------------------------------------- + # Webhooks + # ----------------------------------------------------------------------- + + _WEBHOOKS_TABLE = f'"{Config.REPLICATOR_SCHEMA}".webhook_subscriptions' + + @api.get("/webhooks") + def list_webhooks(): + with _shared['engine'].connect() as conn: + rows = conn.execute( + text(f"SELECT id, url, secret, label, active, created_at FROM {_WEBHOOKS_TABLE} ORDER BY id") + ).mappings().all() + return [dict(r) for r in rows] + + @api.post("/webhooks", status_code=201) + def create_webhook(body: WebhookCreate): + with _shared['engine'].connect() as conn: + row = conn.execute( + text(f""" + INSERT INTO {_WEBHOOKS_TABLE} (url, secret, label) + VALUES (:url, :secret, :label) + RETURNING id, url, secret, label, active, created_at + """), + {'url': body.url, 'secret': body.secret, 'label': body.label}, + ).mappings().first() + conn.commit() + return dict(row) + + @api.patch("/webhooks/{webhook_id}") + def toggle_webhook(webhook_id: int, body: WebhookToggle): + with _shared['engine'].connect() as conn: + row = conn.execute( + text(f""" + UPDATE {_WEBHOOKS_TABLE} + SET active = :active + WHERE id = :id + RETURNING id, url, secret, label, active, created_at + """), + {'active': body.active, 'id': webhook_id}, + ).mappings().first() + conn.commit() + if not row: + raise HTTPException(status_code=404, detail="Webhook not found") + return dict(row) + + @api.delete("/webhooks/{webhook_id}", status_code=204) + def delete_webhook(webhook_id: int): + with _shared['engine'].connect() as conn: + result = conn.execute( + text(f"DELETE FROM {_WEBHOOKS_TABLE} WHERE id = :id"), + {'id': webhook_id}, + ) + conn.commit() + if result.rowcount == 0: + raise HTTPException(status_code=404, detail="Webhook not found") + return api diff --git a/app/migrator.py b/app/migrator.py index a3ac82b..400dfec 100644 --- a/app/migrator.py +++ b/app/migrator.py @@ -4,6 +4,8 @@ import threading import time import csv import io +import json +import urllib.request from datetime import datetime from typing import Any, Dict, List, Optional, Tuple @@ -415,6 +417,70 @@ class DatabaseMigrator: self._state_table_ready = True + def ensure_webhooks_table(self): + """Создание таблицы подписок webhook если не существует.""" + schema = self.quote_identifier(self.config.REPLICATOR_SCHEMA) + table = f"{schema}.webhook_subscriptions" + with self.dst_engine.connect() as conn: + conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS {schema}')) + conn.execute(text(f""" + CREATE TABLE IF NOT EXISTS {table} ( + id SERIAL PRIMARY KEY, + url TEXT NOT NULL, + secret TEXT, + label TEXT, + active BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMP NOT NULL DEFAULT now() + ) + """)) + conn.commit() + + def _send_webhooks(self, report: dict) -> None: + """POST итогов миграции на все активные webhook-подписки.""" + schema = self.quote_identifier(self.config.REPLICATOR_SCHEMA) + table = f"{schema}.webhook_subscriptions" + try: + with self.dst_engine.connect() as conn: + rows = conn.execute( + text(f"SELECT url, secret FROM {table} WHERE active = true") + ).mappings().all() + except Exception as e: + self.logger.log_warning(f"Не удалось прочитать webhook-подписки: {e}") + return + + if not rows: + return + + summary = report.get('summary', {}) + failed = report.get('failed_tables', []) + payload = json.dumps({ + 'status': 'success' if not failed else ('partial_success' if summary.get('successful_tables') else 'failed'), + 'started_at': str(self.logger.start_time), + 'finished_at': datetime.now().isoformat(), + 'tables': { + 'total': summary.get('total_tables', 0), + 'success': summary.get('successful_tables', 0), + 'failed': summary.get('failed_tables', 0), + }, + 'errors': [{'table': t.get('name'), 'error': t.get('error')} for t in failed], + }).encode() + + timeout = int(os.getenv('WEBHOOK_TIMEOUT', '10')) + for row in rows: + url, secret = row['url'], row['secret'] or '' + for attempt in range(3): + try: + req = urllib.request.Request(url, data=payload, headers={ + 'Content-Type': 'application/json', + 'X-Syncio-Secret': secret, + }) + urllib.request.urlopen(req, timeout=timeout) + self.logger.log_info(f"Webhook отправлен: {url}") + break + except Exception as e: + if attempt == 2: + self.logger.log_warning(f"Webhook {url} не отвечает (попытка 3/3): {e}") + def get_last_watermark(self, table_name: str) -> Dict[str, Any]: """Чтение последнего успешно обработанного x_DateTime.""" empty_watermark = {'last_x_datetime': None, 'last_sequence_value': None} @@ -1705,6 +1771,8 @@ class DatabaseMigrator: ] self.logger.stats['total_tables'] = len(table_configs) + self.ensure_webhooks_table() + self.logger.log_info("="*60) self.logger.log_info("НАЧАЛО МИГРАЦИИ ДАННЫХ") self.logger.log_info(f"Время начала: {self.logger.start_time}") @@ -1741,7 +1809,9 @@ class DatabaseMigrator: self.send_notification(report) else: self.logger.log_info("Email отключен для этого запуска") - + + self._send_webhooks(report) + return report def _get_pg_row_count(self, table_name: str) -> int: