feat: enhance agent creation with human-like naming and improve task assignment notifications

This commit is contained in:
Abhimanyu Saharan
2026-02-05 22:51:46 +05:30
parent cbf9fd1b0a
commit e09460a881
10 changed files with 1212 additions and 203 deletions

View File

@@ -332,7 +332,11 @@ async def agent_heartbeat(
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> AgentRead:
if agent_ctx.agent.name != payload.name:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
payload = AgentHeartbeatCreate(
name=agent_ctx.agent.name,
status=payload.status,
board_id=payload.board_id,
)
return await agents_api.heartbeat_or_create_agent( # type: ignore[attr-defined]
payload=payload,
session=session,

View File

@@ -1,17 +1,21 @@
from __future__ import annotations
import re
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import asyncio
import json
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import update
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, or_, update
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.db.session import get_session
from app.db.session import engine, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.activity_events import ActivityEvent
@@ -34,6 +38,22 @@ OFFLINE_AFTER = timedelta(minutes=10)
AGENT_SESSION_PREFIX = "agent"
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _normalize_identity_profile(
profile: dict[str, object] | None,
) -> dict[str, str] | None:
@@ -172,6 +192,30 @@ def _with_computed_status(agent: Agent) -> Agent:
return agent
def _serialize_agent(agent: Agent, main_session_keys: set[str]) -> dict[str, object]:
return _to_agent_read(_with_computed_status(agent), main_session_keys).model_dump()
def _fetch_agent_events(
board_id: UUID | None,
since: datetime,
) -> list[Agent]:
with Session(engine) as session:
statement = select(Agent)
if board_id:
statement = statement.where(col(Agent.board_id) == board_id)
statement = (
statement.where(
or_(
col(Agent.updated_at) >= since,
col(Agent.last_seen_at) >= since,
)
)
.order_by(asc(col(Agent.updated_at)))
)
return list(session.exec(statement))
def _record_heartbeat(session: Session, agent: Agent) -> None:
record_activity(
session,
@@ -217,6 +261,36 @@ def list_agents(
]
@router.get("/stream")
async def stream_agents(
request: Request,
board_id: UUID | None = Query(default=None),
since: str | None = Query(default=None),
auth: AuthContext = Depends(require_admin_auth),
) -> EventSourceResponse:
since_dt = _parse_since(since) or datetime.utcnow()
last_seen = since_dt
async def event_generator():
nonlocal last_seen
while True:
if await request.is_disconnected():
break
agents = await run_in_threadpool(_fetch_agent_events, board_id, last_seen)
if agents:
with Session(engine) as session:
main_session_keys = _get_gateway_main_session_keys(session)
for agent in agents:
updated_at = agent.updated_at or agent.last_seen_at or datetime.utcnow()
if updated_at > last_seen:
last_seen = updated_at
payload = {"agent": _serialize_agent(agent, main_session_keys)}
yield {"event": "agent", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=AgentRead)
async def create_agent(
payload: AgentCreate,

View File

@@ -1,12 +1,18 @@
from __future__ import annotations
from datetime import datetime
from datetime import datetime, timezone
import asyncio
import json
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, or_
from sqlmodel import Session, col, select
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, require_admin_or_agent
from app.db.session import get_session
from app.db.session import engine, get_session
from app.models.approvals import Approval
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalUpdate
@@ -15,6 +21,49 @@ router = APIRouter(prefix="/boards/{board_id}/approvals", tags=["approvals"])
ALLOWED_STATUSES = {"pending", "approved", "rejected"}
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _approval_updated_at(approval: Approval) -> datetime:
return approval.resolved_at or approval.created_at
def _serialize_approval(approval: Approval) -> dict[str, object]:
return ApprovalRead.model_validate(approval, from_attributes=True).model_dump()
def _fetch_approval_events(
board_id: UUID,
since: datetime,
) -> list[Approval]:
with Session(engine) as session:
statement = (
select(Approval)
.where(col(Approval.board_id) == board_id)
.where(
or_(
col(Approval.created_at) >= since,
col(Approval.resolved_at) >= since,
)
)
.order_by(asc(col(Approval.created_at)))
)
return list(session.exec(statement))
@router.get("", response_model=list[ApprovalRead])
def list_approvals(
status_filter: str | None = Query(default=None, alias="status"),
@@ -34,6 +83,38 @@ def list_approvals(
return list(session.exec(statement))
@router.get("/stream")
async def stream_approvals(
request: Request,
board=Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
) -> EventSourceResponse:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
since_dt = _parse_since(since) or datetime.utcnow()
last_seen = since_dt
async def event_generator():
nonlocal last_seen
while True:
if await request.is_disconnected():
break
approvals = await run_in_threadpool(
_fetch_approval_events, board.id, last_seen
)
for approval in approvals:
updated_at = _approval_updated_at(approval)
if updated_at > last_seen:
last_seen = updated_at
payload = {"approval": _serialize_approval(approval)}
yield {"event": "approval", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=ApprovalRead)
def create_approval(
payload: ApprovalCreate,

View File

@@ -9,7 +9,7 @@ from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from sqlalchemy import asc, desc
from sqlalchemy import asc, desc, delete
from sqlmodel import Session, col, select
from app.api.deps import (
@@ -32,6 +32,7 @@ from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.models.task_fingerprints import TaskFingerprint
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
@@ -150,6 +151,73 @@ async def _send_lead_task_message(
await send_message(message, session_key=session_key, config=config, deliver=False)
async def _send_agent_task_message(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
) -> None:
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=False)
def _notify_agent_on_task_assign(
*,
session: Session,
board: Board,
task: Task,
agent: Agent,
) -> None:
if not agent.openclaw_session_id:
return
config = _gateway_config(session, board)
if config is None:
return
description = (task.description or "").strip()
if len(description) > 500:
description = f"{description[:497]}..."
details = [
f"Board: {board.name}",
f"Task: {task.title}",
f"Task ID: {task.id}",
f"Status: {task.status}",
]
if description:
details.append(f"Description: {description}")
message = (
"TASK ASSIGNED\n"
+ "\n".join(details)
+ "\n\nTake action: open the task and begin work. Post updates as task comments."
)
try:
asyncio.run(
_send_agent_task_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=message,
)
)
record_activity(
session,
event_type="task.assignee_notified",
message=f"Agent notified for assignment: {agent.name}.",
agent_id=agent.id,
task_id=task.id,
)
session.commit()
except OpenClawGatewayError as exc:
record_activity(
session,
event_type="task.assignee_notify_failed",
message=f"Assignee notify failed: {exc}",
agent_id=agent.id,
task_id=task.id,
)
session.commit()
def _notify_lead_on_task_create(
*,
session: Session,
@@ -300,6 +368,15 @@ def create_task(
)
session.commit()
_notify_lead_on_task_create(session=session, board=board, task=task)
if task.assigned_agent_id:
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
_notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
@@ -311,6 +388,7 @@ def update_task(
actor: ActorContext = Depends(require_admin_or_agent),
) -> Task:
previous_status = task.status
previous_assigned = task.assigned_agent_id
updates = payload.model_dump(exclude_unset=True)
comment = updates.pop("comment", None)
if comment is not None and not comment.strip():
@@ -431,6 +509,23 @@ def update_task(
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
)
session.commit()
if task.assigned_agent_id and task.assigned_agent_id != previous_assigned:
if (
actor.actor_type == "agent"
and actor.agent
and task.assigned_agent_id == actor.agent.id
):
return task
assigned_agent = session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = session.get(Board, task.board_id) if task.board_id else None
if board:
_notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
@@ -440,6 +535,8 @@ def delete_task(
task: Task = Depends(get_task_or_404),
auth: AuthContext = Depends(require_admin_auth),
) -> dict[str, bool]:
session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id) == task.id))
session.execute(delete(TaskFingerprint).where(col(TaskFingerprint.task_id) == task.id))
session.delete(task)
session.commit()
return {"ok": True}

View File

@@ -144,6 +144,9 @@ def _build_context(
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
return {
"agent_name": agent.name,
"agent_id": agent_id,
@@ -162,7 +165,7 @@ def _build_context(
"main_session_key": main_session_key,
"workspace_root": workspace_root,
"user_name": (user.name or "") if user else "",
"user_preferred_name": (user.preferred_name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
@@ -198,6 +201,9 @@ def _build_main_context(
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
return {
"agent_name": agent.name,
"agent_id": str(agent.id),
@@ -207,7 +213,7 @@ def _build_main_context(
"main_session_key": gateway.main_session_key or "",
"workspace_root": gateway.workspace_root or "",
"user_name": (user.name or "") if user else "",
"user_preferred_name": (user.preferred_name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
@@ -449,7 +455,8 @@ async def provision_agent(
await _patch_gateway_agent_list(agent_id, workspace_path, heartbeat, client_config)
context = _build_context(agent, board, gateway, auth_token, user)
supported = await _supported_gateway_files(client_config)
supported = set(await _supported_gateway_files(client_config))
supported.add("USER.md")
existing_files = await _gateway_agent_files_index(agent_id, client_config)
include_bootstrap = True
if action == "update" and not force_bootstrap:
@@ -500,7 +507,8 @@ async def provision_main_agent(
raise OpenClawGatewayError("Unable to resolve gateway main agent id")
context = _build_main_context(agent, gateway, auth_token, user)
supported = await _supported_gateway_files(client_config)
supported = set(await _supported_gateway_files(client_config))
supported.add("USER.md")
existing_files = await _gateway_agent_files_index(agent_id, client_config)
include_bootstrap = action != "update" or force_bootstrap
if action == "update" and not force_bootstrap: