refactor: add agent_id to various interfaces and improve field organization
This commit is contained in:
@@ -64,6 +64,7 @@ def _to_webhook_read(webhook: BoardWebhook) -> BoardWebhookRead:
|
||||
return BoardWebhookRead(
|
||||
id=webhook.id,
|
||||
board_id=webhook.board_id,
|
||||
agent_id=webhook.agent_id,
|
||||
description=webhook.description,
|
||||
enabled=webhook.enabled,
|
||||
endpoint_path=endpoint_path,
|
||||
@@ -206,12 +207,18 @@ async def _notify_lead_on_webhook_payload(
|
||||
webhook: BoardWebhook,
|
||||
payload: BoardWebhookPayload,
|
||||
) -> None:
|
||||
lead = (
|
||||
await Agent.objects.filter_by(board_id=board.id)
|
||||
.filter(col(Agent.is_board_lead).is_(True))
|
||||
.first(session)
|
||||
)
|
||||
if lead is None or not lead.openclaw_session_id:
|
||||
target_agent: Agent | None = None
|
||||
if webhook.agent_id is not None:
|
||||
target_agent = await Agent.objects.filter_by(id=webhook.agent_id, board_id=board.id).first(
|
||||
session
|
||||
)
|
||||
if target_agent is None:
|
||||
target_agent = (
|
||||
await Agent.objects.filter_by(board_id=board.id)
|
||||
.filter(col(Agent.is_board_lead).is_(True))
|
||||
.first(session)
|
||||
)
|
||||
if target_agent is None or not target_agent.openclaw_session_id:
|
||||
return
|
||||
|
||||
dispatch = GatewayDispatchService(session)
|
||||
@@ -236,14 +243,30 @@ async def _notify_lead_on_webhook_payload(
|
||||
f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false"
|
||||
)
|
||||
await dispatch.try_send_agent_message(
|
||||
session_key=lead.openclaw_session_id,
|
||||
session_key=target_agent.openclaw_session_id,
|
||||
config=config,
|
||||
agent_name=lead.name,
|
||||
agent_name=target_agent.name,
|
||||
message=message,
|
||||
deliver=False,
|
||||
)
|
||||
|
||||
|
||||
async def _validate_agent_id(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
board: Board,
|
||||
agent_id: UUID | None,
|
||||
) -> None:
|
||||
if agent_id is None:
|
||||
return
|
||||
agent = await Agent.objects.filter_by(id=agent_id, board_id=board.id).first(session)
|
||||
if agent is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
detail="agent_id must reference an agent on this board.",
|
||||
)
|
||||
|
||||
|
||||
@router.get("", response_model=DefaultLimitOffsetPage[BoardWebhookRead])
|
||||
async def list_board_webhooks(
|
||||
board: Board = BOARD_USER_READ_DEP,
|
||||
@@ -270,8 +293,14 @@ async def create_board_webhook(
|
||||
session: AsyncSession = SESSION_DEP,
|
||||
) -> BoardWebhookRead:
|
||||
"""Create a new board webhook with a generated UUID endpoint."""
|
||||
await _validate_agent_id(
|
||||
session=session,
|
||||
board=board,
|
||||
agent_id=payload.agent_id,
|
||||
)
|
||||
webhook = BoardWebhook(
|
||||
board_id=board.id,
|
||||
agent_id=payload.agent_id,
|
||||
description=payload.description,
|
||||
enabled=payload.enabled,
|
||||
)
|
||||
@@ -309,6 +338,11 @@ async def update_board_webhook(
|
||||
)
|
||||
updates = payload.model_dump(exclude_unset=True)
|
||||
if updates:
|
||||
await _validate_agent_id(
|
||||
session=session,
|
||||
board=board,
|
||||
agent_id=updates.get("agent_id"),
|
||||
)
|
||||
crud.apply_updates(webhook, updates)
|
||||
webhook.updated_at = utcnow()
|
||||
await crud.save(session, webhook)
|
||||
|
||||
@@ -20,6 +20,7 @@ class BoardWebhook(QueryModel, table=True):
|
||||
|
||||
id: UUID = Field(default_factory=uuid4, primary_key=True)
|
||||
board_id: UUID = Field(foreign_key="boards.id", index=True)
|
||||
agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
|
||||
description: str
|
||||
enabled: bool = Field(default=True, index=True)
|
||||
created_at: datetime = Field(default_factory=utcnow)
|
||||
|
||||
@@ -17,6 +17,7 @@ class BoardWebhookCreate(SQLModel):
|
||||
|
||||
description: NonEmptyStr
|
||||
enabled: bool = True
|
||||
agent_id: UUID | None = None
|
||||
|
||||
|
||||
class BoardWebhookUpdate(SQLModel):
|
||||
@@ -24,6 +25,7 @@ class BoardWebhookUpdate(SQLModel):
|
||||
|
||||
description: NonEmptyStr | None = None
|
||||
enabled: bool | None = None
|
||||
agent_id: UUID | None = None
|
||||
|
||||
|
||||
class BoardWebhookRead(SQLModel):
|
||||
@@ -31,6 +33,7 @@ class BoardWebhookRead(SQLModel):
|
||||
|
||||
id: UUID
|
||||
board_id: UUID
|
||||
agent_id: UUID | None = None
|
||||
description: str
|
||||
enabled: bool
|
||||
endpoint_path: str
|
||||
|
||||
@@ -62,15 +62,23 @@ def _webhook_message(
|
||||
)
|
||||
|
||||
|
||||
async def _notify_lead(
|
||||
async def _notify_target_agent(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
board: Board,
|
||||
webhook: BoardWebhook,
|
||||
payload: BoardWebhookPayload,
|
||||
) -> None:
|
||||
lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
|
||||
if lead is None or not lead.openclaw_session_id:
|
||||
target_agent: Agent | None = None
|
||||
if webhook.agent_id is not None:
|
||||
target_agent = await Agent.objects.filter_by(id=webhook.agent_id, board_id=board.id).first(
|
||||
session
|
||||
)
|
||||
if target_agent is None:
|
||||
target_agent = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(
|
||||
session
|
||||
)
|
||||
if target_agent is None or not target_agent.openclaw_session_id:
|
||||
return
|
||||
|
||||
dispatch = GatewayDispatchService(session)
|
||||
@@ -80,9 +88,9 @@ async def _notify_lead(
|
||||
|
||||
message = _webhook_message(board=board, webhook=webhook, payload=payload)
|
||||
await dispatch.try_send_agent_message(
|
||||
session_key=lead.openclaw_session_id,
|
||||
session_key=target_agent.openclaw_session_id,
|
||||
config=config,
|
||||
agent_name=lead.name,
|
||||
agent_name=target_agent.name,
|
||||
message=message,
|
||||
deliver=False,
|
||||
)
|
||||
@@ -160,7 +168,7 @@ async def _process_single_item(item: QueuedInboundDelivery) -> None:
|
||||
return
|
||||
|
||||
board, webhook, payload = loaded
|
||||
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
|
||||
await _notify_target_agent(session=session, board=board, webhook=webhook, payload=payload)
|
||||
await session.commit()
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
"""Add optional agent mapping to board webhooks.
|
||||
|
||||
Revision ID: b7a1d9c3e4f5
|
||||
Revises: a2f6c9d4b7e8
|
||||
Create Date: 2026-02-15 14:00:00.000000
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "b7a1d9c3e4f5"
|
||||
down_revision = "a2f6c9d4b7e8"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Add optional mapped agent reference on board webhooks."""
|
||||
op.add_column("board_webhooks", sa.Column("agent_id", sa.Uuid(), nullable=True))
|
||||
op.create_index("ix_board_webhooks_agent_id", "board_webhooks", ["agent_id"], unique=False)
|
||||
op.create_foreign_key(
|
||||
"fk_board_webhooks_agent_id_agents",
|
||||
"board_webhooks",
|
||||
"agents",
|
||||
["agent_id"],
|
||||
["id"],
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Remove optional mapped agent reference from board webhooks."""
|
||||
op.drop_constraint("fk_board_webhooks_agent_id_agents", "board_webhooks", type_="foreignkey")
|
||||
op.drop_index("ix_board_webhooks_agent_id", table_name="board_webhooks")
|
||||
op.drop_column("board_webhooks", "agent_id")
|
||||
@@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from types import SimpleNamespace
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
@@ -222,6 +223,128 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo
|
||||
assert processed == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notify_target_agent_prefers_mapped_agent(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
agent_id = uuid4()
|
||||
mapped_agent = SimpleNamespace(
|
||||
id=agent_id,
|
||||
name="Mapped Agent",
|
||||
openclaw_session_id="mapped:session",
|
||||
)
|
||||
lead_agent = SimpleNamespace(
|
||||
id=uuid4(),
|
||||
name="Lead Agent",
|
||||
openclaw_session_id="lead:session",
|
||||
)
|
||||
sent: list[dict[str, str]] = []
|
||||
|
||||
class _FakeAgentObjects:
|
||||
def filter_by(self, **kwargs: object) -> _FakeAgentObjects:
|
||||
self._kwargs = kwargs
|
||||
return self
|
||||
|
||||
async def first(self, session: object) -> object | None:
|
||||
del session
|
||||
if self._kwargs.get("id") == agent_id:
|
||||
return mapped_agent
|
||||
if self._kwargs.get("is_board_lead") is True:
|
||||
return lead_agent
|
||||
return None
|
||||
|
||||
class _FakeDispatchService:
|
||||
def __init__(self, session: object) -> None:
|
||||
del session
|
||||
|
||||
async def optional_gateway_config_for_board(self, board: object) -> object:
|
||||
del board
|
||||
return object()
|
||||
|
||||
async def try_send_agent_message(
|
||||
self,
|
||||
*,
|
||||
session_key: str,
|
||||
config: object,
|
||||
agent_name: str,
|
||||
message: str,
|
||||
deliver: bool = False,
|
||||
) -> None:
|
||||
del config, message, deliver
|
||||
sent.append({"session_key": session_key, "agent_name": agent_name})
|
||||
|
||||
monkeypatch.setattr(dispatch.Agent, "objects", _FakeAgentObjects())
|
||||
monkeypatch.setattr(dispatch, "GatewayDispatchService", _FakeDispatchService)
|
||||
|
||||
webhook = SimpleNamespace(id=uuid4(), description="desc", agent_id=agent_id)
|
||||
board = SimpleNamespace(id=uuid4(), name="Board")
|
||||
payload = SimpleNamespace(id=uuid4(), payload={"event": "test"})
|
||||
|
||||
await dispatch._notify_target_agent(
|
||||
session=SimpleNamespace(),
|
||||
board=board,
|
||||
webhook=webhook,
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
assert sent == [{"session_key": "mapped:session", "agent_name": "Mapped Agent"}]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notify_target_agent_falls_back_to_lead(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
lead_agent = SimpleNamespace(
|
||||
id=uuid4(),
|
||||
name="Lead Agent",
|
||||
openclaw_session_id="lead:session",
|
||||
)
|
||||
sent: list[dict[str, str]] = []
|
||||
|
||||
class _FakeAgentObjects:
|
||||
def filter_by(self, **kwargs: object) -> _FakeAgentObjects:
|
||||
self._kwargs = kwargs
|
||||
return self
|
||||
|
||||
async def first(self, session: object) -> object | None:
|
||||
del session
|
||||
if self._kwargs.get("is_board_lead") is True:
|
||||
return lead_agent
|
||||
return None
|
||||
|
||||
class _FakeDispatchService:
|
||||
def __init__(self, session: object) -> None:
|
||||
del session
|
||||
|
||||
async def optional_gateway_config_for_board(self, board: object) -> object:
|
||||
del board
|
||||
return object()
|
||||
|
||||
async def try_send_agent_message(
|
||||
self,
|
||||
*,
|
||||
session_key: str,
|
||||
config: object,
|
||||
agent_name: str,
|
||||
message: str,
|
||||
deliver: bool = False,
|
||||
) -> None:
|
||||
del config, message, deliver
|
||||
sent.append({"session_key": session_key, "agent_name": agent_name})
|
||||
|
||||
monkeypatch.setattr(dispatch.Agent, "objects", _FakeAgentObjects())
|
||||
monkeypatch.setattr(dispatch, "GatewayDispatchService", _FakeDispatchService)
|
||||
|
||||
webhook = SimpleNamespace(id=uuid4(), description="desc", agent_id=None)
|
||||
board = SimpleNamespace(id=uuid4(), name="Board")
|
||||
payload = SimpleNamespace(id=uuid4(), payload={"event": "test"})
|
||||
|
||||
await dispatch._notify_target_agent(
|
||||
session=SimpleNamespace(),
|
||||
board=board,
|
||||
webhook=webhook,
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
assert sent == [{"session_key": "lead:session", "agent_name": "Lead Agent"}]
|
||||
|
||||
|
||||
def test_dispatch_run_entrypoint_calls_async_flush(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
called: list[bool] = []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user