diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index 83cd1c4..b7f9474 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -5,6 +5,7 @@ from typing import Any, cast from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import func from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession @@ -15,6 +16,7 @@ from app.api import board_onboarding as onboarding_api from app.api import tasks as tasks_api from app.api.deps import ActorContext, get_board_or_404, get_task_or_404 from app.core.agent_auth import AgentAuthContext, get_agent_auth_context +from app.core.config import settings from app.db.pagination import paginate from app.db.session import get_session from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig @@ -40,9 +42,19 @@ from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead from app.schemas.boards import BoardRead from app.schemas.common import OkResponse +from app.schemas.gateway_coordination import ( + GatewayBoardEnsureRequest, + GatewayBoardEnsureResponse, + GatewayLeadBroadcastBoardResult, + GatewayLeadBroadcastRequest, + GatewayLeadBroadcastResponse, + GatewayLeadMessageRequest, + GatewayLeadMessageResponse, +) from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate from app.services.activity_log import record_activity +from app.services.board_leads import ensure_board_lead_agent from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_status_by_id, @@ -70,6 +82,50 @@ async def _gateway_config(session: AsyncSession, board: Board) -> GatewayClientC return GatewayClientConfig(url=gateway.url, token=gateway.token) +def _slugify(value: str) -> str: + import re + + slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") + return slug or "board" + + +async def _require_gateway_main( + session: AsyncSession, + agent: Agent, +) -> tuple[Gateway, GatewayClientConfig]: + session_key = (agent.openclaw_session_id or "").strip() + if not session_key: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Agent missing session key") + gateway = ( + await session.exec(select(Gateway).where(col(Gateway.main_session_key) == session_key)) + ).first() + if gateway is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the gateway main agent may call this endpoint.", + ) + if not gateway.url: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway url is required", + ) + return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) + + +async def _require_gateway_board( + session: AsyncSession, + *, + gateway: Gateway, + board_id: UUID | str, +) -> Board: + board = await session.get(Board, board_id) + if board is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found") + if board.gateway_id != gateway.id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + return board + + @router.get("/boards", response_model=DefaultLimitOffsetPage[BoardRead]) async def list_boards( session: AsyncSession = Depends(get_session), @@ -440,3 +496,258 @@ async def agent_heartbeat( session=session, actor=_actor(agent_ctx), ) + + +@router.post("/gateway/boards/ensure", response_model=GatewayBoardEnsureResponse) +async def ensure_gateway_board( + payload: GatewayBoardEnsureRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayBoardEnsureResponse: + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + + requested_name = payload.name.strip() + requested_slug = _slugify(payload.slug.strip() if payload.slug else requested_name) + + # Try slug match first, then case-insensitive name match. + existing = ( + await session.exec( + select(Board) + .where(col(Board.gateway_id) == gateway.id) + .where(col(Board.slug) == requested_slug) + ) + ).first() + if existing is None: + existing = ( + await session.exec( + select(Board) + .where(col(Board.gateway_id) == gateway.id) + .where(func.lower(col(Board.name)) == requested_name.lower()) + ) + ).first() + + created = False + board = existing + if board is None: + slug = requested_slug + suffix = 2 + while True: + conflict = ( + await session.exec( + select(Board.id) + .where(col(Board.gateway_id) == gateway.id) + .where(col(Board.slug) == slug) + ) + ).first() + if conflict is None: + break + slug = f"{requested_slug}-{suffix}" + suffix += 1 + + board = Board( + name=requested_name, + slug=slug, + gateway_id=gateway.id, + board_type=payload.board_type, + objective=payload.objective.strip() if payload.objective else None, + success_metrics=payload.success_metrics, + target_date=payload.target_date, + goal_confirmed=False, + goal_source="gateway_main_agent", + ) + session.add(board) + await session.commit() + await session.refresh(board) + created = True + + lead, lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + agent_name=payload.lead_agent_name.strip() if payload.lead_agent_name else None, + identity_profile=payload.lead_identity_profile, + action="provision", + ) + + return GatewayBoardEnsureResponse( + created=created, + lead_created=lead_created, + board_id=board.id, + lead_agent_id=lead.id, + board_name=board.name, + board_slug=board.slug, + lead_agent_name=lead.name, + ) + + +@router.post( + "/gateway/boards/{board_id}/lead/message", + response_model=GatewayLeadMessageResponse, +) +async def message_gateway_board_lead( + board_id: UUID, + payload: GatewayLeadMessageRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayLeadMessageResponse: + import json + + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + board = await _require_gateway_board(session, gateway=gateway, board_id=board_id) + lead, lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + action="provision", + ) + if not lead.openclaw_session_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Lead agent has no session key", + ) + + base_url = settings.base_url or "http://localhost:8000" + header = "GATEWAY MAIN QUESTION" if payload.kind == "question" else "GATEWAY MAIN HANDOFF" + correlation = payload.correlation_id.strip() if payload.correlation_id else "" + correlation_line = f"Correlation ID: {correlation}\n" if correlation else "" + tags = payload.reply_tags or ["gateway_main", "lead_reply"] + tags_json = json.dumps(tags) + reply_source = payload.reply_source or "lead_to_gateway_main" + + message = ( + f"{header}\n" + f"Board: {board.name}\n" + f"Board ID: {board.id}\n" + f"From agent: {agent_ctx.agent.name}\n" + f"{correlation_line}\n" + f"{payload.content.strip()}\n\n" + "Reply to the gateway main by writing a NON-chat memory item on this board:\n" + f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" + f'Body: {{"content":"...","tags":{tags_json},"source":"{reply_source}"}}\n' + "Do NOT reply in OpenClaw chat." + ) + + try: + await ensure_session(lead.openclaw_session_id, config=config, label=lead.name) + await send_message(message, session_key=lead.openclaw_session_id, config=config) + except OpenClawGatewayError as exc: + record_activity( + session, + event_type="gateway.main.lead_message.failed", + message=f"Lead message failed for {board.name}: {exc}", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc + + record_activity( + session, + event_type="gateway.main.lead_message.sent", + message=f"Sent {payload.kind} to lead for board: {board.name}.", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + + return GatewayLeadMessageResponse( + board_id=board.id, + lead_agent_id=lead.id, + lead_agent_name=lead.name, + lead_created=lead_created, + ) + + +@router.post( + "/gateway/leads/broadcast", + response_model=GatewayLeadBroadcastResponse, +) +async def broadcast_gateway_lead_message( + payload: GatewayLeadBroadcastRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayLeadBroadcastResponse: + import json + + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + + statement = select(Board).where(col(Board.gateway_id) == gateway.id).order_by( + col(Board.created_at).desc() + ) + if payload.board_ids: + statement = statement.where(col(Board.id).in_(payload.board_ids)) + boards = list(await session.exec(statement)) + + base_url = settings.base_url or "http://localhost:8000" + header = "GATEWAY MAIN QUESTION" if payload.kind == "question" else "GATEWAY MAIN HANDOFF" + correlation = payload.correlation_id.strip() if payload.correlation_id else "" + correlation_line = f"Correlation ID: {correlation}\n" if correlation else "" + tags = payload.reply_tags or ["gateway_main", "lead_reply"] + tags_json = json.dumps(tags) + reply_source = payload.reply_source or "lead_to_gateway_main" + + results: list[GatewayLeadBroadcastBoardResult] = [] + sent = 0 + failed = 0 + + for board in boards: + try: + lead, _lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + action="provision", + ) + if not lead.openclaw_session_id: + raise ValueError("Lead agent has no session key") + message = ( + f"{header}\n" + f"Board: {board.name}\n" + f"Board ID: {board.id}\n" + f"From agent: {agent_ctx.agent.name}\n" + f"{correlation_line}\n" + f"{payload.content.strip()}\n\n" + "Reply to the gateway main by writing a NON-chat memory item on this board:\n" + f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" + f'Body: {{"content":"...","tags":{tags_json},"source":"{reply_source}"}}\n' + "Do NOT reply in OpenClaw chat." + ) + await ensure_session(lead.openclaw_session_id, config=config, label=lead.name) + await send_message(message, session_key=lead.openclaw_session_id, config=config) + results.append( + GatewayLeadBroadcastBoardResult( + board_id=board.id, + lead_agent_id=lead.id, + lead_agent_name=lead.name, + ok=True, + ) + ) + sent += 1 + except Exception as exc: + results.append( + GatewayLeadBroadcastBoardResult( + board_id=board.id, + ok=False, + error=str(exc), + ) + ) + failed += 1 + + record_activity( + session, + event_type="gateway.main.lead_broadcast.sent", + message=f"Broadcast {payload.kind} to {sent} board leads (failed: {failed}).", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + + return GatewayLeadBroadcastResponse( + ok=True, + sent=sent, + failed=failed, + results=results, + ) diff --git a/backend/app/schemas/gateway_coordination.py b/backend/app/schemas/gateway_coordination.py new file mode 100644 index 0000000..55a5b1e --- /dev/null +++ b/backend/app/schemas/gateway_coordination.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Literal +from uuid import UUID + +from sqlmodel import Field, SQLModel + +from app.schemas.common import NonEmptyStr + + +class GatewayBoardEnsureRequest(SQLModel): + name: NonEmptyStr + slug: str | None = None + board_type: Literal["goal", "general"] = "goal" + objective: str | None = None + success_metrics: dict[str, object] | None = None + target_date: datetime | None = None + lead_agent_name: str | None = None + lead_identity_profile: dict[str, str] | None = None + + +class GatewayBoardEnsureResponse(SQLModel): + created: bool = False + lead_created: bool = False + board_id: UUID + lead_agent_id: UUID | None = None + + # Convenience fields for callers that don't want to re-fetch. + board_name: str + board_slug: str + lead_agent_name: str | None = None + + +class GatewayLeadMessageRequest(SQLModel): + kind: Literal["question", "handoff"] = "question" + correlation_id: str | None = None + content: NonEmptyStr + + # How the lead should reply (defaults are interpreted by templates). + reply_tags: list[str] = Field(default_factory=lambda: ["gateway_main", "lead_reply"]) + reply_source: str | None = "lead_to_gateway_main" + + +class GatewayLeadMessageResponse(SQLModel): + ok: bool = True + board_id: UUID + lead_agent_id: UUID | None = None + lead_agent_name: str | None = None + lead_created: bool = False + + +class GatewayLeadBroadcastRequest(SQLModel): + kind: Literal["question", "handoff"] = "question" + correlation_id: str | None = None + content: NonEmptyStr + board_ids: list[UUID] | None = None + reply_tags: list[str] = Field(default_factory=lambda: ["gateway_main", "lead_reply"]) + reply_source: str | None = "lead_to_gateway_main" + + +class GatewayLeadBroadcastBoardResult(SQLModel): + board_id: UUID + lead_agent_id: UUID | None = None + lead_agent_name: str | None = None + ok: bool = False + error: str | None = None + + +class GatewayLeadBroadcastResponse(SQLModel): + ok: bool = True + sent: int = 0 + failed: int = 0 + results: list[GatewayLeadBroadcastBoardResult] = Field(default_factory=list) + diff --git a/backend/app/services/board_leads.py b/backend/app/services/board_leads.py new file mode 100644 index 0000000..89208a7 --- /dev/null +++ b/backend/app/services/board_leads.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from typing import Any + +from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.agent_tokens import generate_agent_token, hash_agent_token +from app.core.time import utcnow +from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig +from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message +from app.models.agents import Agent +from app.models.boards import Board +from app.models.gateways import Gateway +from app.models.users import User +from app.services.agent_provisioning import DEFAULT_HEARTBEAT_CONFIG, provision_agent + + +def lead_session_key(board: Board) -> str: + return f"agent:lead-{board.id}:main" + + +def lead_agent_name(_: Board) -> str: + return "Lead Agent" + + +async def ensure_board_lead_agent( + session: AsyncSession, + *, + board: Board, + gateway: Gateway, + config: GatewayClientConfig, + user: User | None, + agent_name: str | None = None, + identity_profile: dict[str, str] | None = None, + action: str = "provision", +) -> tuple[Agent, bool]: + existing = ( + await session.exec( + select(Agent) + .where(Agent.board_id == board.id) + .where(col(Agent.is_board_lead).is_(True)) + ) + ).first() + if existing: + desired_name = agent_name or lead_agent_name(board) + changed = False + if existing.name != desired_name: + existing.name = desired_name + changed = True + desired_session_key = lead_session_key(board) + if not existing.openclaw_session_id: + existing.openclaw_session_id = desired_session_key + changed = True + if changed: + existing.updated_at = utcnow() + session.add(existing) + await session.commit() + await session.refresh(existing) + return existing, False + + merged_identity_profile: dict[str, Any] = { + "role": "Board Lead", + "communication_style": "direct, concise, practical", + "emoji": ":gear:", + } + if identity_profile: + merged_identity_profile.update( + {key: value.strip() for key, value in identity_profile.items() if value.strip()} + ) + + agent = Agent( + name=agent_name or lead_agent_name(board), + status="provisioning", + board_id=board.id, + is_board_lead=True, + heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), + identity_profile=merged_identity_profile, + openclaw_session_id=lead_session_key(board), + provision_requested_at=utcnow(), + provision_action=action, + ) + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + session.add(agent) + await session.commit() + await session.refresh(agent) + + try: + await provision_agent(agent, board, gateway, raw_token, user, action=action) + if agent.openclaw_session_id: + await ensure_session(agent.openclaw_session_id, config=config, label=agent.name) + await send_message( + ( + f"Hello {agent.name}. Your workspace has been provisioned.\n\n" + "Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once " + "then delete it. Begin heartbeats after startup." + ), + session_key=agent.openclaw_session_id, + config=config, + deliver=True, + ) + except OpenClawGatewayError: + # Best-effort provisioning. The board/agent rows should still exist. + pass + + return agent, True + diff --git a/frontend/src/app/boards/[boardId]/page.tsx b/frontend/src/app/boards/[boardId]/page.tsx index 84f3789..0778e73 100644 --- a/frontend/src/app/boards/[boardId]/page.tsx +++ b/frontend/src/app/boards/[boardId]/page.tsx @@ -50,7 +50,10 @@ import { streamApprovalsApiV1BoardsBoardIdApprovalsStreamGet, updateApprovalApiV1BoardsBoardIdApprovalsApprovalIdPatch, } from "@/api/generated/approvals/approvals"; -import { listTaskCommentFeedApiV1ActivityTaskCommentsGet } from "@/api/generated/activity/activity"; +import { + listTaskCommentFeedApiV1ActivityTaskCommentsGet, + streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet, +} from "@/api/generated/activity/activity"; import { getBoardSnapshotApiV1BoardsBoardIdSnapshotGet } from "@/api/generated/boards/boards"; import { createBoardMemoryApiV1BoardsBoardIdMemoryPost, @@ -219,6 +222,7 @@ const LiveFeedCard = memo(function LiveFeedCard({ authorRole, authorAvatar, onViewTask, + isNew, }: { comment: TaskComment; taskTitle: string; @@ -226,10 +230,18 @@ const LiveFeedCard = memo(function LiveFeedCard({ authorRole?: string | null; authorAvatar: string; onViewTask?: () => void; + isNew?: boolean; }) { const message = (comment.message ?? "").trim(); return ( -