refactor: migrate webhook queue to RQ with updated configuration

Abhimanyu Saharan
2026-02-15 12:30:15 +05:30
parent 1284820ec7
commit f9b14af477
14 changed files with 387 additions and 246 deletions
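
The configuration rename runs through the whole diff: every webhook_* queue setting read by the old code (webhook_redis_url, webhook_queue_name, webhook_dispatch_throttle_seconds, webhook_dispatch_max_retries) has an rq_* counterpart in the new code. A minimal sketch of how the renamed settings might be declared, assuming a pydantic-settings class in app/core/config.py; the default values are placeholders, not taken from the repository:

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Renamed queue settings referenced by the new code in this commit.
    # Defaults below are illustrative placeholders only.
    rq_redis_url: str = "redis://localhost:6379/0"
    rq_queue_name: str = "webhook_delivery"
    rq_dispatch_throttle_seconds: float = 0.5
    rq_dispatch_max_retries: int = 3


settings = Settings()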

View File

@@ -5,16 +5,14 @@ Prefer importing from this package when used by other modules.
from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
QueuedInboundDelivery,
dequeue_webhook_delivery,
enqueue_webhook_delivery,
requeue_if_failed,
)
from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule
__all__ = [
"QueuedWebhookDelivery",
"bootstrap_webhook_dispatch_schedule",
"QueuedInboundDelivery",
"dequeue_webhook_delivery",
"enqueue_webhook_delivery",
"requeue_if_failed",

View File

@@ -17,7 +17,7 @@ from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
QueuedInboundDelivery,
dequeue_webhook_delivery,
requeue_if_failed,
)
@@ -153,7 +153,7 @@ async def _load_webhook_payload(
return board, webhook, payload
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
async def _process_single_item(item: QueuedInboundDelivery) -> None:
async with async_session_maker() as session:
loaded = await _load_webhook_payload(
session=session,
@@ -206,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None:
},
)
requeue_if_failed(item)
time.sleep(settings.webhook_dispatch_throttle_seconds)
time.sleep(settings.rq_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
@@ -214,7 +214,7 @@ def run_flush_webhook_delivery_queue() -> None:
"""RQ entrypoint for running the async queue flush from worker jobs."""
logger.info(
"webhook.dispatch.batch_started",
extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds},
extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds},
)
start = time.time()
asyncio.run(flush_webhook_delivery_queue())
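
run_flush_webhook_delivery_queue stays the synchronous RQ entrypoint; only the throttle setting name changes. As a sketch of how a worker job could be enqueued against the renamed settings with the standard rq API (the enqueue call itself is not shown in this diff):

from redis import Redis
from rq import Queue

from app.core.config import settings
from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue

# Connect to the queue named by the new rq_* settings and enqueue one flush job;
# the job wrapper calls asyncio.run() on the async batch flush shown above.
connection = Redis.from_url(settings.rq_redis_url)
queue = Queue(settings.rq_queue_name, connection=connection)
queue.enqueue(run_flush_webhook_delivery_queue)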

View File

@@ -2,56 +2,73 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from typing import cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
from app.services.queue import QueuedTask, dequeue_task, enqueue_task, requeue_if_failed as generic_requeue_if_failed
logger = get_logger(__name__)
TASK_TYPE = "webhook_delivery"
@dataclass(frozen=True)
class QueuedWebhookDelivery:
class QueuedInboundDelivery:
"""Payload metadata stored for deferred webhook lead dispatch."""
board_id: UUID
webhook_id: UUID
payload_id: UUID
payload_event: str | None
received_at: datetime
attempts: int = 0
def to_json(self) -> str:
return json.dumps(
{
"board_id": str(self.board_id),
"webhook_id": str(self.webhook_id),
"payload_id": str(self.payload_id),
"payload_event": self.payload_event,
"received_at": self.received_at.isoformat(),
"attempts": self.attempts,
},
sort_keys=True,
def _task_from_payload(payload: QueuedInboundDelivery) -> QueuedTask:
return QueuedTask(
task_type=TASK_TYPE,
payload={
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"received_at": payload.received_at.isoformat(),
},
created_at=payload.received_at,
attempts=payload.attempts,
)
def _payload_from_task(task: QueuedTask) -> QueuedInboundDelivery:
if task.task_type not in {TASK_TYPE, "legacy"}:
raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
payload: dict[str, Any] = task.payload
if task.task_type == "legacy":
received_at = payload.get("received_at") or payload.get("created_at")
return QueuedInboundDelivery(
board_id=UUID(payload["board_id"]),
webhook_id=UUID(payload["webhook_id"]),
payload_id=UUID(payload["payload_id"]),
received_at=datetime.fromisoformat(received_at) if isinstance(received_at, str) else datetime.now(UTC),
attempts=int(payload.get("attempts", task.attempts)),
)
def _redis_client() -> redis.Redis:
return redis.Redis.from_url(settings.webhook_redis_url)
return QueuedInboundDelivery(
board_id=UUID(payload["board_id"]),
webhook_id=UUID(payload["webhook_id"]),
payload_id=UUID(payload["payload_id"]),
received_at=datetime.fromisoformat(payload["received_at"]),
attempts=int(payload.get("attempts", task.attempts)),
)
def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
def enqueue_webhook_delivery(payload: QueuedInboundDelivery) -> bool:
"""Persist webhook metadata in a Redis queue for batch dispatch."""
try:
client = _redis_client()
client.lpush(settings.webhook_queue_name, payload.to_json())
queued = _task_from_payload(payload)
enqueue_task(queued, settings.rq_queue_name, redis_url=settings.rq_redis_url)
logger.info(
"webhook.queue.enqueued",
extra={
@@ -75,62 +92,44 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
return False
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
def dequeue_webhook_delivery() -> QueuedInboundDelivery | None:
"""Pop one queued webhook delivery payload."""
client = _redis_client()
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
if raw is None:
return None
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
try:
payload: dict[str, Any] = json.loads(raw)
event = payload.get("payload_event")
if event is not None:
event = str(event)
return QueuedWebhookDelivery(
board_id=UUID(payload["board_id"]),
webhook_id=UUID(payload["webhook_id"]),
payload_id=UUID(payload["payload_id"]),
payload_event=event,
received_at=datetime.fromisoformat(payload["received_at"]),
attempts=int(payload.get("attempts", 0)),
)
task = dequeue_task(settings.rq_queue_name, redis_url=settings.rq_redis_url)
if task is None:
return None
return _payload_from_task(task)
except Exception as exc:
logger.error(
"webhook.queue.dequeue_failed",
extra={"raw_payload": str(raw), "error": str(exc)},
extra={
"queue_name": settings.rq_queue_name,
"error": str(exc),
},
)
raise
def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None:
payload = QueuedWebhookDelivery(
board_id=payload.board_id,
webhook_id=payload.webhook_id,
payload_id=payload.payload_id,
payload_event=payload.payload_event,
received_at=payload.received_at,
attempts=payload.attempts + 1,
)
enqueue_webhook_delivery(payload)
def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool:
def requeue_if_failed(payload: QueuedInboundDelivery) -> bool:
"""Requeue payload delivery with capped retries.
Returns True if requeued.
"""
if payload.attempts >= settings.webhook_dispatch_max_retries:
try:
return generic_requeue_if_failed(
_task_from_payload(payload),
settings.rq_queue_name,
max_retries=settings.rq_dispatch_max_retries,
redis_url=settings.rq_redis_url,
)
except Exception as exc:
logger.warning(
"webhook.queue.drop_failed_delivery",
"webhook.queue.requeue_failed",
extra={
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"attempts": payload.attempts,
"error": str(exc),
},
)
return False
_requeue_with_attempt(payload)
return True
raise
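
The public surface of the module (enqueue_webhook_delivery, dequeue_webhook_delivery, requeue_if_failed) is unchanged; only the storage behind it moves to the generic task queue. A usage sketch of the round trip, matching the field set used by the new _payload_from_task constructor (the UUIDs and the delivered flag are placeholders):

from datetime import UTC, datetime
from uuid import uuid4

from app.services.webhooks.queue import (
    QueuedInboundDelivery,
    dequeue_webhook_delivery,
    enqueue_webhook_delivery,
    requeue_if_failed,
)

# Placeholder identifiers; real values come from the inbound webhook request.
# payload_event is omitted here, mirroring the new _payload_from_task constructor.
delivery = QueuedInboundDelivery(
    board_id=uuid4(),
    webhook_id=uuid4(),
    payload_id=uuid4(),
    received_at=datetime.now(UTC),
)
enqueue_webhook_delivery(delivery)

item = dequeue_webhook_delivery()  # returns None when the queue is empty
if item is not None:
    delivered = False  # stand-in for the real dispatch attempt
    if not delivered:
        # Re-enqueues with attempts + 1 until rq_dispatch_max_retries is reached.
        requeue_if_failed(item)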

View File

@@ -1,83 +0,0 @@
"""Webhook dispatch scheduler bootstrap for rq-scheduler.
This module is typically run once at container start to ensure the recurring
job exists (idempotent registration).
"""
from __future__ import annotations
import time
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq_scheduler import Scheduler # type: ignore[import-untyped]
from app.core.config import settings
from app.core.logging import get_logger
from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue
logger = get_logger(__name__)
def bootstrap_webhook_dispatch_schedule(
interval_seconds: int | None = None,
*,
max_attempts: int = 5,
retry_sleep_seconds: float = 1.0,
) -> None:
"""Register a recurring queue-flush job and keep it idempotent.
Retries Redis connectivity to avoid crashing on transient startup ordering.
"""
effective_interval_seconds = (
settings.webhook_dispatch_schedule_interval_seconds
if interval_seconds is None
else interval_seconds
)
last_exc: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
connection = Redis.from_url(settings.webhook_redis_url)
connection.ping()
scheduler = Scheduler(
queue_name=settings.webhook_queue_name,
connection=connection,
)
for job in scheduler.get_jobs():
if job.id == settings.webhook_dispatch_schedule_id:
scheduler.cancel(job)
scheduler.schedule(
datetime.now(tz=timezone.utc) + timedelta(seconds=5),
func=run_flush_webhook_delivery_queue,
interval=effective_interval_seconds,
repeat=None,
id=settings.webhook_dispatch_schedule_id,
queue_name=settings.webhook_queue_name,
)
logger.info(
"webhook.scheduler.bootstrapped",
extra={
"schedule_id": settings.webhook_dispatch_schedule_id,
"queue_name": settings.webhook_queue_name,
"interval_seconds": effective_interval_seconds,
},
)
return
except Exception as exc:
last_exc = exc
logger.warning(
"webhook.scheduler.bootstrap_failed",
extra={
"attempt": attempt,
"max_attempts": max_attempts,
"error": str(exc),
},
)
if attempt < max_attempts:
time.sleep(retry_sleep_seconds * attempt)
raise RuntimeError("Failed to bootstrap webhook dispatch schedule") from last_exc
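
The rq-scheduler bootstrap above is removed outright by this commit, and its replacement is not part of this excerpt. If an equivalent recurring registration is still needed elsewhere, it might look like the following sketch; rq_dispatch_schedule_id and rq_dispatch_schedule_interval_seconds are assumed names modeled on the other renames, not settings confirmed by the diff:

from datetime import datetime, timedelta, timezone

from redis import Redis
from rq_scheduler import Scheduler

from app.core.config import settings
from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue

connection = Redis.from_url(settings.rq_redis_url)
scheduler = Scheduler(queue_name=settings.rq_queue_name, connection=connection)

# Register the recurring flush; repeat=None repeats indefinitely, matching the
# deleted bootstrap. The rq_dispatch_schedule_* settings are assumptions.
scheduler.schedule(
    scheduled_time=datetime.now(tz=timezone.utc) + timedelta(seconds=5),
    func=run_flush_webhook_delivery_queue,
    interval=settings.rq_dispatch_schedule_interval_seconds,
    repeat=None,
    id=settings.rq_dispatch_schedule_id,
    queue_name=settings.rq_queue_name,
)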