# app/services/task_tracker.py import redis.asyncio as redis from typing import Optional import json class TaskTracker: def __init__(self, redis_url: str = "redis://127.0.0.1:6379/0"): self.redis_url = redis_url async def _get_redis(self): return await redis.from_url(self.redis_url) async def init_batch(self, batch_id: str, total_tasks: int): """Инициализировать пакет задач""" r = await self._get_redis() await r.set(f"replicator_batch:{batch_id}:total", total_tasks) await r.set(f"replicator_batch:{batch_id}:completed", 0) await r.set(f"replicator_batch:{batch_id}:results", json.dumps([])) await r.expire(f"replicator_batch:{batch_id}:total", 86400) await r.expire(f"replicator_batch:{batch_id}:completed", 86400) await r.expire(f"replicator_batch:{batch_id}:results", 86400) await r.close() async def mark_completed(self, batch_id: str, result: dict) -> int: """Отметить задачу как завершённую и вернуть количество выполненных""" r = await self._get_redis() completed = await r.incr(f"replicator_batch:{batch_id}:completed") # Сохраняем результат задачи results = json.loads(await r.get(f"replicator_batch:{batch_id}:results") or "[]") results.append(result) await r.set(f"replicator_batch:{batch_id}:results", json.dumps(results)) await r.close() return completed async def get_batch_status(self, batch_id: str) -> dict: """Получить статус пакета""" r = await self._get_redis() total = await r.get(f"replicator_batch:{batch_id}:total") completed = await r.get(f"replicator_batch:{batch_id}:completed") results = await r.get(f"replicator_batch:{batch_id}:results") await r.close() return { "total": int(total) if total else 0, "completed": int(completed) if completed else 0, "results": json.loads(results) if results else [] } async def is_batch_complete(self, batch_id: str) -> bool: """Проверить, завершены ли все задачи""" status = await self.get_batch_status(batch_id) return status["completed"] >= status["total"] task_tracker = TaskTracker()