refactor: improve session handling and streamline agent provisioning logic in agents.py

This commit is contained in:
Abhimanyu Saharan
2026-02-09 02:53:53 +05:30
parent 50e3ff38b2
commit 10d23777be

View File

@@ -192,9 +192,10 @@ async def _ensure_gateway_session(
session_key = _build_session_key(agent_name) session_key = _build_session_key(agent_name)
try: try:
await ensure_session(session_key, config=config, label=agent_name) await ensure_session(session_key, config=config, label=agent_name)
return session_key, None
except OpenClawGatewayError as exc: except OpenClawGatewayError as exc:
return session_key, str(exc) return session_key, str(exc)
else:
return session_key, None
def _with_computed_status(agent: Agent) -> Agent: def _with_computed_status(agent: Agent) -> Agent:
@@ -356,21 +357,20 @@ async def stream_agents(
while True: while True:
if await request.is_disconnected(): if await request.is_disconnected():
break break
async with async_session_maker() as session: async with async_session_maker() as stream_session:
if board_id is not None: if board_id is not None:
agents = await _fetch_agent_events(session, board_id, last_seen) agents = await _fetch_agent_events(stream_session, board_id, last_seen)
elif allowed_ids: elif allowed_ids:
agents = await _fetch_agent_events(session, None, last_seen) agents = await _fetch_agent_events(stream_session, None, last_seen)
agents = [agent for agent in agents if agent.board_id in allowed_ids] agents = [agent for agent in agents if agent.board_id in allowed_ids]
else: else:
agents = [] agents = []
main_session_keys = ( main_session_keys = (
await _get_gateway_main_session_keys(session) if agents else set() await _get_gateway_main_session_keys(stream_session) if agents else set()
) )
for agent in agents: for agent in agents:
updated_at = agent.updated_at or agent.last_seen_at or utcnow() updated_at = agent.updated_at or agent.last_seen_at or utcnow()
if updated_at > last_seen: last_seen = max(updated_at, last_seen)
last_seen = updated_at
payload = {"agent": _serialize_agent(agent, main_session_keys)} payload = {"agent": _serialize_agent(agent, main_session_keys)}
yield {"event": "agent", "data": json.dumps(payload)} yield {"event": "agent", "data": json.dumps(payload)}
await asyncio.sleep(2) await asyncio.sleep(2)
@@ -841,8 +841,7 @@ async def heartbeat_or_create_agent(
except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover
_record_instruction_failure(session, agent, str(exc), "provision") _record_instruction_failure(session, agent, str(exc), "provision")
await session.commit() await session.commit()
else: elif actor.actor_type == "user":
if actor.actor_type == "user":
ctx = await _require_user_context(session, actor.user) ctx = await _require_user_context(session, actor.user)
await _require_agent_access(session, agent=agent, ctx=ctx, write=True) await _require_agent_access(session, agent=agent, ctx=ctx, write=True)