Merge pull request #28 from abhi1693/feature/board-lead-orchestration

Board lead orchestration: goals, onboarding, approvals, and live comments
This commit is contained in:
Abhimanyu Saharan
2026-02-06 11:50:34 +05:30
committed by GitHub
61 changed files with 7043 additions and 307 deletions

View File

@@ -0,0 +1,145 @@
"""board lead orchestration
Revision ID: 3b9b2f1a6c2d
Revises: 9f2c1a7b0d3e
Create Date: 2026-02-05 14:45:00.000000
"""
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "3b9b2f1a6c2d"
down_revision = "9f2c1a7b0d3e"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column("boards", sa.Column("board_type", sa.String(), server_default="goal", nullable=False))
op.add_column("boards", sa.Column("objective", sa.Text(), nullable=True))
op.add_column("boards", sa.Column("success_metrics", sa.JSON(), nullable=True))
op.add_column("boards", sa.Column("target_date", sa.DateTime(), nullable=True))
op.add_column(
"boards",
sa.Column("goal_confirmed", sa.Boolean(), server_default=sa.text("false"), nullable=False),
)
op.add_column("boards", sa.Column("goal_source", sa.Text(), nullable=True))
op.add_column(
"agents",
sa.Column("is_board_lead", sa.Boolean(), server_default=sa.text("false"), nullable=False),
)
op.add_column(
"tasks",
sa.Column("auto_created", sa.Boolean(), server_default=sa.text("false"), nullable=False),
)
op.add_column("tasks", sa.Column("auto_reason", sa.Text(), nullable=True))
op.create_table(
"board_memory",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("tags", sa.JSON(), nullable=True),
sa.Column("source", sa.Text(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_board_memory_board_id", "board_memory", ["board_id"], unique=False)
op.create_table(
"approvals",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("agent_id", sa.Uuid(), nullable=True),
sa.Column("action_type", sa.String(), nullable=False),
sa.Column("payload", sa.JSON(), nullable=True),
sa.Column("confidence", sa.Integer(), nullable=False),
sa.Column("rubric_scores", sa.JSON(), nullable=True),
sa.Column("status", sa.String(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("resolved_at", sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(["agent_id"], ["agents.id"]),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_approvals_board_id", "approvals", ["board_id"], unique=False)
op.create_index("ix_approvals_agent_id", "approvals", ["agent_id"], unique=False)
op.create_index("ix_approvals_status", "approvals", ["status"], unique=False)
op.create_table(
"board_onboarding_sessions",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("session_key", sa.String(), nullable=False),
sa.Column("status", sa.String(), nullable=False),
sa.Column("messages", sa.JSON(), nullable=True),
sa.Column("draft_goal", sa.JSON(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_board_onboarding_sessions_board_id",
"board_onboarding_sessions",
["board_id"],
unique=False,
)
op.create_index(
"ix_board_onboarding_sessions_status",
"board_onboarding_sessions",
["status"],
unique=False,
)
op.create_table(
"task_fingerprints",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("fingerprint_hash", sa.String(), nullable=False),
sa.Column("task_id", sa.Uuid(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.ForeignKeyConstraint(["task_id"], ["tasks.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_task_fingerprints_board_hash",
"task_fingerprints",
["board_id", "fingerprint_hash"],
unique=True,
)
def downgrade() -> None:
op.drop_index("ix_task_fingerprints_board_hash", table_name="task_fingerprints")
op.drop_table("task_fingerprints")
op.drop_index(
"ix_board_onboarding_sessions_status", table_name="board_onboarding_sessions"
)
op.drop_index(
"ix_board_onboarding_sessions_board_id", table_name="board_onboarding_sessions"
)
op.drop_table("board_onboarding_sessions")
op.drop_index("ix_approvals_status", table_name="approvals")
op.drop_index("ix_approvals_agent_id", table_name="approvals")
op.drop_index("ix_approvals_board_id", table_name="approvals")
op.drop_table("approvals")
op.drop_index("ix_board_memory_board_id", table_name="board_memory")
op.drop_table("board_memory")
op.drop_column("tasks", "auto_reason")
op.drop_column("tasks", "auto_created")
op.drop_column("agents", "is_board_lead")
op.drop_column("boards", "goal_source")
op.drop_column("boards", "goal_confirmed")
op.drop_column("boards", "target_date")
op.drop_column("boards", "success_metrics")
op.drop_column("boards", "objective")
op.drop_column("boards", "board_type")

385
backend/app/api/agent.py Normal file
View File

@@ -0,0 +1,385 @@
from __future__ import annotations
import asyncio
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlmodel import Session, select
from app.api import agents as agents_api
from app.api import approvals as approvals_api
from app.api import board_memory as board_memory_api
from app.api import board_onboarding as onboarding_api
from app.api import tasks as tasks_api
from app.api.deps import ActorContext, get_board_or_404, get_task_or_404
from app.core.agent_auth import AgentAuthContext, get_agent_auth_context
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.agents import AgentCreate, AgentHeartbeat, AgentHeartbeatCreate, AgentNudge, AgentRead
from app.schemas.approvals import ApprovalCreate, ApprovalRead
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.board_onboarding import BoardOnboardingRead
from app.schemas.boards import BoardRead
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
router = APIRouter(prefix="/agent", tags=["agent"])
def _actor(agent_ctx: AgentAuthContext) -> ActorContext:
return ActorContext(actor_type="agent", agent=agent_ctx.agent)
def _guard_board_access(agent_ctx: AgentAuthContext, board: Board) -> None:
if agent_ctx.agent.board_id and agent_ctx.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
def _gateway_config(session: Session, board: Board) -> GatewayClientConfig:
if not board.gateway_id:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
gateway = session.get(Gateway, board.gateway_id)
if gateway is None or not gateway.url:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
return GatewayClientConfig(url=gateway.url, token=gateway.token)
@router.get("/boards", response_model=list[BoardRead])
def list_boards(
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[Board]:
if agent_ctx.agent.board_id:
board = session.get(Board, agent_ctx.agent.board_id)
return [board] if board else []
return list(session.exec(select(Board)))
@router.get("/boards/{board_id}", response_model=BoardRead)
def get_board(
board: Board = Depends(get_board_or_404),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> Board:
_guard_board_access(agent_ctx, board)
return board
@router.get("/agents", response_model=list[AgentRead])
def list_agents(
board_id: UUID | None = Query(default=None),
limit: int | None = Query(default=None, ge=1, le=200),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[AgentRead]:
statement = select(Agent)
if agent_ctx.agent.board_id:
if board_id and board_id != agent_ctx.agent.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = statement.where(Agent.board_id == agent_ctx.agent.board_id)
elif board_id:
statement = statement.where(Agent.board_id == board_id)
if limit is not None:
statement = statement.limit(limit)
agents = list(session.exec(statement))
main_session_keys = agents_api._get_gateway_main_session_keys(session)
return [
agents_api._to_agent_read(agents_api._with_computed_status(agent), main_session_keys)
for agent in agents
]
@router.get("/boards/{board_id}/tasks", response_model=list[TaskRead])
def list_tasks(
status_filter: str | None = Query(default=None, alias="status"),
assigned_agent_id: UUID | None = None,
unassigned: bool | None = None,
limit: int | None = Query(default=None, ge=1, le=200),
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[TaskRead]:
_guard_board_access(agent_ctx, board)
return tasks_api.list_tasks(
status_filter=status_filter,
assigned_agent_id=assigned_agent_id,
unassigned=unassigned,
limit=limit,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/tasks", response_model=TaskRead)
def create_task(
payload: TaskCreate,
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> TaskRead:
_guard_board_access(agent_ctx, board)
if not agent_ctx.agent.is_board_lead:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
tasks_api.validate_task_status(payload.status)
task = Task.model_validate(payload)
task.board_id = board.id
task.auto_created = True
task.auto_reason = f"lead_agent:{agent_ctx.agent.id}"
if task.assigned_agent_id:
agent = session.get(Agent, task.assigned_agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.is_board_lead:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads cannot assign tasks to themselves.",
)
if agent.board_id and agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
session.add(task)
session.commit()
session.refresh(task)
record_activity(
session,
event_type="task.created",
task_id=task.id,
message=f"Task created by lead: {task.title}.",
agent_id=agent_ctx.agent.id,
)
session.commit()
if task.assigned_agent_id:
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
tasks_api._notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
@router.patch("/boards/{board_id}/tasks/{task_id}", response_model=TaskRead)
def update_task(
payload: TaskUpdate,
task=Depends(get_task_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> TaskRead:
if agent_ctx.agent.board_id and task.board_id and agent_ctx.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return tasks_api.update_task(
payload=payload,
task=task,
session=session,
actor=_actor(agent_ctx),
)
@router.get("/boards/{board_id}/tasks/{task_id}/comments", response_model=list[TaskCommentRead])
def list_task_comments(
task=Depends(get_task_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[TaskCommentRead]:
if agent_ctx.agent.board_id and task.board_id and agent_ctx.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return tasks_api.list_task_comments(
task=task,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/tasks/{task_id}/comments", response_model=TaskCommentRead)
def create_task_comment(
payload: TaskCommentCreate,
task=Depends(get_task_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> TaskCommentRead:
if agent_ctx.agent.board_id and task.board_id and agent_ctx.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return tasks_api.create_task_comment(
payload=payload,
task=task,
session=session,
actor=_actor(agent_ctx),
)
@router.get("/boards/{board_id}/memory", response_model=list[BoardMemoryRead])
def list_board_memory(
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[BoardMemoryRead]:
_guard_board_access(agent_ctx, board)
return board_memory_api.list_board_memory(
limit=limit,
offset=offset,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/memory", response_model=BoardMemoryRead)
def create_board_memory(
payload: BoardMemoryCreate,
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> BoardMemoryRead:
_guard_board_access(agent_ctx, board)
return board_memory_api.create_board_memory(
payload=payload,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.get("/boards/{board_id}/approvals", response_model=list[ApprovalRead])
def list_approvals(
status_filter: str | None = Query(default=None, alias="status"),
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[ApprovalRead]:
_guard_board_access(agent_ctx, board)
return approvals_api.list_approvals(
status_filter=status_filter,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/approvals", response_model=ApprovalRead)
def create_approval(
payload: ApprovalCreate,
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> ApprovalRead:
_guard_board_access(agent_ctx, board)
return approvals_api.create_approval(
payload=payload,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/onboarding", response_model=BoardOnboardingRead)
def update_onboarding(
payload: dict[str, object],
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> BoardOnboardingRead:
_guard_board_access(agent_ctx, board)
return onboarding_api.agent_onboarding_update(
payload=payload,
board=board,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/agents", response_model=AgentRead)
async def create_agent(
payload: AgentCreate,
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> AgentRead:
if not agent_ctx.agent.is_board_lead:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if not agent_ctx.agent.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
payload = AgentCreate(**{**payload.model_dump(), "board_id": agent_ctx.agent.board_id})
return await agents_api.create_agent(
payload=payload,
session=session,
actor=_actor(agent_ctx),
)
@router.post("/boards/{board_id}/agents/{agent_id}/nudge")
def nudge_agent(
payload: AgentNudge,
agent_id: str,
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> dict[str, bool]:
_guard_board_access(agent_ctx, board)
if not agent_ctx.agent.is_board_lead:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
target = session.get(Agent, agent_id)
if target is None or (target.board_id and target.board_id != board.id):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if not target.openclaw_session_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Target agent has no session key",
)
message = payload.message.strip()
if not message:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="message is required",
)
config = _gateway_config(session, board)
async def _send() -> None:
await ensure_session(target.openclaw_session_id, config=config, label=target.name)
await send_message(
message,
session_key=target.openclaw_session_id,
config=config,
deliver=True,
)
try:
asyncio.run(_send())
except OpenClawGatewayError as exc:
record_activity(
session,
event_type="agent.nudge.failed",
message=f"Nudge failed for {target.name}: {exc}",
agent_id=agent_ctx.agent.id,
)
session.commit()
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
record_activity(
session,
event_type="agent.nudge.sent",
message=f"Nudge sent to {target.name}.",
agent_id=agent_ctx.agent.id,
)
session.commit()
return {"ok": True}
@router.post("/heartbeat", response_model=AgentRead)
async def agent_heartbeat(
payload: AgentHeartbeatCreate,
session: Session = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> AgentRead:
# Heartbeats must apply to the authenticated agent; agent names are not unique.
return agents_api.heartbeat_agent( # type: ignore[attr-defined]
agent_id=str(agent_ctx.agent.id),
payload=AgentHeartbeat(status=payload.status),
session=session,
actor=_actor(agent_ctx),
)

View File

@@ -1,17 +1,21 @@
from __future__ import annotations
import asyncio
import json
import re
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import update
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, or_, update
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.db.session import get_session
from app.db.session import engine, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.activity_events import ActivityEvent
@@ -31,6 +35,7 @@ from app.services.agent_provisioning import (
DEFAULT_HEARTBEAT_CONFIG,
cleanup_agent,
provision_agent,
provision_main_agent,
)
router = APIRouter(prefix="/agents", tags=["agents"])
@@ -39,6 +44,22 @@ OFFLINE_AFTER = timedelta(minutes=10)
AGENT_SESSION_PREFIX = "agent"
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _normalize_identity_profile(
profile: dict[str, object] | None,
) -> dict[str, str] | None:
@@ -121,6 +142,35 @@ def _require_gateway(session: Session, board: Board) -> tuple[Gateway, GatewayCl
return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token)
def _gateway_client_config(gateway: Gateway) -> GatewayClientConfig:
if not gateway.url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway url is required",
)
return GatewayClientConfig(url=gateway.url, token=gateway.token)
def _get_gateway_main_session_keys(session: Session) -> set[str]:
keys = session.exec(select(Gateway.main_session_key)).all()
return {key for key in keys if key}
def _is_gateway_main(agent: Agent, main_session_keys: set[str]) -> bool:
return bool(agent.openclaw_session_id and agent.openclaw_session_id in main_session_keys)
def _to_agent_read(agent: Agent, main_session_keys: set[str]) -> AgentRead:
model = AgentRead.model_validate(agent, from_attributes=True)
return model.model_copy(update={"is_gateway_main": _is_gateway_main(agent, main_session_keys)})
def _find_gateway_for_main_session(session: Session, session_key: str | None) -> Gateway | None:
if not session_key:
return None
return session.exec(select(Gateway).where(Gateway.main_session_key == session_key)).first()
async def _ensure_gateway_session(
agent_name: str,
config: GatewayClientConfig,
@@ -144,6 +194,27 @@ def _with_computed_status(agent: Agent) -> Agent:
return agent
def _serialize_agent(agent: Agent, main_session_keys: set[str]) -> dict[str, object]:
return _to_agent_read(_with_computed_status(agent), main_session_keys).model_dump(mode="json")
def _fetch_agent_events(
board_id: UUID | None,
since: datetime,
) -> list[Agent]:
with Session(engine) as session:
statement = select(Agent)
if board_id:
statement = statement.where(col(Agent.board_id) == board_id)
statement = statement.where(
or_(
col(Agent.updated_at) >= since,
col(Agent.last_seen_at) >= since,
)
).order_by(asc(col(Agent.updated_at)))
return list(session.exec(statement))
def _record_heartbeat(session: Session, agent: Agent) -> None:
record_activity(
session,
@@ -182,25 +253,84 @@ def list_agents(
auth: AuthContext = Depends(require_admin_auth),
) -> list[Agent]:
agents = list(session.exec(select(Agent)))
return [_with_computed_status(agent) for agent in agents]
main_session_keys = _get_gateway_main_session_keys(session)
return [_to_agent_read(_with_computed_status(agent), main_session_keys) for agent in agents]
@router.get("/stream")
async def stream_agents(
request: Request,
board_id: UUID | None = Query(default=None),
since: str | None = Query(default=None),
auth: AuthContext = Depends(require_admin_auth),
) -> EventSourceResponse:
since_dt = _parse_since(since) or datetime.utcnow()
last_seen = since_dt
async def event_generator():
nonlocal last_seen
while True:
if await request.is_disconnected():
break
agents = await run_in_threadpool(_fetch_agent_events, board_id, last_seen)
if agents:
with Session(engine) as session:
main_session_keys = _get_gateway_main_session_keys(session)
for agent in agents:
updated_at = agent.updated_at or agent.last_seen_at or datetime.utcnow()
if updated_at > last_seen:
last_seen = updated_at
payload = {"agent": _serialize_agent(agent, main_session_keys)}
yield {"event": "agent", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=AgentRead)
async def create_agent(
payload: AgentCreate,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Agent:
if actor.actor_type == "agent":
if not actor.agent or not actor.agent.is_board_lead:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only board leads can create agents",
)
if not actor.agent.board_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board lead must be assigned to a board",
)
if payload.board_id and payload.board_id != actor.agent.board_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only create agents in their own board",
)
payload = AgentCreate(**{**payload.model_dump(), "board_id": actor.agent.board_id})
board = _require_board(session, payload.board_id)
gateway, client_config = _require_gateway(session, board)
data = payload.model_dump()
requested_name = (data.get("name") or "").strip()
if requested_name:
existing = session.exec(
select(Agent)
.where(Agent.board_id == board.id)
.where(col(Agent.name).ilike(requested_name))
).first()
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="An agent with this name already exists on this board.",
)
if data.get("identity_template") == "":
data["identity_template"] = None
if data.get("soul_template") == "":
data["soul_template"] = None
data["identity_profile"] = _normalize_identity_profile(
data.get("identity_profile")
)
data["identity_profile"] = _normalize_identity_profile(data.get("identity_profile"))
agent = Agent.model_validate(data)
agent.status = "provisioning"
raw_token = generate_agent_token()
@@ -230,7 +360,14 @@ async def create_agent(
)
session.commit()
try:
await provision_agent(agent, board, gateway, raw_token, auth.user, action="provision")
await provision_agent(
agent,
board,
gateway,
raw_token,
actor.user if actor.actor_type == "user" else None,
action="provision",
)
await _send_wakeup_message(agent, client_config, verb="provisioned")
agent.provision_confirm_token_hash = None
agent.provision_requested_at = None
@@ -269,13 +406,15 @@ def get_agent(
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return _with_computed_status(agent)
main_session_keys = _get_gateway_main_session_keys(session)
return _to_agent_read(_with_computed_status(agent), main_session_keys)
@router.patch("/{agent_id}", response_model=AgentRead)
async def update_agent(
agent_id: str,
payload: AgentUpdate,
force: bool = False,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
@@ -283,6 +422,7 @@ async def update_agent(
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
updates = payload.model_dump(exclude_unset=True)
make_main = updates.pop("is_gateway_main", None)
if "status" in updates:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@@ -293,25 +433,72 @@ async def update_agent(
if updates.get("soul_template") == "":
updates["soul_template"] = None
if "identity_profile" in updates:
updates["identity_profile"] = _normalize_identity_profile(
updates.get("identity_profile")
)
if not updates:
return _with_computed_status(agent)
if "board_id" in updates:
updates["identity_profile"] = _normalize_identity_profile(updates.get("identity_profile"))
if not updates and not force and make_main is None:
main_session_keys = _get_gateway_main_session_keys(session)
return _to_agent_read(_with_computed_status(agent), main_session_keys)
main_gateway = _find_gateway_for_main_session(session, agent.openclaw_session_id)
gateway_for_main: Gateway | None = None
if make_main is True:
board_source = updates.get("board_id") or agent.board_id
board_for_main = _require_board(session, board_source)
gateway_for_main, _ = _require_gateway(session, board_for_main)
updates["board_id"] = None
agent.is_board_lead = False
agent.openclaw_session_id = gateway_for_main.main_session_key
main_gateway = gateway_for_main
elif make_main is False:
agent.openclaw_session_id = None
if make_main is not True and "board_id" in updates:
_require_board(session, updates["board_id"])
for key, value in updates.items():
setattr(agent, key, value)
if make_main is None and main_gateway is not None:
agent.board_id = None
agent.is_board_lead = False
agent.updated_at = datetime.utcnow()
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
session.add(agent)
session.commit()
session.refresh(agent)
board = _require_board(session, agent.board_id)
gateway, client_config = _require_gateway(session, board)
is_main_agent = False
board: Board | None = None
gateway: Gateway | None = None
client_config: GatewayClientConfig | None = None
if make_main is True:
is_main_agent = True
gateway = gateway_for_main
elif make_main is None and agent.board_id is None and main_gateway is not None:
is_main_agent = True
gateway = main_gateway
if is_main_agent:
if gateway is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Main agent requires a gateway main_session_key",
)
if not gateway.main_session_key:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway main_session_key is required",
)
client_config = _gateway_client_config(gateway)
else:
if agent.board_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="board_id is required for non-main agents",
)
board = _require_board(session, agent.board_id)
gateway, client_config = _require_gateway(session, board)
session_key = agent.openclaw_session_id or _build_session_key(agent.name)
try:
if client_config is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway configuration is required",
)
await ensure_session(session_key, config=client_config, label=agent.name)
if not agent.openclaw_session_id:
agent.openclaw_session_id = session_key
@@ -330,7 +517,32 @@ async def update_agent(
session.commit()
session.refresh(agent)
try:
await provision_agent(agent, board, gateway, raw_token, auth.user, action="update")
if gateway is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway configuration is required",
)
if is_main_agent:
await provision_main_agent(
agent,
gateway,
raw_token,
auth.user,
action="update",
force_bootstrap=force,
reset_session=True,
)
else:
await provision_agent(
agent,
board,
gateway,
raw_token,
auth.user,
action="update",
force_bootstrap=force,
reset_session=True,
)
await _send_wakeup_message(agent, client_config, verb="updated")
agent.provision_confirm_token_hash = None
agent.provision_requested_at = None
@@ -355,10 +567,19 @@ async def update_agent(
except OpenClawGatewayError as exc:
_record_instruction_failure(session, agent, str(exc), "update")
session.commit()
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway update failed: {exc}",
) from exc
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_instruction_failure(session, agent, str(exc), "update")
session.commit()
return _with_computed_status(agent)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Unexpected error updating agent provisioning.",
) from exc
main_session_keys = _get_gateway_main_session_keys(session)
return _to_agent_read(_with_computed_status(agent), main_session_keys)
@router.post("/{agent_id}/heartbeat", response_model=AgentRead)
@@ -367,7 +588,7 @@ def heartbeat_agent(
payload: AgentHeartbeat,
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Agent:
) -> AgentRead:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@@ -383,7 +604,8 @@ def heartbeat_agent(
session.add(agent)
session.commit()
session.refresh(agent)
return _with_computed_status(agent)
main_session_keys = _get_gateway_main_session_keys(session)
return _to_agent_read(_with_computed_status(agent), main_session_keys)
@router.post("/heartbeat", response_model=AgentRead)
@@ -391,8 +613,20 @@ async def heartbeat_or_create_agent(
payload: AgentHeartbeatCreate,
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Agent:
agent = session.exec(select(Agent).where(Agent.name == payload.name)).first()
) -> AgentRead:
# Agent tokens must heartbeat their authenticated agent record. Names are not unique.
if actor.actor_type == "agent" and actor.agent:
return heartbeat_agent(
agent_id=str(actor.agent.id),
payload=AgentHeartbeat(status=payload.status),
session=session,
actor=actor,
)
statement = select(Agent).where(Agent.name == payload.name)
if payload.board_id is not None:
statement = statement.where(Agent.board_id == payload.board_id)
agent = session.exec(statement).first()
if agent is None:
if actor.actor_type == "agent":
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@@ -528,7 +762,8 @@ async def heartbeat_or_create_agent(
session.add(agent)
session.commit()
session.refresh(agent)
return _with_computed_status(agent)
main_session_keys = _get_gateway_main_session_keys(session)
return _to_agent_read(_with_computed_status(agent), main_session_keys)
@router.delete("/{agent_id}")
@@ -590,9 +825,7 @@ def delete_agent(
)
)
session.execute(
update(ActivityEvent)
.where(col(ActivityEvent.agent_id) == agent.id)
.values(agent_id=None)
update(ActivityEvent).where(col(ActivityEvent.agent_id) == agent.id).values(agent_id=None)
)
session.delete(agent)
session.commit()

View File

@@ -0,0 +1,162 @@
from __future__ import annotations
import asyncio
import json
from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, or_
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, require_admin_or_agent
from app.db.session import engine, get_session
from app.models.approvals import Approval
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalUpdate
router = APIRouter(prefix="/boards/{board_id}/approvals", tags=["approvals"])
ALLOWED_STATUSES = {"pending", "approved", "rejected"}
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _approval_updated_at(approval: Approval) -> datetime:
return approval.resolved_at or approval.created_at
def _serialize_approval(approval: Approval) -> dict[str, object]:
return ApprovalRead.model_validate(approval, from_attributes=True).model_dump(mode="json")
def _fetch_approval_events(
board_id: UUID,
since: datetime,
) -> list[Approval]:
with Session(engine) as session:
statement = (
select(Approval)
.where(col(Approval.board_id) == board_id)
.where(
or_(
col(Approval.created_at) >= since,
col(Approval.resolved_at) >= since,
)
)
.order_by(asc(col(Approval.created_at)))
)
return list(session.exec(statement))
@router.get("", response_model=list[ApprovalRead])
def list_approvals(
status_filter: str | None = Query(default=None, alias="status"),
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Approval]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = select(Approval).where(col(Approval.board_id) == board.id)
if status_filter:
if status_filter not in ALLOWED_STATUSES:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
statement = statement.where(col(Approval.status) == status_filter)
statement = statement.order_by(col(Approval.created_at).desc())
return list(session.exec(statement))
@router.get("/stream")
async def stream_approvals(
request: Request,
board=Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
) -> EventSourceResponse:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
since_dt = _parse_since(since) or datetime.utcnow()
last_seen = since_dt
async def event_generator():
nonlocal last_seen
while True:
if await request.is_disconnected():
break
approvals = await run_in_threadpool(_fetch_approval_events, board.id, last_seen)
for approval in approvals:
updated_at = _approval_updated_at(approval)
if updated_at > last_seen:
last_seen = updated_at
payload = {"approval": _serialize_approval(approval)}
yield {"event": "approval", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=ApprovalRead)
def create_approval(
payload: ApprovalCreate,
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Approval:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
approval = Approval(
board_id=board.id,
agent_id=payload.agent_id,
action_type=payload.action_type,
payload=payload.payload,
confidence=payload.confidence,
rubric_scores=payload.rubric_scores,
status=payload.status,
)
session.add(approval)
session.commit()
session.refresh(approval)
return approval
@router.patch("/{approval_id}", response_model=ApprovalRead)
def update_approval(
approval_id: str,
payload: ApprovalUpdate,
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
auth=Depends(require_admin_auth),
) -> Approval:
approval = session.get(Approval, approval_id)
if approval is None or approval.board_id != board.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
updates = payload.model_dump(exclude_unset=True)
if "status" in updates:
if updates["status"] not in ALLOWED_STATUSES:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
approval.status = updates["status"]
if approval.status != "pending":
approval.resolved_at = datetime.utcnow()
session.add(approval)
session.commit()
session.refresh(approval)
return approval

View File

@@ -0,0 +1,238 @@
from __future__ import annotations
import asyncio
import json
import re
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from app.api.deps import ActorContext, get_board_or_404, require_admin_or_agent
from app.core.config import settings
from app.db.session import engine, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.agents import Agent
from app.models.board_memory import BoardMemory
from app.models.gateways import Gateway
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
router = APIRouter(prefix="/boards/{board_id}/memory", tags=["board-memory"])
MENTION_PATTERN = re.compile(r"@([A-Za-z][\w-]{0,31})")
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _serialize_memory(memory: BoardMemory) -> dict[str, object]:
return BoardMemoryRead.model_validate(memory, from_attributes=True).model_dump(mode="json")
def _extract_mentions(message: str) -> set[str]:
return {match.group(1).lower() for match in MENTION_PATTERN.finditer(message)}
def _matches_mention(agent: Agent, mentions: set[str]) -> bool:
if not mentions:
return False
name = (agent.name or "").strip()
if not name:
return False
normalized = name.lower()
if normalized in mentions:
return True
first = normalized.split()[0]
return first in mentions
def _gateway_config(session: Session, board) -> GatewayClientConfig | None:
if not board.gateway_id:
return None
gateway = session.get(Gateway, board.gateway_id)
if gateway is None or not gateway.url:
return None
return GatewayClientConfig(url=gateway.url, token=gateway.token)
async def _send_agent_message(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
) -> None:
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=False)
def _fetch_memory_events(
board_id,
since: datetime,
) -> list[BoardMemory]:
with Session(engine) as session:
statement = (
select(BoardMemory)
.where(col(BoardMemory.board_id) == board_id)
.where(col(BoardMemory.created_at) >= since)
.order_by(col(BoardMemory.created_at))
)
return list(session.exec(statement))
def _notify_chat_targets(
*,
session: Session,
board,
memory: BoardMemory,
actor: ActorContext,
) -> None:
if not memory.content:
return
config = _gateway_config(session, board)
if config is None:
return
mentions = _extract_mentions(memory.content)
statement = select(Agent).where(col(Agent.board_id) == board.id)
targets: dict[str, Agent] = {}
for agent in session.exec(statement):
if agent.is_board_lead:
targets[str(agent.id)] = agent
continue
if mentions and _matches_mention(agent, mentions):
targets[str(agent.id)] = agent
if actor.actor_type == "agent" and actor.agent:
targets.pop(str(actor.agent.id), None)
if not targets:
return
actor_name = "User"
if actor.actor_type == "agent" and actor.agent:
actor_name = actor.agent.name
elif actor.user:
actor_name = actor.user.preferred_name or actor.user.name or actor_name
snippet = memory.content.strip()
if len(snippet) > 800:
snippet = f"{snippet[:797]}..."
base_url = settings.base_url or "http://localhost:8000"
for agent in targets.values():
if not agent.openclaw_session_id:
continue
mentioned = _matches_mention(agent, mentions)
header = "BOARD CHAT MENTION" if mentioned else "BOARD CHAT"
message = (
f"{header}\n"
f"Board: {board.name}\n"
f"From: {actor_name}\n\n"
f"{snippet}\n\n"
"Reply via board chat:\n"
f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n"
'Body: {"content":"...","tags":["chat"]}'
)
try:
asyncio.run(
_send_agent_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=message,
)
)
except OpenClawGatewayError:
continue
@router.get("", response_model=list[BoardMemoryRead])
def list_board_memory(
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[BoardMemory]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = (
select(BoardMemory)
.where(col(BoardMemory.board_id) == board.id)
.order_by(col(BoardMemory.created_at).desc())
.offset(offset)
.limit(limit)
)
return list(session.exec(statement))
@router.get("/stream")
async def stream_board_memory(
request: Request,
board=Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
) -> EventSourceResponse:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
since_dt = _parse_since(since) or datetime.utcnow()
last_seen = since_dt
async def event_generator():
nonlocal last_seen
while True:
if await request.is_disconnected():
break
memories = await run_in_threadpool(_fetch_memory_events, board.id, last_seen)
for memory in memories:
if memory.created_at > last_seen:
last_seen = memory.created_at
payload = {"memory": _serialize_memory(memory)}
yield {"event": "memory", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=BoardMemoryRead)
def create_board_memory(
payload: BoardMemoryCreate,
board=Depends(get_board_or_404),
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> BoardMemory:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
is_chat = payload.tags is not None and "chat" in payload.tags
source = payload.source
if is_chat and not source:
if actor.actor_type == "agent" and actor.agent:
source = actor.agent.name
elif actor.user:
source = actor.user.preferred_name or actor.user.name or "User"
memory = BoardMemory(
board_id=board.id,
content=payload.content,
tags=payload.tags,
source=source,
)
session.add(memory)
session.commit()
session.refresh(memory)
if is_chat:
_notify_chat_targets(session=session, board=board, memory=memory, actor=actor)
return memory

View File

@@ -0,0 +1,332 @@
from __future__ import annotations
import json
import logging
import re
from datetime import datetime
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.core.config import settings
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.agents import Agent
from app.models.board_onboarding import BoardOnboardingSession
from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.board_onboarding import (
BoardOnboardingAnswer,
BoardOnboardingConfirm,
BoardOnboardingRead,
BoardOnboardingStart,
)
from app.schemas.boards import BoardRead
from app.services.agent_provisioning import DEFAULT_HEARTBEAT_CONFIG, provision_agent
router = APIRouter(prefix="/boards/{board_id}/onboarding", tags=["board-onboarding"])
logger = logging.getLogger(__name__)
def _gateway_config(session: Session, board: Board) -> tuple[Gateway, GatewayClientConfig]:
if not board.gateway_id:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
gateway = session.get(Gateway, board.gateway_id)
if gateway is None or not gateway.url or not gateway.main_session_key:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token)
def _build_session_key(agent_name: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", agent_name.lower()).strip("-")
return f"agent:{slug or uuid4().hex}:main"
def _lead_agent_name(board: Board) -> str:
return "Lead Agent"
def _lead_session_key(board: Board) -> str:
return f"agent:lead-{board.id}:main"
async def _ensure_lead_agent(
session: Session,
board: Board,
gateway: Gateway,
config: GatewayClientConfig,
auth: AuthContext,
) -> Agent:
existing = session.exec(
select(Agent).where(Agent.board_id == board.id).where(Agent.is_board_lead.is_(True))
).first()
if existing:
if existing.name != _lead_agent_name(board):
existing.name = _lead_agent_name(board)
session.add(existing)
session.commit()
session.refresh(existing)
return existing
agent = Agent(
name=_lead_agent_name(board),
status="provisioning",
board_id=board.id,
is_board_lead=True,
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
identity_profile={
"role": "Board Lead",
"communication_style": "direct, concise, practical",
"emoji": ":gear:",
},
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
agent.provision_requested_at = datetime.utcnow()
agent.provision_action = "provision"
agent.openclaw_session_id = _lead_session_key(board)
session.add(agent)
session.commit()
session.refresh(agent)
try:
await provision_agent(agent, board, gateway, raw_token, auth.user, action="provision")
await ensure_session(agent.openclaw_session_id, config=config, label=agent.name)
await send_message(
(
f"Hello {agent.name}. Your workspace has been provisioned.\n\n"
"Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once "
"then delete it. Begin heartbeats after startup."
),
session_key=agent.openclaw_session_id,
config=config,
deliver=True,
)
except OpenClawGatewayError:
# Best-effort provisioning. Board confirmation should still succeed.
pass
return agent
@router.get("", response_model=BoardOnboardingRead)
def get_onboarding(
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> BoardOnboardingSession:
onboarding = session.exec(
select(BoardOnboardingSession)
.where(BoardOnboardingSession.board_id == board.id)
.order_by(BoardOnboardingSession.created_at.desc())
).first()
if onboarding is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return onboarding
@router.post("/start", response_model=BoardOnboardingRead)
async def start_onboarding(
payload: BoardOnboardingStart,
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> BoardOnboardingSession:
onboarding = session.exec(
select(BoardOnboardingSession)
.where(BoardOnboardingSession.board_id == board.id)
.where(BoardOnboardingSession.status == "active")
).first()
if onboarding:
return onboarding
gateway, config = _gateway_config(session, board)
session_key = gateway.main_session_key
base_url = settings.base_url or "http://localhost:8000"
prompt = (
"BOARD ONBOARDING REQUEST\n\n"
f"Board Name: {board.name}\n"
"You are the main agent. Ask the user 3-6 focused questions to clarify their goal.\n"
"Do NOT respond in OpenClaw chat.\n"
"All onboarding responses MUST be sent to Mission Control via API.\n"
f"Mission Control base URL: {base_url}\n"
"Use the AUTH_TOKEN from USER.md or TOOLS.md and pass it as X-Agent-Token.\n"
"Onboarding response endpoint:\n"
f"POST {base_url}/api/v1/agent/boards/{board.id}/onboarding\n"
"QUESTION example (send JSON body exactly as shown):\n"
f'curl -s -X POST "{base_url}/api/v1/agent/boards/{board.id}/onboarding" '
'-H "X-Agent-Token: $AUTH_TOKEN" '
'-H "Content-Type: application/json" '
'-d \'{"question":"...","options":[{"id":"1","label":"..."},{"id":"2","label":"..."}]}\'\n'
"COMPLETION example (send JSON body exactly as shown):\n"
f'curl -s -X POST "{base_url}/api/v1/agent/boards/{board.id}/onboarding" '
'-H "X-Agent-Token: $AUTH_TOKEN" '
'-H "Content-Type: application/json" '
'-d \'{"status":"complete","board_type":"goal","objective":"...","success_metrics":{...},"target_date":"YYYY-MM-DD"}\'\n'
"QUESTION FORMAT (one question per response, no arrays, no markdown, no extra text):\n"
'{"question":"...","options":[{"id":"1","label":"..."},{"id":"2","label":"..."}]}\n'
"Do NOT wrap questions in a list. Do NOT add commentary.\n"
"When you have enough info, return JSON ONLY (via API):\n"
'{"status":"complete","board_type":"goal"|"general","objective":"...",'
'"success_metrics":{...},"target_date":"YYYY-MM-DD"}.'
)
try:
await ensure_session(session_key, config=config, label="Main Agent")
await send_message(prompt, session_key=session_key, config=config, deliver=False)
except OpenClawGatewayError as exc:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
onboarding = BoardOnboardingSession(
board_id=board.id,
session_key=session_key,
status="active",
messages=[{"role": "user", "content": prompt, "timestamp": datetime.utcnow().isoformat()}],
)
session.add(onboarding)
session.commit()
session.refresh(onboarding)
return onboarding
@router.post("/answer", response_model=BoardOnboardingRead)
async def answer_onboarding(
payload: BoardOnboardingAnswer,
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> BoardOnboardingSession:
onboarding = session.exec(
select(BoardOnboardingSession)
.where(BoardOnboardingSession.board_id == board.id)
.order_by(BoardOnboardingSession.created_at.desc())
).first()
if onboarding is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
_, config = _gateway_config(session, board)
answer_text = payload.answer
if payload.other_text:
answer_text = f"{payload.answer}: {payload.other_text}"
messages = list(onboarding.messages or [])
messages.append(
{"role": "user", "content": answer_text, "timestamp": datetime.utcnow().isoformat()}
)
try:
await ensure_session(onboarding.session_key, config=config, label="Main Agent")
await send_message(
answer_text, session_key=onboarding.session_key, config=config, deliver=False
)
except OpenClawGatewayError as exc:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
onboarding.messages = messages
onboarding.updated_at = datetime.utcnow()
session.add(onboarding)
session.commit()
session.refresh(onboarding)
return onboarding
@router.post("/agent", response_model=BoardOnboardingRead)
def agent_onboarding_update(
payload: dict[str, object],
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> BoardOnboardingSession:
if actor.actor_type != "agent" or actor.agent is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
agent = actor.agent
if agent.board_id is not None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if board.gateway_id:
gateway = session.get(Gateway, board.gateway_id)
if gateway and gateway.main_session_key and agent.openclaw_session_id:
if agent.openclaw_session_id != gateway.main_session_key:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
onboarding = session.exec(
select(BoardOnboardingSession)
.where(BoardOnboardingSession.board_id == board.id)
.order_by(BoardOnboardingSession.created_at.desc())
).first()
if onboarding is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if onboarding.status == "confirmed":
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
messages = list(onboarding.messages or [])
now = datetime.utcnow().isoformat()
payload_text = json.dumps(payload)
logger.info(
"onboarding.agent.update board_id=%s agent_id=%s payload=%s",
board.id,
agent.id,
payload_text,
)
payload_status = payload.get("status")
if payload_status == "complete":
onboarding.draft_goal = payload
onboarding.status = "completed"
messages.append({"role": "assistant", "content": payload_text, "timestamp": now})
else:
question = payload.get("question")
options = payload.get("options")
if not isinstance(question, str) or not isinstance(options, list):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
messages.append({"role": "assistant", "content": payload_text, "timestamp": now})
onboarding.messages = messages
onboarding.updated_at = datetime.utcnow()
session.add(onboarding)
session.commit()
session.refresh(onboarding)
logger.info(
"onboarding.agent.update stored board_id=%s messages_count=%s status=%s",
board.id,
len(onboarding.messages or []),
onboarding.status,
)
return onboarding
@router.post("/confirm", response_model=BoardRead)
async def confirm_onboarding(
payload: BoardOnboardingConfirm,
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Board:
onboarding = session.exec(
select(BoardOnboardingSession)
.where(BoardOnboardingSession.board_id == board.id)
.order_by(BoardOnboardingSession.created_at.desc())
).first()
if onboarding is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
board.board_type = payload.board_type
board.objective = payload.objective
board.success_metrics = payload.success_metrics
board.target_date = payload.target_date
board.goal_confirmed = True
board.goal_source = "lead_agent_onboarding"
onboarding.status = "confirmed"
onboarding.updated_at = datetime.utcnow()
gateway, config = _gateway_config(session, board)
session.add(board)
session.add(onboarding)
session.commit()
session.refresh(board)
await _ensure_lead_agent(session, board, gateway, config, auth)
return board

View File

@@ -20,8 +20,12 @@ from app.integrations.openclaw_gateway import (
)
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
@@ -161,6 +165,14 @@ def update_board(
)
for key, value in updates.items():
setattr(board, key, value)
if updates.get("board_type") == "goal":
objective = updates.get("objective") or board.objective
metrics = updates.get("success_metrics") or board.success_metrics
if not objective or not metrics:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Goal boards require objective and success_metrics",
)
if not board.gateway_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
@@ -194,10 +206,16 @@ def delete_board(
if task_ids:
session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id).in_(task_ids)))
session.execute(delete(TaskFingerprint).where(col(TaskFingerprint.board_id) == board.id))
if agents:
agent_ids = [agent.id for agent in agents]
session.execute(delete(ActivityEvent).where(col(ActivityEvent.agent_id).in_(agent_ids)))
session.execute(delete(Agent).where(col(Agent.id).in_(agent_ids)))
session.execute(delete(Approval).where(col(Approval.board_id) == board.id))
session.execute(delete(BoardMemory).where(col(BoardMemory.board_id) == board.id))
session.execute(
delete(BoardOnboardingSession).where(col(BoardOnboardingSession.board_id) == board.id)
)
session.execute(delete(Task).where(col(Task.board_id) == board.id))
session.delete(board)
session.commit()

View File

@@ -1,16 +1,20 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext, get_auth_context
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.agents import Agent
from app.models.gateways import Gateway
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
from app.services.agent_provisioning import DEFAULT_HEARTBEAT_CONFIG, provision_main_agent
router = APIRouter(prefix="/gateways", tags=["gateways"])
@@ -227,6 +231,100 @@ rm -rf ~/.openclaw/skills/skyll
""".strip()
def _main_agent_name(gateway: Gateway) -> str:
return f"{gateway.name} Main"
def _find_main_agent(
session: Session,
gateway: Gateway,
previous_name: str | None = None,
previous_session_key: str | None = None,
) -> Agent | None:
if gateway.main_session_key:
agent = session.exec(
select(Agent).where(Agent.openclaw_session_id == gateway.main_session_key)
).first()
if agent:
return agent
if previous_session_key:
agent = session.exec(
select(Agent).where(Agent.openclaw_session_id == previous_session_key)
).first()
if agent:
return agent
names = {_main_agent_name(gateway)}
if previous_name:
names.add(f"{previous_name} Main")
for name in names:
agent = session.exec(select(Agent).where(Agent.name == name)).first()
if agent:
return agent
return None
async def _ensure_main_agent(
session: Session,
gateway: Gateway,
auth: AuthContext,
*,
previous_name: str | None = None,
previous_session_key: str | None = None,
action: str = "provision",
) -> Agent | None:
if not gateway.url or not gateway.main_session_key:
return None
agent = _find_main_agent(session, gateway, previous_name, previous_session_key)
if agent is None:
agent = Agent(
name=_main_agent_name(gateway),
status="provisioning",
board_id=None,
is_board_lead=False,
openclaw_session_id=gateway.main_session_key,
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
identity_profile={
"role": "Main Agent",
"communication_style": "direct, concise, practical",
"emoji": ":compass:",
},
)
session.add(agent)
agent.name = _main_agent_name(gateway)
agent.openclaw_session_id = gateway.main_session_key
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
agent.provision_requested_at = datetime.utcnow()
agent.provision_action = action
agent.updated_at = datetime.utcnow()
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
session.add(agent)
session.commit()
session.refresh(agent)
try:
await provision_main_agent(agent, gateway, raw_token, auth.user, action=action)
await ensure_session(
gateway.main_session_key,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
label=agent.name,
)
await send_message(
(
f"Hello {agent.name}. Your gateway provisioning was updated.\n\n"
"Please re-read AGENTS.md, USER.md, HEARTBEAT.md, and TOOLS.md. "
"If BOOTSTRAP.md exists, run it once then delete it. Begin heartbeats after startup."
),
session_key=gateway.main_session_key,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
deliver=True,
)
except OpenClawGatewayError:
# Best-effort provisioning.
pass
return agent
async def _send_skyll_enable_message(gateway: Gateway) -> None:
if not gateway.url:
raise OpenClawGatewayError("Gateway url is required")
@@ -278,6 +376,7 @@ async def create_gateway(
session.add(gateway)
session.commit()
session.refresh(gateway)
await _ensure_main_agent(session, gateway, auth, action="provision")
if gateway.skyll_enabled:
try:
await _send_skyll_enable_message(gateway)
@@ -308,6 +407,8 @@ async def update_gateway(
gateway = session.get(Gateway, gateway_id)
if gateway is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Gateway not found")
previous_name = gateway.name
previous_session_key = gateway.main_session_key
previous_skyll_enabled = gateway.skyll_enabled
updates = payload.model_dump(exclude_unset=True)
if updates.get("token") == "":
@@ -317,6 +418,14 @@ async def update_gateway(
session.add(gateway)
session.commit()
session.refresh(gateway)
await _ensure_main_agent(
session,
gateway,
auth,
previous_name=previous_name,
previous_session_key=previous_session_key,
action="update",
)
if not previous_skyll_enabled and gateway.skyll_enabled:
try:
await _send_skyll_enable_message(gateway)

View File

@@ -1,16 +1,17 @@
from __future__ import annotations
from datetime import datetime, timezone
import asyncio
import json
import re
from collections import deque
from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, delete, desc
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from sqlalchemy import asc, desc
from sqlmodel import Session, col, select
from app.api.deps import (
ActorContext,
@@ -21,9 +22,13 @@ from app.api.deps import (
)
from app.core.auth import AuthContext
from app.db.session import engine, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
@@ -38,6 +43,7 @@ TASK_EVENT_TYPES = {
"task.comment",
}
SSE_SEEN_MAX = 2000
MENTION_PATTERN = re.compile(r"@([A-Za-z][\w-]{0,31})")
def validate_task_status(status_value: str) -> None:
@@ -93,14 +99,55 @@ def _parse_since(value: str | None) -> datetime | None:
return parsed
def _extract_mentions(message: str) -> set[str]:
return {match.group(1).lower() for match in MENTION_PATTERN.finditer(message)}
def _matches_mention(agent: Agent, mentions: set[str]) -> bool:
if not mentions:
return False
name = (agent.name or "").strip()
if not name:
return False
normalized = name.lower()
if normalized in mentions:
return True
first = normalized.split()[0]
return first in mentions
def _lead_was_mentioned(
session: Session,
task: Task,
lead: Agent,
) -> bool:
statement = (
select(ActivityEvent.message)
.where(col(ActivityEvent.task_id) == task.id)
.where(col(ActivityEvent.event_type) == "task.comment")
.order_by(desc(col(ActivityEvent.created_at)))
)
for message in session.exec(statement):
if not message:
continue
mentions = _extract_mentions(message)
if _matches_mention(lead, mentions):
return True
return False
def _lead_created_task(task: Task, lead: Agent) -> bool:
if not task.auto_created or not task.auto_reason:
return False
return task.auto_reason == f"lead_agent:{lead.id}"
def _fetch_task_events(
board_id: UUID,
since: datetime,
) -> list[tuple[ActivityEvent, Task | None]]:
with Session(engine) as session:
task_ids = list(
session.exec(select(Task.id).where(col(Task.board_id) == board_id))
)
task_ids = list(session.exec(select(Task.id).where(col(Task.board_id) == board_id)))
if not task_ids:
return []
statement = (
@@ -124,6 +171,206 @@ def _serialize_comment(event: ActivityEvent) -> dict[str, object]:
return TaskCommentRead.model_validate(event).model_dump(mode="json")
def _gateway_config(session: Session, board: Board) -> GatewayClientConfig | None:
if not board.gateway_id:
return None
gateway = session.get(Gateway, board.gateway_id)
if gateway is None or not gateway.url:
return None
return GatewayClientConfig(url=gateway.url, token=gateway.token)
async def _send_lead_task_message(
*,
session_key: str,
config: GatewayClientConfig,
message: str,
) -> None:
await ensure_session(session_key, config=config, label="Lead Agent")
await send_message(message, session_key=session_key, config=config, deliver=False)
async def _send_agent_task_message(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
) -> None:
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=False)
def _notify_agent_on_task_assign(
*,
session: Session,
board: Board,
task: Task,
agent: Agent,
) -> None:
if not agent.openclaw_session_id:
return
config = _gateway_config(session, board)
if config is None:
return
description = (task.description or "").strip()
if len(description) > 500:
description = f"{description[:497]}..."
details = [
f"Board: {board.name}",
f"Task: {task.title}",
f"Task ID: {task.id}",
f"Status: {task.status}",
]
if description:
details.append(f"Description: {description}")
message = (
"TASK ASSIGNED\n"
+ "\n".join(details)
+ "\n\nTake action: open the task and begin work. Post updates as task comments."
)
try:
asyncio.run(
_send_agent_task_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=message,
)
)
record_activity(
session,
event_type="task.assignee_notified",
message=f"Agent notified for assignment: {agent.name}.",
agent_id=agent.id,
task_id=task.id,
)
session.commit()
except OpenClawGatewayError as exc:
record_activity(
session,
event_type="task.assignee_notify_failed",
message=f"Assignee notify failed: {exc}",
agent_id=agent.id,
task_id=task.id,
)
session.commit()
def _notify_lead_on_task_create(
*,
session: Session,
board: Board,
task: Task,
) -> None:
lead = session.exec(
select(Agent).where(Agent.board_id == board.id).where(Agent.is_board_lead.is_(True))
).first()
if lead is None or not lead.openclaw_session_id:
return
config = _gateway_config(session, board)
if config is None:
return
description = (task.description or "").strip()
if len(description) > 500:
description = f"{description[:497]}..."
details = [
f"Board: {board.name}",
f"Task: {task.title}",
f"Task ID: {task.id}",
f"Status: {task.status}",
]
if description:
details.append(f"Description: {description}")
message = (
"NEW TASK ADDED\n"
+ "\n".join(details)
+ "\n\nTake action: triage, assign, or plan next steps."
)
try:
asyncio.run(
_send_lead_task_message(
session_key=lead.openclaw_session_id,
config=config,
message=message,
)
)
record_activity(
session,
event_type="task.lead_notified",
message=f"Lead agent notified for task: {task.title}.",
agent_id=lead.id,
task_id=task.id,
)
session.commit()
except OpenClawGatewayError as exc:
record_activity(
session,
event_type="task.lead_notify_failed",
message=f"Lead notify failed: {exc}",
agent_id=lead.id,
task_id=task.id,
)
session.commit()
def _notify_lead_on_task_unassigned(
*,
session: Session,
board: Board,
task: Task,
) -> None:
lead = session.exec(
select(Agent).where(Agent.board_id == board.id).where(Agent.is_board_lead.is_(True))
).first()
if lead is None or not lead.openclaw_session_id:
return
config = _gateway_config(session, board)
if config is None:
return
description = (task.description or "").strip()
if len(description) > 500:
description = f"{description[:497]}..."
details = [
f"Board: {board.name}",
f"Task: {task.title}",
f"Task ID: {task.id}",
f"Status: {task.status}",
]
if description:
details.append(f"Description: {description}")
message = (
"TASK BACK IN INBOX\n"
+ "\n".join(details)
+ "\n\nTake action: assign a new owner or adjust the plan."
)
try:
asyncio.run(
_send_lead_task_message(
session_key=lead.openclaw_session_id,
config=config,
message=message,
)
)
record_activity(
session,
event_type="task.lead_unassigned_notified",
message=f"Lead notified task returned to inbox: {task.title}.",
agent_id=lead.id,
task_id=task.id,
)
session.commit()
except OpenClawGatewayError as exc:
record_activity(
session,
event_type="task.lead_unassigned_notify_failed",
message=f"Lead notify failed: {exc}",
agent_id=lead.id,
task_id=task.id,
)
session.commit()
@router.get("/stream")
async def stream_tasks(
request: Request,
@@ -214,6 +461,16 @@ def create_task(
message=f"Task created: {task.title}.",
)
session.commit()
_notify_lead_on_task_create(session=session, board=board, task=task)
if task.assigned_agent_id:
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
_notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
@@ -225,10 +482,83 @@ def update_task(
actor: ActorContext = Depends(require_admin_or_agent),
) -> Task:
previous_status = task.status
previous_assigned = task.assigned_agent_id
updates = payload.model_dump(exclude_unset=True)
comment = updates.pop("comment", None)
if comment is not None and not comment.strip():
comment = None
if actor.actor_type == "agent" and actor.agent and actor.agent.is_board_lead:
allowed_fields = {"assigned_agent_id", "status"}
if comment is not None or not set(updates).issubset(allowed_fields):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only assign or unassign tasks.",
)
if "assigned_agent_id" in updates:
assigned_id = updates["assigned_agent_id"]
if assigned_id:
agent = session.get(Agent, assigned_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.is_board_lead:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads cannot assign tasks to themselves.",
)
if agent.board_id and task.board_id and agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
task.assigned_agent_id = agent.id
else:
task.assigned_agent_id = None
if "status" in updates:
validate_task_status(updates["status"])
if task.status != "review":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only change status when a task is in review.",
)
if updates["status"] not in {"done", "inbox"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only move review tasks to done or inbox.",
)
if updates["status"] == "inbox":
task.assigned_agent_id = None
task.in_progress_at = None
task.status = updates["status"]
task.updated_at = datetime.utcnow()
session.add(task)
if task.status != previous_status:
event_type = "task.status_changed"
message = f"Task moved to {task.status}: {task.title}."
else:
event_type = "task.updated"
message = f"Task updated: {task.title}."
record_activity(
session,
event_type=event_type,
task_id=task.id,
message=message,
agent_id=actor.agent.id,
)
session.commit()
session.refresh(task)
if task.assigned_agent_id and task.assigned_agent_id != previous_assigned:
if actor.actor_type == "agent" and actor.agent and task.assigned_agent_id == actor.agent.id:
return task
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = session.get(Board, task.board_id) if task.board_id else None
if board:
_notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
if actor.actor_type == "agent":
if actor.agent and actor.agent.board_id and task.board_id:
if actor.agent.board_id != task.board_id:
@@ -303,6 +633,28 @@ def update_task(
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
)
session.commit()
if task.status == "inbox" and task.assigned_agent_id is None:
if previous_status != "inbox" or previous_assigned is not None:
board = session.get(Board, task.board_id) if task.board_id else None
if board:
_notify_lead_on_task_unassigned(
session=session,
board=board,
task=task,
)
if task.assigned_agent_id and task.assigned_agent_id != previous_assigned:
if actor.actor_type == "agent" and actor.agent and task.assigned_agent_id == actor.agent.id:
return task
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = session.get(Board, task.board_id) if task.board_id else None
if board:
_notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
@@ -312,6 +664,8 @@ def delete_task(
task: Task = Depends(get_task_or_404),
auth: AuthContext = Depends(require_admin_auth),
) -> dict[str, bool]:
session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id) == task.id))
session.execute(delete(TaskFingerprint).where(col(TaskFingerprint.task_id) == task.id))
session.delete(task)
session.commit()
return {"ok": True}
@@ -343,6 +697,16 @@ def create_task_comment(
actor: ActorContext = Depends(require_admin_or_agent),
) -> ActivityEvent:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.is_board_lead and task.status != "review":
if not _lead_was_mentioned(session, task, actor.agent) and not _lead_created_task(
task, actor.agent
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Board leads can only comment during review, when mentioned, or on tasks they created."
),
)
if actor.agent.board_id and task.board_id and actor.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if not payload.message.strip():
@@ -356,4 +720,56 @@ def create_task_comment(
session.add(event)
session.commit()
session.refresh(event)
mention_names = _extract_mentions(payload.message)
targets: dict[UUID, Agent] = {}
if mention_names and task.board_id:
statement = select(Agent).where(col(Agent.board_id) == task.board_id)
for agent in session.exec(statement):
if _matches_mention(agent, mention_names):
targets[agent.id] = agent
if not mention_names and task.assigned_agent_id:
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
targets[assigned_agent.id] = assigned_agent
if actor.actor_type == "agent" and actor.agent:
targets.pop(actor.agent.id, None)
if targets:
board = session.get(Board, task.board_id) if task.board_id else None
config = _gateway_config(session, board) if board else None
if board and config:
snippet = payload.message.strip()
if len(snippet) > 500:
snippet = f"{snippet[:497]}..."
actor_name = actor.agent.name if actor.actor_type == "agent" and actor.agent else "User"
for agent in targets.values():
if not agent.openclaw_session_id:
continue
mentioned = _matches_mention(agent, mention_names)
header = "TASK MENTION" if mentioned else "NEW TASK COMMENT"
action_line = (
"You were mentioned in this comment."
if mentioned
else "A new comment was posted on your task."
)
message = (
f"{header}\n"
f"Board: {board.name}\n"
f"Task: {task.title}\n"
f"Task ID: {task.id}\n"
f"From: {actor_name}\n\n"
f"{action_line}\n\n"
f"Comment:\n{snippet}\n\n"
"If you are mentioned but not assigned, reply in the task thread but do not change task status."
)
try:
asyncio.run(
_send_agent_task_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=message,
)
)
except OpenClawGatewayError:
pass
return event

View File

@@ -1,15 +1,18 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Literal
from fastapi import Depends, Header, HTTPException, status
from fastapi import Depends, Header, HTTPException, Request, status
from sqlmodel import Session, col, select
from app.core.agent_tokens import verify_agent_token
from app.db.session import get_session
from app.models.agents import Agent
logger = logging.getLogger(__name__)
@dataclass
class AgentAuthContext:
@@ -25,25 +28,78 @@ def _find_agent_for_token(session: Session, token: str) -> Agent | None:
return None
def _resolve_agent_token(
agent_token: str | None,
authorization: str | None,
*,
accept_authorization: bool = True,
) -> str | None:
if agent_token:
return agent_token
if not accept_authorization:
return None
if not authorization:
return None
value = authorization.strip()
if not value:
return None
if value.lower().startswith("bearer "):
return value.split(" ", 1)[1].strip() or None
return None
def get_agent_auth_context(
request: Request,
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
authorization: str | None = Header(default=None, alias="Authorization"),
session: Session = Depends(get_session),
) -> AgentAuthContext:
if not agent_token:
resolved = _resolve_agent_token(agent_token, authorization, accept_authorization=True)
if not resolved:
logger.warning(
"agent auth missing token path=%s x_agent=%s authorization=%s",
request.url.path,
bool(agent_token),
bool(authorization),
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
agent = _find_agent_for_token(session, agent_token)
agent = _find_agent_for_token(session, resolved)
if agent is None:
logger.warning(
"agent auth invalid token path=%s token_prefix=%s",
request.url.path,
resolved[:6],
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)
def get_agent_auth_context_optional(
request: Request,
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
authorization: str | None = Header(default=None, alias="Authorization"),
session: Session = Depends(get_session),
) -> AgentAuthContext | None:
if not agent_token:
resolved = _resolve_agent_token(
agent_token,
authorization,
accept_authorization=False,
)
if not resolved:
if agent_token:
logger.warning(
"agent auth optional missing token path=%s x_agent=%s authorization=%s",
request.url.path,
bool(agent_token),
bool(authorization),
)
return None
agent = _find_agent_for_token(session, agent_token)
agent = _find_agent_for_token(session, resolved)
if agent is None:
logger.warning(
"agent auth optional invalid token path=%s token_prefix=%s",
request.url.path,
resolved[:6],
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)

View File

@@ -112,17 +112,17 @@ async def get_auth_context_optional(
clerk_credentials = await guard(request)
except (RuntimeError, ValueError) as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) from exc
except HTTPException as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from exc
except HTTPException:
return None
auth_data = _resolve_clerk_auth(request, clerk_credentials)
try:
clerk_user_id = _parse_subject(auth_data)
except ValidationError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from exc
except ValidationError:
return None
if not clerk_user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return None
user = session.exec(select(User).where(User.clerk_user_id == clerk_user_id)).first()
if user is None:

View File

@@ -31,5 +31,4 @@ class Settings(BaseSettings):
log_use_utc: bool = False
settings = Settings()

View File

@@ -4,8 +4,12 @@ from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.activity import router as activity_router
from app.api.agent import router as agent_router
from app.api.agents import router as agents_router
from app.api.approvals import router as approvals_router
from app.api.auth import router as auth_router
from app.api.board_memory import router as board_memory_router
from app.api.board_onboarding import router as board_onboarding_router
from app.api.boards import router as boards_router
from app.api.gateway import router as gateway_router
from app.api.gateways import router as gateways_router
@@ -53,12 +57,16 @@ def readyz() -> dict[str, bool]:
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(auth_router)
api_v1.include_router(agent_router)
api_v1.include_router(agents_router)
api_v1.include_router(activity_router)
api_v1.include_router(gateway_router)
api_v1.include_router(gateways_router)
api_v1.include_router(metrics_router)
api_v1.include_router(boards_router)
api_v1.include_router(board_memory_router)
api_v1.include_router(board_onboarding_router)
api_v1.include_router(approvals_router)
api_v1.include_router(tasks_router)
api_v1.include_router(users_router)
app.include_router(api_v1)

View File

@@ -1,15 +1,23 @@
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.models.users import User
__all__ = [
"ActivityEvent",
"Agent",
"Approval",
"BoardMemory",
"BoardOnboardingSession",
"Board",
"Gateway",
"Task",
"TaskFingerprint",
"User",
]

View File

@@ -27,5 +27,6 @@ class Agent(SQLModel, table=True):
delete_requested_at: datetime | None = Field(default=None)
delete_confirm_token_hash: str | None = Field(default=None, index=True)
last_seen_at: datetime | None = Field(default=None)
is_board_lead: bool = Field(default=False, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class Approval(SQLModel, table=True):
__tablename__ = "approvals"
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
action_type: str
payload: dict[str, object] | None = Field(default=None, sa_column=Column(JSON))
confidence: int
rubric_scores: dict[str, int] | None = Field(default=None, sa_column=Column(JSON))
status: str = Field(default="pending", index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
resolved_at: datetime | None = None

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class BoardMemory(SQLModel, table=True):
__tablename__ = "board_memory"
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
content: str
tags: list[str] | None = Field(default=None, sa_column=Column(JSON))
source: str | None = None
created_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -0,0 +1,20 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class BoardOnboardingSession(SQLModel, table=True):
__tablename__ = "board_onboarding_sessions"
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
session_key: str
status: str = Field(default="active", index=True)
messages: list[dict[str, object]] | None = Field(default=None, sa_column=Column(JSON))
draft_goal: dict[str, object] | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field
from app.models.tenancy import TenantScoped
@@ -15,5 +16,11 @@ class Board(TenantScoped, table=True):
name: str
slug: str = Field(index=True)
gateway_id: UUID | None = Field(default=None, foreign_key="gateways.id", index=True)
board_type: str = Field(default="goal", index=True)
objective: str | None = None
success_metrics: dict[str, object] | None = Field(default=None, sa_column=Column(JSON))
target_date: datetime | None = None
goal_confirmed: bool = Field(default=False)
goal_source: str | None = None
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlmodel import Field, SQLModel
class TaskFingerprint(SQLModel, table=True):
__tablename__ = "task_fingerprints"
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
fingerprint_hash: str = Field(index=True)
task_id: UUID = Field(foreign_key="tasks.id")
created_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -23,6 +23,8 @@ class Task(TenantScoped, table=True):
created_by_user_id: UUID | None = Field(default=None, foreign_key="users.id", index=True)
assigned_agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
auto_created: bool = Field(default=False)
auto_reason: str | None = None
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -1,5 +1,13 @@
from app.schemas.activity_events import ActivityEventRead
from app.schemas.agents import AgentCreate, AgentRead, AgentUpdate
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalUpdate
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.board_onboarding import (
BoardOnboardingAnswer,
BoardOnboardingConfirm,
BoardOnboardingRead,
BoardOnboardingStart,
)
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
from app.schemas.metrics import DashboardMetrics
@@ -11,6 +19,15 @@ __all__ = [
"AgentCreate",
"AgentRead",
"AgentUpdate",
"ApprovalCreate",
"ApprovalRead",
"ApprovalUpdate",
"BoardMemoryCreate",
"BoardMemoryRead",
"BoardOnboardingAnswer",
"BoardOnboardingConfirm",
"BoardOnboardingRead",
"BoardOnboardingStart",
"BoardCreate",
"BoardRead",
"BoardUpdate",

View File

@@ -23,6 +23,7 @@ class AgentCreate(AgentBase):
class AgentUpdate(SQLModel):
board_id: UUID | None = None
is_gateway_main: bool | None = None
name: str | None = None
status: str | None = None
heartbeat_config: dict[str, Any] | None = None
@@ -33,6 +34,8 @@ class AgentUpdate(SQLModel):
class AgentRead(AgentBase):
id: UUID
is_board_lead: bool = False
is_gateway_main: bool = False
openclaw_session_id: str | None = None
last_seen_at: datetime | None
created_at: datetime
@@ -46,3 +49,7 @@ class AgentHeartbeat(SQLModel):
class AgentHeartbeatCreate(AgentHeartbeat):
name: str
board_id: UUID | None = None
class AgentNudge(SQLModel):
message: str

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlmodel import SQLModel
class ApprovalBase(SQLModel):
action_type: str
payload: dict[str, object] | None = None
confidence: int
rubric_scores: dict[str, int] | None = None
status: str = "pending"
class ApprovalCreate(ApprovalBase):
agent_id: UUID | None = None
class ApprovalUpdate(SQLModel):
status: str | None = None
class ApprovalRead(ApprovalBase):
id: UUID
board_id: UUID
agent_id: UUID | None = None
created_at: datetime
resolved_at: datetime | None = None

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlmodel import SQLModel
class BoardMemoryCreate(SQLModel):
content: str
tags: list[str] | None = None
source: str | None = None
class BoardMemoryRead(BoardMemoryCreate):
id: UUID
board_id: UUID
created_at: datetime

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import model_validator
from sqlmodel import SQLModel
class BoardOnboardingStart(SQLModel):
pass
class BoardOnboardingAnswer(SQLModel):
answer: str
other_text: str | None = None
class BoardOnboardingConfirm(SQLModel):
board_type: str
objective: str | None = None
success_metrics: dict[str, object] | None = None
target_date: datetime | None = None
@model_validator(mode="after")
def validate_goal_fields(self):
if self.board_type == "goal":
if not self.objective or not self.success_metrics:
raise ValueError("Confirmed goal boards require objective and success_metrics")
return self
class BoardOnboardingRead(SQLModel):
id: UUID
board_id: UUID
session_key: str
status: str
messages: list[dict[str, object]] | None = None
draft_goal: dict[str, object] | None = None
created_at: datetime
updated_at: datetime

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import model_validator
from sqlmodel import SQLModel
@@ -10,16 +11,33 @@ class BoardBase(SQLModel):
name: str
slug: str
gateway_id: UUID | None = None
board_type: str = "goal"
objective: str | None = None
success_metrics: dict[str, object] | None = None
target_date: datetime | None = None
goal_confirmed: bool = False
goal_source: str | None = None
class BoardCreate(BoardBase):
pass
@model_validator(mode="after")
def validate_goal_fields(self):
if self.board_type == "goal" and self.goal_confirmed:
if not self.objective or not self.success_metrics:
raise ValueError("Confirmed goal boards require objective and success_metrics")
return self
class BoardUpdate(SQLModel):
name: str | None = None
slug: str | None = None
gateway_id: UUID | None = None
board_type: str | None = None
objective: str | None = None
success_metrics: dict[str, object] | None = None
target_date: datetime | None = None
goal_confirmed: bool | None = None
goal_source: str | None = None
class BoardRead(BoardBase):

View File

@@ -37,11 +37,22 @@ DEFAULT_GATEWAY_FILES = frozenset(
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
"BOOT.md",
"BOOTSTRAP.md",
"MEMORY.md",
}
)
HEARTBEAT_LEAD_TEMPLATE = "HEARTBEAT_LEAD.md"
HEARTBEAT_AGENT_TEMPLATE = "HEARTBEAT_AGENT.md"
MAIN_TEMPLATE_MAP = {
"AGENTS.md": "MAIN_AGENTS.md",
"HEARTBEAT.md": "MAIN_HEARTBEAT.md",
"USER.md": "MAIN_USER.md",
"BOOT.md": "MAIN_BOOT.md",
"TOOLS.md": "MAIN_TOOLS.md",
}
def _repo_root() -> Path:
return Path(__file__).resolve().parents[3]
@@ -80,6 +91,10 @@ def _template_env() -> Environment:
)
def _heartbeat_template_name(agent: Agent) -> str:
return HEARTBEAT_LEAD_TEMPLATE if agent.is_board_lead else HEARTBEAT_AGENT_TEMPLATE
def _workspace_path(agent_name: str, workspace_root: str) -> str:
if not workspace_root:
raise ValueError("gateway_workspace_root is required")
@@ -125,10 +140,20 @@ def _build_context(
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
return {
"agent_name": agent.name,
"agent_id": agent_id,
"board_id": str(board.id),
"board_name": board.name,
"board_type": board.board_type,
"board_objective": board.objective or "",
"board_success_metrics": json.dumps(board.success_metrics or {}),
"board_target_date": board.target_date.isoformat() if board.target_date else "",
"board_goal_confirmed": str(board.goal_confirmed).lower(),
"is_board_lead": str(agent.is_board_lead).lower(),
"session_key": session_key,
"workspace_path": workspace_path,
"base_url": base_url,
@@ -136,7 +161,55 @@ def _build_context(
"main_session_key": main_session_key,
"workspace_root": workspace_root,
"user_name": (user.name or "") if user else "",
"user_preferred_name": (user.preferred_name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
"user_context": (user.context or "") if user else "",
**identity_context,
}
def _build_main_context(
agent: Agent,
gateway: Gateway,
auth_token: str,
user: User | None,
) -> dict[str, str]:
base_url = settings.base_url or "REPLACE_WITH_BASE_URL"
identity_profile: dict[str, Any] = {}
if isinstance(agent.identity_profile, dict):
identity_profile = agent.identity_profile
normalized_identity: dict[str, str] = {}
for key, value in identity_profile.items():
if value is None:
continue
if isinstance(value, list):
parts = [str(item).strip() for item in value if str(item).strip()]
if not parts:
continue
normalized_identity[key] = ", ".join(parts)
continue
text = str(value).strip()
if text:
normalized_identity[key] = text
identity_context = {
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
return {
"agent_name": agent.name,
"agent_id": str(agent.id),
"session_key": agent.openclaw_session_id or "",
"base_url": base_url,
"auth_token": auth_token,
"main_session_key": gateway.main_session_key or "",
"workspace_root": gateway.workspace_root or "",
"user_name": (user.name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
@@ -174,6 +247,12 @@ async def _supported_gateway_files(config: GatewayClientConfig) -> set[str]:
return set(DEFAULT_GATEWAY_FILES)
async def _reset_session(session_key: str, config: GatewayClientConfig) -> None:
if not session_key:
return
await openclaw_call("sessions.reset", {"key": session_key}, config=config)
async def _gateway_agent_files_index(
agent_id: str, config: GatewayClientConfig
) -> dict[str, dict[str, Any]]:
@@ -197,6 +276,7 @@ def _render_agent_files(
file_names: set[str],
*,
include_bootstrap: bool,
template_overrides: dict[str, str] | None = None,
) -> dict[str, str]:
env = _template_env()
overrides: dict[str, str] = {}
@@ -212,18 +292,53 @@ def _render_agent_files(
if name == "MEMORY.md":
rendered[name] = "# MEMORY\n\nBootstrap pending.\n"
continue
if name == "HEARTBEAT.md":
heartbeat_template = (
template_overrides.get(name)
if template_overrides and name in template_overrides
else _heartbeat_template_name(agent)
)
heartbeat_path = _templates_root() / heartbeat_template
if heartbeat_path.exists():
rendered[name] = env.get_template(heartbeat_template).render(**context).strip()
continue
override = overrides.get(name)
if override:
rendered[name] = env.from_string(override).render(**context).strip()
continue
path = _templates_root() / name
template_name = (
template_overrides.get(name)
if template_overrides and name in template_overrides
else name
)
path = _templates_root() / template_name
if path.exists():
rendered[name] = env.get_template(name).render(**context).strip()
rendered[name] = env.get_template(template_name).render(**context).strip()
continue
rendered[name] = ""
return rendered
async def _gateway_default_agent_id(
config: GatewayClientConfig,
) -> str | None:
try:
payload = await openclaw_call("agents.list", config=config)
except OpenClawGatewayError:
return None
if not isinstance(payload, dict):
return None
default_id = payload.get("defaultId") or payload.get("default_id")
if default_id:
return default_id
agents = payload.get("agents") or []
if isinstance(agents, list) and agents:
first = agents[0]
if isinstance(first, dict):
return first.get("id")
return None
async def _patch_gateway_agent_list(
agent_id: str,
workspace_path: str,
@@ -279,7 +394,9 @@ async def _remove_gateway_agent_list(
if not isinstance(lst, list):
raise OpenClawGatewayError("config agents.list is not a list")
new_list = [entry for entry in lst if not (isinstance(entry, dict) and entry.get("id") == agent_id)]
new_list = [
entry for entry in lst if not (isinstance(entry, dict) and entry.get("id") == agent_id)
]
if len(new_list) == len(lst):
return
patch = {"agents": {"list": new_list}}
@@ -317,6 +434,8 @@ async def provision_agent(
user: User | None,
*,
action: str = "provision",
force_bootstrap: bool = False,
reset_session: bool = False,
) -> None:
if not gateway.url:
return
@@ -332,10 +451,11 @@ async def provision_agent(
await _patch_gateway_agent_list(agent_id, workspace_path, heartbeat, client_config)
context = _build_context(agent, board, gateway, auth_token, user)
supported = await _supported_gateway_files(client_config)
supported = set(await _supported_gateway_files(client_config))
supported.add("USER.md")
existing_files = await _gateway_agent_files_index(agent_id, client_config)
include_bootstrap = True
if action == "update":
if action == "update" and not force_bootstrap:
if not existing_files:
include_bootstrap = False
else:
@@ -357,6 +477,61 @@ async def provision_agent(
{"agentId": agent_id, "name": name, "content": content},
config=client_config,
)
if reset_session:
await _reset_session(session_key, client_config)
async def provision_main_agent(
agent: Agent,
gateway: Gateway,
auth_token: str,
user: User | None,
*,
action: str = "provision",
force_bootstrap: bool = False,
reset_session: bool = False,
) -> None:
if not gateway.url:
return
if not gateway.main_session_key:
raise ValueError("gateway main_session_key is required")
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(gateway.main_session_key, config=client_config, label="Main Agent")
agent_id = await _gateway_default_agent_id(client_config)
if not agent_id:
raise OpenClawGatewayError("Unable to resolve gateway main agent id")
context = _build_main_context(agent, gateway, auth_token, user)
supported = set(await _supported_gateway_files(client_config))
supported.add("USER.md")
existing_files = await _gateway_agent_files_index(agent_id, client_config)
include_bootstrap = action != "update" or force_bootstrap
if action == "update" and not force_bootstrap:
if not existing_files:
include_bootstrap = False
else:
entry = existing_files.get("BOOTSTRAP.md")
if entry and entry.get("missing") is True:
include_bootstrap = False
rendered = _render_agent_files(
context,
agent,
supported,
include_bootstrap=include_bootstrap,
template_overrides=MAIN_TEMPLATE_MAP,
)
for name, content in rendered.items():
if content == "":
continue
await openclaw_call(
"agents.files.set",
{"agentId": agent_id, "name": name, "content": content},
config=client_config,
)
if reset_session:
await _reset_session(gateway.main_session_key, client_config)
async def cleanup_agent(

View File

@@ -0,0 +1,27 @@
from __future__ import annotations
import hashlib
from typing import Mapping
CONFIDENCE_THRESHOLD = 80
def compute_confidence(rubric_scores: Mapping[str, int]) -> int:
return int(sum(rubric_scores.values()))
def approval_required(*, confidence: int, is_external: bool, is_risky: bool) -> bool:
return is_external or is_risky or confidence < CONFIDENCE_THRESHOLD
def infer_planning(signals: Mapping[str, bool]) -> bool:
# Require at least two planning signals to avoid spam on general boards.
truthy = [key for key, value in signals.items() if value]
return len(truthy) >= 2
def task_fingerprint(title: str, description: str | None, board_id: str) -> str:
normalized_title = title.strip().lower()
normalized_desc = (description or "").strip().lower()
seed = f"{board_id}::{normalized_title}::{normalized_desc}"
return hashlib.sha256(seed.encode()).hexdigest()

View File

@@ -25,6 +25,7 @@ dependencies = [
"redis==5.1.1",
"fastapi-clerk-auth==0.0.9",
"sse-starlette==2.1.3",
"jinja2"
]
[project.optional-dependencies]

View File

@@ -0,0 +1,6 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))

View File

@@ -0,0 +1,50 @@
import pytest
from app.schemas.board_onboarding import BoardOnboardingConfirm
from app.schemas.boards import BoardCreate
def test_goal_board_requires_objective_and_metrics_when_confirmed():
with pytest.raises(ValueError):
BoardCreate(
name="Goal Board",
slug="goal",
board_type="goal",
goal_confirmed=True,
)
BoardCreate(
name="Goal Board",
slug="goal",
board_type="goal",
goal_confirmed=True,
objective="Launch onboarding",
success_metrics={"emails": 3},
)
def test_goal_board_allows_missing_objective_before_confirmation():
BoardCreate(name="Draft", slug="draft", board_type="goal")
def test_general_board_allows_missing_objective():
BoardCreate(name="General", slug="general", board_type="general")
def test_onboarding_confirm_requires_goal_fields():
with pytest.raises(ValueError):
BoardOnboardingConfirm(board_type="goal")
with pytest.raises(ValueError):
BoardOnboardingConfirm(board_type="goal", objective="Ship onboarding")
with pytest.raises(ValueError):
BoardOnboardingConfirm(board_type="goal", success_metrics={"emails": 3})
BoardOnboardingConfirm(
board_type="goal",
objective="Ship onboarding",
success_metrics={"emails": 3},
)
BoardOnboardingConfirm(board_type="general")

View File

@@ -0,0 +1,50 @@
import hashlib
from app.services.lead_policy import (
approval_required,
compute_confidence,
infer_planning,
task_fingerprint,
)
def test_compute_confidence_sums_weights():
rubric = {
"clarity": 20,
"constraints": 15,
"completeness": 10,
"risk": 20,
"dependencies": 10,
"similarity": 5,
}
assert compute_confidence(rubric) == 80
def test_approval_required_for_low_confidence():
assert approval_required(confidence=79, is_external=False, is_risky=False)
assert not approval_required(confidence=85, is_external=False, is_risky=False)
def test_approval_required_for_external_or_risky():
assert approval_required(confidence=90, is_external=True, is_risky=False)
assert approval_required(confidence=90, is_external=False, is_risky=True)
def test_infer_planning_requires_signal_threshold():
signals = {
"goal_gap": True,
"recent_ambiguity": False,
"research_only": False,
"stalled_inbox": False,
}
assert infer_planning(signals) is False
signals["recent_ambiguity"] = True
assert infer_planning(signals) is True
def test_task_fingerprint_deterministic():
fp1 = task_fingerprint("Title", "Desc", "board-1")
fp2 = task_fingerprint("Title", "Desc", "board-1")
assert fp1 == fp2
assert fp1 == hashlib.sha256("board-1::title::desc".encode()).hexdigest()

14
backend/uv.lock generated
View File

@@ -325,6 +325,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/b3/8def84f539e7d2289a02f0524b944b15d7c75dab7628bedf1c4f0992029c/isort-5.13.2-py3-none-any.whl", hash = "sha256:8ca5e72a8d85860d5a3fa69b8745237f2939afe12dbf656afbcb47fe72d947a6", size = 92310, upload-time = "2023-12-13T20:37:23.244Z" },
]
[[package]]
name = "jinja2"
version = "3.1.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markupsafe" },
]
sdist = { url = "https://files.pythonhosted.org/packages/df/bf/f7da0350254c0ed7c72f3e33cef02e048281fec7ecec5f032d4aac52226b/jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d", size = 245115, upload-time = "2025-03-05T20:05:02.478Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" },
]
[[package]]
name = "mako"
version = "1.3.10"
@@ -444,6 +456,7 @@ dependencies = [
{ name = "alembic" },
{ name = "fastapi" },
{ name = "fastapi-clerk-auth" },
{ name = "jinja2" },
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic-settings" },
{ name = "python-dotenv" },
@@ -475,6 +488,7 @@ requires-dist = [
{ name = "fastapi-clerk-auth", specifier = "==0.0.9" },
{ name = "flake8", marker = "extra == 'dev'", specifier = "==7.1.1" },
{ name = "isort", marker = "extra == 'dev'", specifier = "==5.13.2" },
{ name = "jinja2" },
{ name = "mypy", marker = "extra == 'dev'", specifier = "==1.11.2" },
{ name = "psycopg", extras = ["binary"], specifier = "==3.2.1" },
{ name = "pydantic-settings", specifier = "==2.5.2" },