56 lines
2.4 KiB
Python
56 lines
2.4 KiB
Python
# app/services/task_tracker.py
|
|
import redis.asyncio as redis
|
|
from typing import Optional
|
|
import json
|
|
|
|
class TaskTracker:
|
|
def __init__(self, redis_url: str = "redis://127.0.0.1:6379/0"):
|
|
self.redis_url = redis_url
|
|
|
|
async def _get_redis(self):
|
|
return await redis.from_url(self.redis_url)
|
|
|
|
async def init_batch(self, batch_id: str, total_tasks: int):
|
|
"""Инициализировать пакет задач"""
|
|
r = await self._get_redis()
|
|
await r.set(f"replicator_batch:{batch_id}:total", total_tasks)
|
|
await r.set(f"replicator_batch:{batch_id}:completed", 0)
|
|
await r.set(f"replicator_batch:{batch_id}:results", json.dumps([]))
|
|
await r.expire(f"replicator_batch:{batch_id}:total", 86400)
|
|
await r.expire(f"replicator_batch:{batch_id}:completed", 86400)
|
|
await r.expire(f"replicator_batch:{batch_id}:results", 86400)
|
|
await r.close()
|
|
|
|
async def mark_completed(self, batch_id: str, result: dict) -> int:
|
|
"""Отметить задачу как завершённую и вернуть количество выполненных"""
|
|
r = await self._get_redis()
|
|
completed = await r.incr(f"replicator_batch:{batch_id}:completed")
|
|
|
|
# Сохраняем результат задачи
|
|
results = json.loads(await r.get(f"replicator_batch:{batch_id}:results") or "[]")
|
|
results.append(result)
|
|
await r.set(f"replicator_batch:{batch_id}:results", json.dumps(results))
|
|
|
|
await r.close()
|
|
return completed
|
|
|
|
async def get_batch_status(self, batch_id: str) -> dict:
|
|
"""Получить статус пакета"""
|
|
r = await self._get_redis()
|
|
total = await r.get(f"replicator_batch:{batch_id}:total")
|
|
completed = await r.get(f"replicator_batch:{batch_id}:completed")
|
|
results = await r.get(f"replicator_batch:{batch_id}:results")
|
|
await r.close()
|
|
|
|
return {
|
|
"total": int(total) if total else 0,
|
|
"completed": int(completed) if completed else 0,
|
|
"results": json.loads(results) if results else []
|
|
}
|
|
|
|
async def is_batch_complete(self, batch_id: str) -> bool:
|
|
"""Проверить, завершены ли все задачи"""
|
|
status = await self.get_batch_status(batch_id)
|
|
return status["completed"] >= status["total"]
|
|
|
|
task_tracker = TaskTracker() |