feat(agent): Refactor agent cleanup and provisioning logic for improved clarity and functionality

This commit is contained in:
Abhimanyu Saharan
2026-02-05 01:40:28 +05:30
parent 2c24d8993f
commit 6c4c97d2ea
3 changed files with 57 additions and 136 deletions

View File

@@ -9,7 +9,7 @@ from sqlalchemy import update
from sqlmodel import Session, col, select from sqlmodel import Session, col, select
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext from app.core.auth import AuthContext
from app.db.session import get_session from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
@@ -20,17 +20,15 @@ from app.models.boards import Board
from app.models.gateways import Gateway from app.models.gateways import Gateway
from app.schemas.agents import ( from app.schemas.agents import (
AgentCreate, AgentCreate,
AgentDeleteConfirm,
AgentHeartbeat, AgentHeartbeat,
AgentHeartbeatCreate, AgentHeartbeatCreate,
AgentProvisionConfirm,
AgentRead, AgentRead,
AgentUpdate, AgentUpdate,
) )
from app.services.activity_log import record_activity from app.services.activity_log import record_activity
from app.services.agent_provisioning import ( from app.services.agent_provisioning import (
DEFAULT_HEARTBEAT_CONFIG, DEFAULT_HEARTBEAT_CONFIG,
cleanup_agent_direct, cleanup_agent,
provision_agent, provision_agent,
) )
@@ -143,15 +141,6 @@ def _record_instruction_failure(session: Session, agent: Agent, error: str, acti
) )
def _record_wakeup_failure(session: Session, agent: Agent, error: str) -> None:
record_activity(
session,
event_type="agent.wakeup.failed",
message=f"Wakeup message failed: {error}",
agent_id=agent.id,
)
async def _send_wakeup_message( async def _send_wakeup_message(
agent: Agent, config: GatewayClientConfig, verb: str = "provisioned" agent: Agent, config: GatewayClientConfig, verb: str = "provisioned"
) -> None: ) -> None:
@@ -522,15 +511,13 @@ def delete_agent(
agent = session.get(Agent, agent_id) agent = session.get(Agent, agent_id)
if agent is None: if agent is None:
return {"ok": True} return {"ok": True}
if agent.status == "deleting" and agent.delete_confirm_token_hash:
return {"ok": True}
board = _require_board(session, str(agent.board_id) if agent.board_id else None) board = _require_board(session, str(agent.board_id) if agent.board_id else None)
gateway, _ = _require_gateway(session, board) gateway, client_config = _require_gateway(session, board)
try: try:
import asyncio import asyncio
asyncio.run(cleanup_agent_direct(agent, gateway, delete_workspace=True)) workspace_path = asyncio.run(cleanup_agent(agent, gateway))
except OpenClawGatewayError as exc: except OpenClawGatewayError as exc:
_record_instruction_failure(session, agent, str(exc), "delete") _record_instruction_failure(session, agent, str(exc), "delete")
session.commit() session.commit()
@@ -559,99 +546,34 @@ def delete_agent(
) )
session.delete(agent) session.delete(agent)
session.commit() session.commit()
return {"ok": True}
@router.post("/{agent_id}/provision/confirm")
def confirm_provision_agent(
agent_id: str,
payload: AgentProvisionConfirm,
session: Session = Depends(get_session),
) -> dict[str, bool]:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if not agent.provision_confirm_token_hash:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Provisioning confirmation not requested.",
)
if not verify_agent_token(payload.token, agent.provision_confirm_token_hash):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token.")
if agent.board_id is None:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
board = _require_board(session, str(agent.board_id))
_, client_config = _require_gateway(session, board)
action = payload.action or agent.provision_action or "provision"
verb = "updated" if action == "update" else "provisioned"
# Always ask the main agent to confirm workspace cleanup.
try: try:
main_session = gateway.main_session_key
if main_session and workspace_path:
cleanup_message = (
"Cleanup request for deleted agent.\n\n"
f"Agent name: {agent.name}\n"
f"Agent id: {agent.id}\n"
f"Workspace path: {workspace_path}\n\n"
"Actions:\n"
"1) Remove the workspace directory.\n"
"2) Reply NO_REPLY.\n"
)
async def _request_cleanup() -> None:
await ensure_session(main_session, config=client_config, label="Main Agent")
await send_message(
cleanup_message,
session_key=main_session,
config=client_config,
deliver=False,
)
import asyncio import asyncio
asyncio.run(_send_wakeup_message(agent, client_config, verb=verb)) asyncio.run(_request_cleanup())
except OpenClawGatewayError as exc: except Exception:
_record_wakeup_failure(session, agent, str(exc)) # Cleanup request is best-effort; deletion already completed.
session.commit() pass
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Wakeup message failed: {exc}",
) from exc
agent.provision_confirm_token_hash = None
agent.provision_requested_at = None
agent.provision_action = None
if action == "update":
agent.status = "online"
agent.updated_at = datetime.utcnow()
session.add(agent)
record_activity(
session,
event_type=f"agent.{action}.confirmed",
message=f"{action.capitalize()} confirmed for {agent.name}.",
agent_id=agent.id,
)
record_activity(
session,
event_type="agent.wakeup.sent",
message=f"Wakeup message sent to {agent.name}.",
agent_id=agent.id,
)
session.commit()
return {"ok": True}
@router.post("/{agent_id}/delete/confirm")
def confirm_delete_agent(
agent_id: str,
payload: AgentDeleteConfirm,
session: Session = Depends(get_session),
) -> dict[str, bool]:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.status != "deleting":
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Agent is not pending deletion.",
)
if not agent.delete_confirm_token_hash:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Delete confirmation not requested.",
)
if not verify_agent_token(payload.token, agent.delete_confirm_token_hash):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token.")
record_activity(
session,
event_type="agent.delete.confirmed",
message=f"Deleted agent {agent.name}.",
agent_id=None,
)
session.execute(
update(ActivityEvent).where(col(ActivityEvent.agent_id) == agent.id).values(agent_id=None)
)
session.delete(agent)
session.commit()
return {"ok": True} return {"ok": True}

View File

@@ -45,11 +45,3 @@ class AgentHeartbeatCreate(AgentHeartbeat):
name: str name: str
board_id: UUID | None = None board_id: UUID | None = None
class AgentDeleteConfirm(SQLModel):
token: str
class AgentProvisionConfirm(SQLModel):
token: str
action: str | None = None

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import json import json
import re import re
import shutil
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from uuid import uuid4 from uuid import uuid4
@@ -78,16 +77,6 @@ def _workspace_path(agent_name: str, workspace_root: str) -> str:
return f"{root}/workspace-{_slugify(agent_name)}" return f"{root}/workspace-{_slugify(agent_name)}"
def _resolve_workspace_dir(workspace_root: str, agent_name: str) -> Path:
if not workspace_root:
raise ValueError("gateway_workspace_root is required")
root = Path(workspace_root).expanduser().resolve()
workspace = Path(_workspace_path(agent_name, workspace_root)).expanduser().resolve()
if workspace == root or root not in workspace.parents:
raise ValueError("workspace path is not under workspace root")
return workspace
def _build_context( def _build_context(
agent: Agent, agent: Agent,
board: Board, board: Board,
@@ -268,6 +257,26 @@ async def _remove_gateway_agent_list(
await openclaw_call("config.patch", params, config=config) await openclaw_call("config.patch", params, config=config)
async def _get_gateway_agent_entry(
agent_id: str,
config: GatewayClientConfig,
) -> dict[str, Any] | None:
cfg = await openclaw_call("config.get", config=config)
if not isinstance(cfg, dict):
return None
data = cfg.get("config") or cfg.get("parsed") or {}
if not isinstance(data, dict):
return None
agents = data.get("agents") or {}
lst = agents.get("list") or []
if not isinstance(lst, list):
return None
for entry in lst:
if isinstance(entry, dict) and entry.get("id") == agent_id:
return entry
return None
async def provision_agent( async def provision_agent(
agent: Agent, agent: Agent,
board: Board, board: Board,
@@ -318,12 +327,10 @@ async def provision_agent(
) )
async def cleanup_agent_direct( async def cleanup_agent(
agent: Agent, agent: Agent,
gateway: Gateway, gateway: Gateway,
*, ) -> str | None:
delete_workspace: bool = True,
) -> None:
if not gateway.url: if not gateway.url:
return return
if not gateway.workspace_root: if not gateway.workspace_root:
@@ -331,13 +338,13 @@ async def cleanup_agent_direct(
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
agent_id = _agent_key(agent) agent_id = _agent_key(agent)
entry = await _get_gateway_agent_entry(agent_id, client_config)
await _remove_gateway_agent_list(agent_id, client_config) await _remove_gateway_agent_list(agent_id, client_config)
session_key = _session_key(agent) session_key = _session_key(agent)
await openclaw_call("sessions.delete", {"key": session_key}, config=client_config) await openclaw_call("sessions.delete", {"key": session_key}, config=client_config)
if delete_workspace: workspace_path = entry.get("workspace") if entry else None
workspace_dir = _resolve_workspace_dir(gateway.workspace_root, agent.name) if not workspace_path:
if workspace_dir.exists(): workspace_path = _workspace_path(agent.name, gateway.workspace_root)
shutil.rmtree(workspace_dir) return workspace_path