feat: add board webhook configuration and payload models

This commit is contained in:
Abhimanyu Saharan
2026-02-13 00:31:32 +05:30
parent afc8de3c24
commit 2e4739300c
31 changed files with 3801 additions and 158 deletions

View File

@@ -0,0 +1,451 @@
"""Board webhook configuration and inbound payload ingestion endpoints."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlmodel import col, select
from app.api.deps import get_board_for_user_read, get_board_for_user_write, get_board_or_404
from app.core.config import settings
from app.core.time import utcnow
from app.db import crud
from app.db.pagination import paginate
from app.db.session import get_session
from app.models.agents import Agent
from app.models.board_memory import BoardMemory
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.schemas.board_webhooks import (
BoardWebhookCreate,
BoardWebhookIngestResponse,
BoardWebhookPayloadRead,
BoardWebhookRead,
BoardWebhookUpdate,
)
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
if TYPE_CHECKING:
from collections.abc import Sequence
from fastapi_pagination.limit_offset import LimitOffsetPage
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.boards import Board
router = APIRouter(prefix="/boards/{board_id}/webhooks", tags=["board-webhooks"])
SESSION_DEP = Depends(get_session)
BOARD_USER_READ_DEP = Depends(get_board_for_user_read)
BOARD_USER_WRITE_DEP = Depends(get_board_for_user_write)
BOARD_OR_404_DEP = Depends(get_board_or_404)
PAYLOAD_PREVIEW_MAX_CHARS = 1600
def _webhook_endpoint_path(board_id: UUID, webhook_id: UUID) -> str:
return f"/api/v1/boards/{board_id}/webhooks/{webhook_id}"
def _webhook_endpoint_url(endpoint_path: str) -> str | None:
base_url = settings.base_url.rstrip("/")
if not base_url:
return None
return f"{base_url}{endpoint_path}"
def _to_webhook_read(webhook: BoardWebhook) -> BoardWebhookRead:
endpoint_path = _webhook_endpoint_path(webhook.board_id, webhook.id)
return BoardWebhookRead(
id=webhook.id,
board_id=webhook.board_id,
description=webhook.description,
enabled=webhook.enabled,
endpoint_path=endpoint_path,
endpoint_url=_webhook_endpoint_url(endpoint_path),
created_at=webhook.created_at,
updated_at=webhook.updated_at,
)
def _to_payload_read(payload: BoardWebhookPayload) -> BoardWebhookPayloadRead:
return BoardWebhookPayloadRead.model_validate(payload, from_attributes=True)
def _coerce_webhook_items(items: Sequence[object]) -> list[BoardWebhook]:
values: list[BoardWebhook] = []
for item in items:
if not isinstance(item, BoardWebhook):
msg = "Expected BoardWebhook items from paginated query"
raise TypeError(msg)
values.append(item)
return values
def _coerce_payload_items(items: Sequence[object]) -> list[BoardWebhookPayload]:
values: list[BoardWebhookPayload] = []
for item in items:
if not isinstance(item, BoardWebhookPayload):
msg = "Expected BoardWebhookPayload items from paginated query"
raise TypeError(msg)
values.append(item)
return values
async def _require_board_webhook(
session: AsyncSession,
*,
board_id: UUID,
webhook_id: UUID,
) -> BoardWebhook:
webhook = (
await session.exec(
select(BoardWebhook)
.where(col(BoardWebhook.id) == webhook_id)
.where(col(BoardWebhook.board_id) == board_id),
)
).first()
if webhook is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return webhook
async def _require_board_webhook_payload(
session: AsyncSession,
*,
board_id: UUID,
webhook_id: UUID,
payload_id: UUID,
) -> BoardWebhookPayload:
payload = (
await session.exec(
select(BoardWebhookPayload)
.where(col(BoardWebhookPayload.id) == payload_id)
.where(col(BoardWebhookPayload.board_id) == board_id)
.where(col(BoardWebhookPayload.webhook_id) == webhook_id),
)
).first()
if payload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return payload
def _decode_payload(
raw_body: bytes,
*,
content_type: str | None,
) -> dict[str, object] | list[object] | str | int | float | bool | None:
if not raw_body:
return {}
body_text = raw_body.decode("utf-8", errors="replace")
normalized_content_type = (content_type or "").lower()
should_parse_json = "application/json" in normalized_content_type
if not should_parse_json:
should_parse_json = body_text.startswith(("{", "[", '"')) or body_text in {"true", "false"}
if should_parse_json:
try:
parsed = json.loads(body_text)
except json.JSONDecodeError:
return body_text
if isinstance(parsed, (dict, list, str, int, float, bool)) or parsed is None:
return parsed
return body_text
def _captured_headers(request: Request) -> dict[str, str] | None:
captured: dict[str, str] = {}
for header, value in request.headers.items():
normalized = header.lower()
if normalized in {"content-type", "user-agent"} or normalized.startswith("x-"):
captured[normalized] = value
return captured or None
def _payload_preview(
value: dict[str, object] | list[object] | str | int | float | bool | None,
) -> str:
if isinstance(value, str):
preview = value
else:
try:
preview = json.dumps(value, indent=2, ensure_ascii=True)
except TypeError:
preview = str(value)
if len(preview) <= PAYLOAD_PREVIEW_MAX_CHARS:
return preview
return f"{preview[: PAYLOAD_PREVIEW_MAX_CHARS - 3]}..."
def _webhook_memory_content(
*,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> str:
preview = _payload_preview(payload.payload)
inspect_path = f"/api/v1/boards/{webhook.board_id}/webhooks/{webhook.id}/payloads/{payload.id}"
return (
"WEBHOOK PAYLOAD RECEIVED\n"
f"Webhook ID: {webhook.id}\n"
f"Payload ID: {payload.id}\n"
f"Instruction: {webhook.description}\n"
f"Inspect (admin API): {inspect_path}\n\n"
"Payload preview:\n"
f"{preview}"
)
async def _notify_lead_on_webhook_payload(
*,
session: AsyncSession,
board: Board,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> None:
lead = (
await Agent.objects.filter_by(board_id=board.id)
.filter(col(Agent.is_board_lead).is_(True))
.first(session)
)
if lead is None or not lead.openclaw_session_id:
return
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
payload_preview = _payload_preview(payload.payload)
message = (
"WEBHOOK EVENT RECEIVED\n"
f"Board: {board.name}\n"
f"Webhook ID: {webhook.id}\n"
f"Payload ID: {payload.id}\n"
f"Instruction: {webhook.description}\n\n"
"Take action:\n"
"1) Triage this payload against the webhook instruction.\n"
"2) Create/update tasks as needed.\n"
f"3) Reference payload ID {payload.id} in task descriptions.\n\n"
"Payload preview:\n"
f"{payload_preview}\n\n"
"To inspect board memory entries:\n"
f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false"
)
await dispatch.try_send_agent_message(
session_key=lead.openclaw_session_id,
config=config,
agent_name=lead.name,
message=message,
deliver=False,
)
@router.get("", response_model=DefaultLimitOffsetPage[BoardWebhookRead])
async def list_board_webhooks(
board: Board = BOARD_USER_READ_DEP,
session: AsyncSession = SESSION_DEP,
) -> LimitOffsetPage[BoardWebhookRead]:
"""List configured webhooks for a board."""
statement = (
select(BoardWebhook)
.where(col(BoardWebhook.board_id) == board.id)
.order_by(col(BoardWebhook.created_at).desc())
)
def _transform(items: Sequence[object]) -> Sequence[object]:
webhooks = _coerce_webhook_items(items)
return [_to_webhook_read(value) for value in webhooks]
return await paginate(session, statement, transformer=_transform)
@router.post("", response_model=BoardWebhookRead)
async def create_board_webhook(
payload: BoardWebhookCreate,
board: Board = BOARD_USER_WRITE_DEP,
session: AsyncSession = SESSION_DEP,
) -> BoardWebhookRead:
"""Create a new board webhook with a generated UUID endpoint."""
webhook = BoardWebhook(
board_id=board.id,
description=payload.description,
enabled=payload.enabled,
)
await crud.save(session, webhook)
return _to_webhook_read(webhook)
@router.get("/{webhook_id}", response_model=BoardWebhookRead)
async def get_board_webhook(
webhook_id: UUID,
board: Board = BOARD_USER_READ_DEP,
session: AsyncSession = SESSION_DEP,
) -> BoardWebhookRead:
"""Get one board webhook configuration."""
webhook = await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
return _to_webhook_read(webhook)
@router.patch("/{webhook_id}", response_model=BoardWebhookRead)
async def update_board_webhook(
webhook_id: UUID,
payload: BoardWebhookUpdate,
board: Board = BOARD_USER_WRITE_DEP,
session: AsyncSession = SESSION_DEP,
) -> BoardWebhookRead:
"""Update board webhook description or enabled state."""
webhook = await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
updates = payload.model_dump(exclude_unset=True)
if updates:
crud.apply_updates(webhook, updates)
webhook.updated_at = utcnow()
await crud.save(session, webhook)
return _to_webhook_read(webhook)
@router.delete("/{webhook_id}", response_model=OkResponse)
async def delete_board_webhook(
webhook_id: UUID,
board: Board = BOARD_USER_WRITE_DEP,
session: AsyncSession = SESSION_DEP,
) -> OkResponse:
"""Delete a webhook and its stored payload rows."""
webhook = await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
await crud.delete_where(
session,
BoardWebhookPayload,
col(BoardWebhookPayload.webhook_id) == webhook.id,
commit=False,
)
await session.delete(webhook)
await session.commit()
return OkResponse()
@router.get(
"/{webhook_id}/payloads", response_model=DefaultLimitOffsetPage[BoardWebhookPayloadRead]
)
async def list_board_webhook_payloads(
webhook_id: UUID,
board: Board = BOARD_USER_READ_DEP,
session: AsyncSession = SESSION_DEP,
) -> LimitOffsetPage[BoardWebhookPayloadRead]:
"""List stored payloads for one board webhook."""
await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
statement = (
select(BoardWebhookPayload)
.where(col(BoardWebhookPayload.board_id) == board.id)
.where(col(BoardWebhookPayload.webhook_id) == webhook_id)
.order_by(col(BoardWebhookPayload.received_at).desc())
)
def _transform(items: Sequence[object]) -> Sequence[object]:
payloads = _coerce_payload_items(items)
return [_to_payload_read(value) for value in payloads]
return await paginate(session, statement, transformer=_transform)
@router.get("/{webhook_id}/payloads/{payload_id}", response_model=BoardWebhookPayloadRead)
async def get_board_webhook_payload(
webhook_id: UUID,
payload_id: UUID,
board: Board = BOARD_USER_READ_DEP,
session: AsyncSession = SESSION_DEP,
) -> BoardWebhookPayloadRead:
"""Get a single stored payload for one board webhook."""
await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
payload = await _require_board_webhook_payload(
session,
board_id=board.id,
webhook_id=webhook_id,
payload_id=payload_id,
)
return _to_payload_read(payload)
@router.post(
"/{webhook_id}",
response_model=BoardWebhookIngestResponse,
status_code=status.HTTP_202_ACCEPTED,
)
async def ingest_board_webhook(
request: Request,
webhook_id: UUID,
board: Board = BOARD_OR_404_DEP,
session: AsyncSession = SESSION_DEP,
) -> BoardWebhookIngestResponse:
"""Open inbound webhook endpoint that stores payloads and nudges the board lead."""
webhook = await _require_board_webhook(
session,
board_id=board.id,
webhook_id=webhook_id,
)
if not webhook.enabled:
raise HTTPException(
status_code=status.HTTP_410_GONE,
detail="Webhook is disabled.",
)
content_type = request.headers.get("content-type")
payload_value = _decode_payload(
await request.body(),
content_type=content_type,
)
payload = BoardWebhookPayload(
board_id=board.id,
webhook_id=webhook.id,
payload=payload_value,
headers=_captured_headers(request),
source_ip=request.client.host if request.client else None,
content_type=content_type,
)
session.add(payload)
memory = BoardMemory(
board_id=board.id,
content=_webhook_memory_content(webhook=webhook, payload=payload),
tags=[
"webhook",
f"webhook:{webhook.id}",
f"payload:{payload.id}",
],
source="webhook",
is_chat=False,
)
session.add(memory)
await session.commit()
await _notify_lead_on_webhook_payload(
session=session,
board=board,
webhook=webhook,
payload=payload,
)
return BoardWebhookIngestResponse(
board_id=board.id,
webhook_id=webhook.id,
payload_id=payload.id,
)

View File

@@ -24,6 +24,8 @@ from app.models.board_group_memory import BoardGroupMemory
from app.models.board_groups import BoardGroup
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organization_board_access import OrganizationBoardAccess
@@ -290,6 +292,18 @@ async def delete_my_org(
col(BoardMemory.board_id).in_(board_ids),
commit=False,
)
await crud.delete_where(
session,
BoardWebhookPayload,
col(BoardWebhookPayload.board_id).in_(board_ids),
commit=False,
)
await crud.delete_where(
session,
BoardWebhook,
col(BoardWebhook.board_id).in_(board_ids),
commit=False,
)
await crud.delete_where(
session,
BoardOnboardingSession,

View File

@@ -120,9 +120,7 @@ def _approval_required_for_done_error() -> HTTPException:
return HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": (
"Task can only be marked done when a linked approval has been approved."
),
"message": ("Task can only be marked done when a linked approval has been approved."),
"blocked_by_task_ids": [],
},
)
@@ -132,9 +130,7 @@ def _review_required_for_done_error() -> HTTPException:
return HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": (
"Task can only be marked done from review when the board rule is enabled."
),
"message": ("Task can only be marked done from review when the board rule is enabled."),
"blocked_by_task_ids": [],
},
)
@@ -144,9 +140,7 @@ def _pending_approval_blocks_status_change_error() -> HTTPException:
return HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": (
"Task status cannot be changed while a linked approval is pending."
),
"message": ("Task status cannot be changed while a linked approval is pending."),
"blocked_by_task_ids": [],
},
)

View File

@@ -18,6 +18,7 @@ from app.api.board_group_memory import router as board_group_memory_router
from app.api.board_groups import router as board_groups_router
from app.api.board_memory import router as board_memory_router
from app.api.board_onboarding import router as board_onboarding_router
from app.api.board_webhooks import router as board_webhooks_router
from app.api.boards import router as boards_router
from app.api.gateway import router as gateway_router
from app.api.gateways import router as gateways_router
@@ -105,6 +106,7 @@ api_v1.include_router(board_groups_router)
api_v1.include_router(board_group_memory_router)
api_v1.include_router(boards_router)
api_v1.include_router(board_memory_router)
api_v1.include_router(board_webhooks_router)
api_v1.include_router(board_onboarding_router)
api_v1.include_router(approvals_router)
api_v1.include_router(tasks_router)

View File

@@ -8,6 +8,8 @@ from app.models.board_group_memory import BoardGroupMemory
from app.models.board_groups import BoardGroup
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organization_board_access import OrganizationBoardAccess
@@ -28,6 +30,8 @@ __all__ = [
"ApprovalTaskLink",
"Approval",
"BoardGroupMemory",
"BoardWebhook",
"BoardWebhookPayload",
"BoardMemory",
"BoardOnboardingSession",
"BoardGroup",

View File

@@ -0,0 +1,32 @@
"""Persisted webhook payloads received for board webhooks."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field
from app.core.time import utcnow
from app.models.base import QueryModel
RUNTIME_ANNOTATION_TYPES = (datetime,)
class BoardWebhookPayload(QueryModel, table=True):
"""Captured inbound webhook payload with request metadata."""
__tablename__ = "board_webhook_payloads" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
webhook_id: UUID = Field(foreign_key="board_webhooks.id", index=True)
payload: dict[str, object] | list[object] | str | int | float | bool | None = Field(
default=None,
sa_column=Column(JSON),
)
headers: dict[str, str] | None = Field(default=None, sa_column=Column(JSON))
source_ip: str | None = None
content_type: str | None = None
received_at: datetime = Field(default_factory=utcnow, index=True)

View File

@@ -0,0 +1,26 @@
"""Board webhook configuration model."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlmodel import Field
from app.core.time import utcnow
from app.models.base import QueryModel
RUNTIME_ANNOTATION_TYPES = (datetime,)
class BoardWebhook(QueryModel, table=True):
"""Inbound webhook endpoint configuration for a board."""
__tablename__ = "board_webhooks" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
description: str
enabled: bool = Field(default=True, index=True)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -11,6 +11,13 @@ from app.schemas.board_onboarding import (
BoardOnboardingRead,
BoardOnboardingStart,
)
from app.schemas.board_webhooks import (
BoardWebhookCreate,
BoardWebhookIngestResponse,
BoardWebhookPayloadRead,
BoardWebhookRead,
BoardWebhookUpdate,
)
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
from app.schemas.metrics import DashboardMetrics
@@ -47,6 +54,11 @@ __all__ = [
"BoardGroupMemoryRead",
"BoardMemoryCreate",
"BoardMemoryRead",
"BoardWebhookCreate",
"BoardWebhookIngestResponse",
"BoardWebhookPayloadRead",
"BoardWebhookRead",
"BoardWebhookUpdate",
"BoardOnboardingAnswer",
"BoardOnboardingConfirm",
"BoardOnboardingRead",

View File

@@ -0,0 +1,61 @@
"""Schemas for board webhook configuration and payload capture endpoints."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlmodel import SQLModel
from app.schemas.common import NonEmptyStr
RUNTIME_ANNOTATION_TYPES = (datetime, UUID, NonEmptyStr)
class BoardWebhookCreate(SQLModel):
"""Payload for creating a board webhook."""
description: NonEmptyStr
enabled: bool = True
class BoardWebhookUpdate(SQLModel):
"""Payload for updating a board webhook."""
description: NonEmptyStr | None = None
enabled: bool | None = None
class BoardWebhookRead(SQLModel):
"""Serialized board webhook configuration."""
id: UUID
board_id: UUID
description: str
enabled: bool
endpoint_path: str
endpoint_url: str | None = None
created_at: datetime
updated_at: datetime
class BoardWebhookPayloadRead(SQLModel):
"""Serialized stored webhook payload."""
id: UUID
board_id: UUID
webhook_id: UUID
payload: dict[str, object] | list[object] | str | int | float | bool | None = None
headers: dict[str, str] | None = None
source_ip: str | None = None
content_type: str | None = None
received_at: datetime
class BoardWebhookIngestResponse(SQLModel):
"""Response payload for inbound webhook ingestion."""
ok: bool = True
board_id: UUID
webhook_id: UUID
payload_id: UUID

View File

@@ -18,6 +18,8 @@ from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.organization_board_access import OrganizationBoardAccess
from app.models.organization_invite_board_access import OrganizationInviteBoardAccess
from app.models.task_dependencies import TaskDependency
@@ -84,6 +86,12 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
await crud.delete_where(session, Approval, col(Approval.board_id) == board.id)
await crud.delete_where(session, BoardMemory, col(BoardMemory.board_id) == board.id)
await crud.delete_where(
session,
BoardWebhookPayload,
col(BoardWebhookPayload.board_id) == board.id,
)
await crud.delete_where(session, BoardWebhook, col(BoardWebhook.board_id) == board.id)
await crud.delete_where(
session,
BoardOnboardingSession,

View File

@@ -0,0 +1,130 @@
"""Add board webhook configuration and payload storage tables.
Revision ID: fa6e83f8d9a1
Revises: c2e9f1a6d4b8
Create Date: 2026-02-13 00:10:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "fa6e83f8d9a1"
down_revision = "c2e9f1a6d4b8"
branch_labels = None
depends_on = None
def _index_names(inspector: sa.Inspector, table_name: str) -> set[str]:
return {item["name"] for item in inspector.get_indexes(table_name)}
def upgrade() -> None:
"""Create board webhook and payload capture tables."""
bind = op.get_bind()
inspector = sa.inspect(bind)
if not inspector.has_table("board_webhooks"):
op.create_table(
"board_webhooks",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("description", sa.String(), nullable=False),
sa.Column("enabled", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.PrimaryKeyConstraint("id"),
)
inspector = sa.inspect(bind)
webhook_indexes = _index_names(inspector, "board_webhooks")
if "ix_board_webhooks_board_id" not in webhook_indexes:
op.create_index("ix_board_webhooks_board_id", "board_webhooks", ["board_id"])
if "ix_board_webhooks_enabled" not in webhook_indexes:
op.create_index("ix_board_webhooks_enabled", "board_webhooks", ["enabled"])
if not inspector.has_table("board_webhook_payloads"):
op.create_table(
"board_webhook_payloads",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("webhook_id", sa.Uuid(), nullable=False),
sa.Column("payload", sa.JSON(), nullable=True),
sa.Column("headers", sa.JSON(), nullable=True),
sa.Column("source_ip", sa.String(), nullable=True),
sa.Column("content_type", sa.String(), nullable=True),
sa.Column("received_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.ForeignKeyConstraint(["webhook_id"], ["board_webhooks.id"]),
sa.PrimaryKeyConstraint("id"),
)
inspector = sa.inspect(bind)
payload_indexes = _index_names(inspector, "board_webhook_payloads")
if "ix_board_webhook_payloads_board_id" not in payload_indexes:
op.create_index(
"ix_board_webhook_payloads_board_id",
"board_webhook_payloads",
["board_id"],
)
if "ix_board_webhook_payloads_webhook_id" not in payload_indexes:
op.create_index(
"ix_board_webhook_payloads_webhook_id",
"board_webhook_payloads",
["webhook_id"],
)
if "ix_board_webhook_payloads_received_at" not in payload_indexes:
op.create_index(
"ix_board_webhook_payloads_received_at",
"board_webhook_payloads",
["received_at"],
)
if "ix_board_webhook_payloads_board_webhook_received_at" not in payload_indexes:
op.create_index(
"ix_board_webhook_payloads_board_webhook_received_at",
"board_webhook_payloads",
["board_id", "webhook_id", "received_at"],
)
def downgrade() -> None:
"""Drop board webhook and payload capture tables."""
bind = op.get_bind()
inspector = sa.inspect(bind)
if inspector.has_table("board_webhook_payloads"):
payload_indexes = _index_names(inspector, "board_webhook_payloads")
if "ix_board_webhook_payloads_board_webhook_received_at" in payload_indexes:
op.drop_index(
"ix_board_webhook_payloads_board_webhook_received_at",
table_name="board_webhook_payloads",
)
if "ix_board_webhook_payloads_received_at" in payload_indexes:
op.drop_index(
"ix_board_webhook_payloads_received_at",
table_name="board_webhook_payloads",
)
if "ix_board_webhook_payloads_webhook_id" in payload_indexes:
op.drop_index(
"ix_board_webhook_payloads_webhook_id",
table_name="board_webhook_payloads",
)
if "ix_board_webhook_payloads_board_id" in payload_indexes:
op.drop_index(
"ix_board_webhook_payloads_board_id",
table_name="board_webhook_payloads",
)
op.drop_table("board_webhook_payloads")
inspector = sa.inspect(bind)
if inspector.has_table("board_webhooks"):
webhook_indexes = _index_names(inspector, "board_webhooks")
if "ix_board_webhooks_enabled" in webhook_indexes:
op.drop_index("ix_board_webhooks_enabled", table_name="board_webhooks")
if "ix_board_webhooks_board_id" in webhook_indexes:
op.drop_index("ix_board_webhooks_board_id", table_name="board_webhooks")
op.drop_table("board_webhooks")

View File

@@ -0,0 +1,282 @@
# ruff: noqa: INP001
"""Integration tests for board webhook ingestion behavior."""
from __future__ import annotations
from uuid import UUID, uuid4
import pytest
from fastapi import APIRouter, Depends, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api import board_webhooks
from app.api.board_webhooks import router as board_webhooks_router
from app.api.deps import get_board_or_404
from app.db.session import get_session
from app.models.agents import Agent
from app.models.board_memory import BoardMemory
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.connect() as conn, conn.begin():
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(board_webhooks_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
async def _override_get_board_or_404(
board_id: str,
session: AsyncSession = Depends(get_session),
) -> Board:
board = await Board.objects.by_id(UUID(board_id)).first(session)
if board is None:
from fastapi import HTTPException, status
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return board
app.dependency_overrides[get_session] = _override_get_session
app.dependency_overrides[get_board_or_404] = _override_get_board_or_404
return app
async def _seed_webhook(
session: AsyncSession,
*,
enabled: bool,
) -> tuple[Board, BoardWebhook]:
organization_id = uuid4()
gateway_id = uuid4()
board_id = uuid4()
webhook_id = uuid4()
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(
Gateway(
id=gateway_id,
organization_id=organization_id,
name="gateway",
url="https://gateway.example.local",
workspace_root="/tmp/workspace",
),
)
board = Board(
id=board_id,
organization_id=organization_id,
gateway_id=gateway_id,
name="Launch board",
slug="launch-board",
description="Board for launch automation.",
)
session.add(board)
session.add(
Agent(
id=uuid4(),
board_id=board_id,
gateway_id=gateway_id,
name="Lead Agent",
status="online",
openclaw_session_id="lead:session:key",
is_board_lead=True,
),
)
webhook = BoardWebhook(
id=webhook_id,
board_id=board_id,
description="Triage payload and create tasks for impacted services.",
enabled=enabled,
)
session.add(webhook)
await session.commit()
return board, webhook
@pytest.mark.asyncio
async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
app = _build_test_app(session_maker)
sent_messages: list[dict[str, str]] = []
async with session_maker() as session:
board, webhook = await _seed_webhook(session, enabled=True)
async def _fake_optional_gateway_config_for_board(
self: board_webhooks.GatewayDispatchService,
_board: Board,
) -> object:
return object()
async def _fake_try_send_agent_message(
self: board_webhooks.GatewayDispatchService,
*,
session_key: str,
config: object,
agent_name: str,
message: str,
deliver: bool = False,
) -> None:
del self, config, deliver
sent_messages.append(
{
"session_key": session_key,
"agent_name": agent_name,
"message": message,
},
)
return None
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
"optional_gateway_config_for_board",
_fake_optional_gateway_config_for_board,
)
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
"try_send_agent_message",
_fake_try_send_agent_message,
)
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/boards/{board.id}/webhooks/{webhook.id}",
json={"event": "deploy", "service": "api"},
headers={"X-Signature": "sha256=abc123"},
)
assert response.status_code == 202
body = response.json()
payload_id = UUID(body["payload_id"])
assert body["board_id"] == str(board.id)
assert body["webhook_id"] == str(webhook.id)
async with session_maker() as session:
payloads = (
await session.exec(
select(BoardWebhookPayload).where(col(BoardWebhookPayload.id) == payload_id),
)
).all()
assert len(payloads) == 1
assert payloads[0].payload == {"event": "deploy", "service": "api"}
assert payloads[0].headers is not None
assert payloads[0].headers.get("x-signature") == "sha256=abc123"
assert payloads[0].headers.get("content-type") == "application/json"
memory_items = (
await session.exec(
select(BoardMemory).where(col(BoardMemory.board_id) == board.id),
)
).all()
assert len(memory_items) == 1
assert memory_items[0].source == "webhook"
assert memory_items[0].tags is not None
assert f"webhook:{webhook.id}" in memory_items[0].tags
assert f"payload:{payload_id}" in memory_items[0].tags
assert f"Payload ID: {payload_id}" in memory_items[0].content
assert len(sent_messages) == 1
assert sent_messages[0]["session_key"] == "lead:session:key"
assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"]
assert str(payload_id) in sent_messages[0]["message"]
assert webhook.description in sent_messages[0]["message"]
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_ingest_board_webhook_rejects_disabled_endpoint(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
app = _build_test_app(session_maker)
sent_messages: list[str] = []
async with session_maker() as session:
board, webhook = await _seed_webhook(session, enabled=False)
async def _fake_try_send_agent_message(
self: board_webhooks.GatewayDispatchService,
*,
session_key: str,
config: object,
agent_name: str,
message: str,
deliver: bool = False,
) -> None:
del self, session_key, config, agent_name, deliver
sent_messages.append(message)
return None
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
"try_send_agent_message",
_fake_try_send_agent_message,
)
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/boards/{board.id}/webhooks/{webhook.id}",
json={"event": "deploy"},
)
assert response.status_code == 410
assert response.json() == {"detail": "Webhook is disabled."}
async with session_maker() as session:
stored_payloads = (
await session.exec(
select(BoardWebhookPayload).where(
col(BoardWebhookPayload.board_id) == board.id
),
)
).all()
assert stored_payloads == []
stored_memory = (
await session.exec(
select(BoardMemory).where(col(BoardMemory.board_id) == board.id),
)
).all()
assert stored_memory == []
assert sent_messages == []
finally:
await engine.dispose()

View File

@@ -59,6 +59,8 @@ async def test_delete_my_org_cleans_dependents_before_organization_delete() -> N
"approval_task_links",
"approvals",
"board_memory",
"board_webhook_payloads",
"board_webhooks",
"board_onboarding_sessions",
"organization_board_access",
"organization_invite_board_access",

View File

@@ -277,7 +277,9 @@ async def test_update_task_allows_done_from_review_when_review_toggle_enabled()
@pytest.mark.asyncio
async def test_update_task_rejects_status_change_with_pending_approval_when_toggle_enabled() -> None:
async def test_update_task_rejects_status_change_with_pending_approval_when_toggle_enabled() -> (
None
):
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
@@ -318,7 +320,9 @@ async def test_update_task_rejects_status_change_with_pending_approval_when_togg
@pytest.mark.asyncio
async def test_update_task_allows_status_change_with_pending_approval_when_toggle_disabled() -> None:
async def test_update_task_allows_status_change_with_pending_approval_when_toggle_disabled() -> (
None
):
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
@@ -353,7 +357,9 @@ async def test_update_task_allows_status_change_with_pending_approval_when_toggl
@pytest.mark.asyncio
async def test_update_task_rejects_status_change_for_pending_multi_task_link_when_toggle_enabled() -> None:
async def test_update_task_rejects_status_change_for_pending_multi_task_link_when_toggle_enabled() -> (
None
):
engine = await _make_engine()
try:
async with await _make_session(engine) as session: