feat: add is_chat field to board memory and task_id to approvals, update pagination and response models

This commit is contained in:
Abhimanyu Saharan
2026-02-06 19:11:11 +05:30
parent d86fe0a7a6
commit 6c14af0451
76 changed files with 2070 additions and 571 deletions

View File

@@ -1,27 +1,27 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends
from sqlalchemy import desc
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api.deps import ActorContext, require_admin_or_agent
from app.db.pagination import paginate
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.schemas.activity_events import ActivityEventRead
from app.schemas.pagination import DefaultLimitOffsetPage
router = APIRouter(prefix="/activity", tags=["activity"])
@router.get("", response_model=list[ActivityEventRead])
@router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead])
async def list_activity(
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[ActivityEvent]:
) -> DefaultLimitOffsetPage[ActivityEventRead]:
statement = select(ActivityEvent)
if actor.actor_type == "agent" and actor.agent:
statement = statement.where(ActivityEvent.agent_id == actor.agent.id)
statement = statement.order_by(desc(col(ActivityEvent.created_at))).offset(offset).limit(limit)
return list(await session.exec(statement))
statement = statement.order_by(desc(col(ActivityEvent.created_at)))
return await paginate(session, statement)

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, cast
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlmodel import select
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api import agents as agents_api
@@ -13,6 +15,7 @@ from app.api import board_onboarding as onboarding_api
from app.api import tasks as tasks_api
from app.api.deps import ActorContext, get_board_or_404, get_task_or_404
from app.core.agent_auth import AgentAuthContext, get_agent_auth_context
from app.db.pagination import paginate
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
@@ -30,6 +33,7 @@ from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead
from app.schemas.boards import BoardRead
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
@@ -54,15 +58,16 @@ async def _gateway_config(session: AsyncSession, board: Board) -> GatewayClientC
return GatewayClientConfig(url=gateway.url, token=gateway.token)
@router.get("/boards", response_model=list[BoardRead])
@router.get("/boards", response_model=DefaultLimitOffsetPage[BoardRead])
async def list_boards(
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[Board]:
) -> DefaultLimitOffsetPage[BoardRead]:
statement = select(Board)
if agent_ctx.agent.board_id:
board = await session.get(Board, agent_ctx.agent.board_id)
return [board] if board else []
return list(await session.exec(select(Board)))
statement = statement.where(col(Board.id) == agent_ctx.agent.board_id)
statement = statement.order_by(col(Board.created_at).desc())
return await paginate(session, statement)
@router.get("/boards/{board_id}", response_model=BoardRead)
@@ -74,13 +79,12 @@ def get_board(
return board
@router.get("/agents", response_model=list[AgentRead])
@router.get("/agents", response_model=DefaultLimitOffsetPage[AgentRead])
async def list_agents(
board_id: UUID | None = Query(default=None),
limit: int | None = Query(default=None, ge=1, le=200),
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[AgentRead]:
) -> DefaultLimitOffsetPage[AgentRead]:
statement = select(Agent)
if agent_ctx.agent.board_id:
if board_id and board_id != agent_ctx.agent.board_id:
@@ -88,32 +92,33 @@ async def list_agents(
statement = statement.where(Agent.board_id == agent_ctx.agent.board_id)
elif board_id:
statement = statement.where(Agent.board_id == board_id)
if limit is not None:
statement = statement.limit(limit)
agents = list(await session.exec(statement))
main_session_keys = await agents_api._get_gateway_main_session_keys(session)
return [
agents_api._to_agent_read(agents_api._with_computed_status(agent), main_session_keys)
for agent in agents
]
statement = statement.order_by(col(Agent.created_at).desc())
def _transform(items: Sequence[Any]) -> Sequence[Any]:
agents = cast(Sequence[Agent], items)
return [
agents_api._to_agent_read(agents_api._with_computed_status(agent), main_session_keys)
for agent in agents
]
return await paginate(session, statement, transformer=_transform)
@router.get("/boards/{board_id}/tasks", response_model=list[TaskRead])
@router.get("/boards/{board_id}/tasks", response_model=DefaultLimitOffsetPage[TaskRead])
async def list_tasks(
status_filter: str | None = Query(default=None, alias="status"),
assigned_agent_id: UUID | None = None,
unassigned: bool | None = None,
limit: int | None = Query(default=None, ge=1, le=200),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[Task]:
) -> DefaultLimitOffsetPage[TaskRead]:
_guard_board_access(agent_ctx, board)
return await tasks_api.list_tasks(
status_filter=status_filter,
assigned_agent_id=assigned_agent_id,
unassigned=unassigned,
limit=limit,
board=board,
session=session,
actor=_actor(agent_ctx),
@@ -185,12 +190,15 @@ async def update_task(
)
@router.get("/boards/{board_id}/tasks/{task_id}/comments", response_model=list[TaskCommentRead])
@router.get(
"/boards/{board_id}/tasks/{task_id}/comments",
response_model=DefaultLimitOffsetPage[TaskCommentRead],
)
async def list_task_comments(
task: Task = Depends(get_task_or_404),
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[ActivityEvent]:
) -> DefaultLimitOffsetPage[TaskCommentRead]:
if agent_ctx.agent.board_id and task.board_id and agent_ctx.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return await tasks_api.list_task_comments(
@@ -217,18 +225,14 @@ async def create_task_comment(
)
@router.get("/boards/{board_id}/memory", response_model=list[BoardMemoryRead])
@router.get("/boards/{board_id}/memory", response_model=DefaultLimitOffsetPage[BoardMemoryRead])
async def list_board_memory(
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[BoardMemory]:
) -> DefaultLimitOffsetPage[BoardMemoryRead]:
_guard_board_access(agent_ctx, board)
return await board_memory_api.list_board_memory(
limit=limit,
offset=offset,
board=board,
session=session,
actor=_actor(agent_ctx),
@@ -251,13 +255,16 @@ async def create_board_memory(
)
@router.get("/boards/{board_id}/approvals", response_model=list[ApprovalRead])
@router.get(
"/boards/{board_id}/approvals",
response_model=DefaultLimitOffsetPage[ApprovalRead],
)
async def list_approvals(
status_filter: ApprovalStatus | None = Query(default=None, alias="status"),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
agent_ctx: AgentAuthContext = Depends(get_agent_auth_context),
) -> list[Approval]:
) -> DefaultLimitOffsetPage[ApprovalRead]:
_guard_board_access(agent_ctx, board)
return await approvals_api.list_approvals(
status_filter=status_filter,

View File

@@ -3,8 +3,9 @@ from __future__ import annotations
import asyncio
import json
import re
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Sequence
from datetime import datetime, timedelta, timezone
from typing import Any, cast
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
@@ -17,6 +18,7 @@ from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agen
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
@@ -33,6 +35,7 @@ from app.schemas.agents import (
AgentRead,
AgentUpdate,
)
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.activity_log import record_activity
from app.services.agent_provisioning import (
DEFAULT_HEARTBEAT_CONFIG,
@@ -231,14 +234,28 @@ async def _send_wakeup_message(
await send_message(message, session_key=session_key, config=config, deliver=True)
@router.get("", response_model=list[AgentRead])
@router.get("", response_model=DefaultLimitOffsetPage[AgentRead])
async def list_agents(
board_id: UUID | None = Query(default=None),
gateway_id: UUID | None = Query(default=None),
session: AsyncSession = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> list[AgentRead]:
agents = list(await session.exec(select(Agent)))
) -> DefaultLimitOffsetPage[AgentRead]:
main_session_keys = await _get_gateway_main_session_keys(session)
return [_to_agent_read(_with_computed_status(agent), main_session_keys) for agent in agents]
statement = select(Agent)
if board_id is not None:
statement = statement.where(col(Agent.board_id) == board_id)
if gateway_id is not None:
statement = statement.join(Board, col(Agent.board_id) == col(Board.id)).where(
col(Board.gateway_id) == gateway_id
)
statement = statement.order_by(col(Agent.created_at).desc())
def _transform(items: Sequence[Any]) -> Sequence[Any]:
agents = cast(Sequence[Agent], items)
return [_to_agent_read(_with_computed_status(agent), main_session_keys) for agent in agents]
return await paginate(session, statement, transformer=_transform)
@router.get("/stream")

View File

@@ -7,7 +7,7 @@ from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, or_
from sqlalchemy import asc, case, func, or_
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from sse_starlette.sse import EventSourceResponse
@@ -15,13 +15,32 @@ from sse_starlette.sse import EventSourceResponse
from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, require_admin_or_agent
from app.core.auth import AuthContext
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.models.approvals import Approval
from app.models.boards import Board
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus, ApprovalUpdate
from app.schemas.pagination import DefaultLimitOffsetPage
router = APIRouter(prefix="/boards/{board_id}/approvals", tags=["approvals"])
TASK_ID_KEYS: tuple[str, ...] = ("task_id", "taskId", "taskID")
def _extract_task_id(payload: dict[str, object] | None) -> UUID | None:
if not payload:
return None
for key in TASK_ID_KEYS:
value = payload.get(key)
if isinstance(value, UUID):
return value
if isinstance(value, str):
try:
return UUID(value)
except ValueError:
continue
return None
def _parse_since(value: str | None) -> datetime | None:
if not value:
@@ -66,13 +85,13 @@ async def _fetch_approval_events(
return list(await session.exec(statement))
@router.get("", response_model=list[ApprovalRead])
@router.get("", response_model=DefaultLimitOffsetPage[ApprovalRead])
async def list_approvals(
status_filter: ApprovalStatus | None = Query(default=None, alias="status"),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Approval]:
) -> DefaultLimitOffsetPage[ApprovalRead]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
@@ -80,7 +99,7 @@ async def list_approvals(
if status_filter:
statement = statement.where(col(Approval.status) == status_filter)
statement = statement.order_by(col(Approval.created_at).desc())
return list(await session.exec(statement))
return await paginate(session, statement)
@router.get("/stream")
@@ -103,11 +122,53 @@ async def stream_approvals(
break
async with async_session_maker() as session:
approvals = await _fetch_approval_events(session, board.id, last_seen)
pending_approvals_count = int(
(
await session.exec(
select(func.count(col(Approval.id)))
.where(col(Approval.board_id) == board.id)
.where(col(Approval.status) == "pending")
)
).one()
)
task_ids = {approval.task_id for approval in approvals if approval.task_id is not None}
counts_by_task_id: dict[UUID, tuple[int, int]] = {}
if task_ids:
rows = list(
await session.exec(
select(
col(Approval.task_id),
func.count(col(Approval.id)).label("total"),
func.sum(
case((col(Approval.status) == "pending", 1), else_=0)
).label("pending"),
)
.where(col(Approval.board_id) == board.id)
.where(col(Approval.task_id).in_(task_ids))
.group_by(col(Approval.task_id))
)
)
for task_id, total, pending in rows:
if task_id is None:
continue
counts_by_task_id[task_id] = (int(total or 0), int(pending or 0))
for approval in approvals:
updated_at = _approval_updated_at(approval)
if updated_at > last_seen:
last_seen = updated_at
payload = {"approval": _serialize_approval(approval)}
payload: dict[str, object] = {
"approval": _serialize_approval(approval),
"pending_approvals_count": pending_approvals_count,
}
if approval.task_id is not None:
counts = counts_by_task_id.get(approval.task_id)
if counts is not None:
total, pending = counts
payload["task_counts"] = {
"task_id": str(approval.task_id),
"approvals_count": total,
"approvals_pending_count": pending,
}
yield {"event": "approval", "data": json.dumps(payload)}
await asyncio.sleep(2)
@@ -124,8 +185,10 @@ async def create_approval(
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
task_id = payload.task_id or _extract_task_id(payload.payload)
approval = Approval(
board_id=board.id,
task_id=task_id,
agent_id=payload.agent_id,
action_type=payload.action_type,
payload=payload.payload,

View File

@@ -8,6 +8,7 @@ from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import func
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from sse_starlette.sse import EventSourceResponse
@@ -15,6 +16,7 @@ from sse_starlette.sse import EventSourceResponse
from app.api.deps import ActorContext, get_board_or_404, require_admin_or_agent
from app.core.config import settings
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
@@ -23,6 +25,7 @@ from app.models.board_memory import BoardMemory
from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.pagination import DefaultLimitOffsetPage
router = APIRouter(prefix="/boards/{board_id}/memory", tags=["board-memory"])
@@ -90,11 +93,19 @@ async def _fetch_memory_events(
session: AsyncSession,
board_id: UUID,
since: datetime,
is_chat: bool | None = None,
) -> list[BoardMemory]:
statement = (
select(BoardMemory)
.where(col(BoardMemory.board_id) == board_id)
.where(col(BoardMemory.created_at) >= since)
# Old/invalid rows (empty/whitespace-only content) can exist; exclude them to
# satisfy the NonEmptyStr response schema.
.where(func.length(func.trim(col(BoardMemory.content))) > 0)
)
if is_chat is not None:
statement = statement.where(col(BoardMemory.is_chat) == is_chat)
statement = (
statement.where(col(BoardMemory.created_at) >= since)
.order_by(col(BoardMemory.created_at))
)
return list(await session.exec(statement))
@@ -159,25 +170,27 @@ async def _notify_chat_targets(
continue
@router.get("", response_model=list[BoardMemoryRead])
@router.get("", response_model=DefaultLimitOffsetPage[BoardMemoryRead])
async def list_board_memory(
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
is_chat: bool | None = Query(default=None),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[BoardMemory]:
) -> DefaultLimitOffsetPage[BoardMemoryRead]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = (
select(BoardMemory)
.where(col(BoardMemory.board_id) == board.id)
.order_by(col(BoardMemory.created_at).desc())
.offset(offset)
.limit(limit)
# Old/invalid rows (empty/whitespace-only content) can exist; exclude them to
# satisfy the NonEmptyStr response schema.
.where(func.length(func.trim(col(BoardMemory.content))) > 0)
)
return list(await session.exec(statement))
if is_chat is not None:
statement = statement.where(col(BoardMemory.is_chat) == is_chat)
statement = statement.order_by(col(BoardMemory.created_at).desc())
return await paginate(session, statement)
@router.get("/stream")
@@ -186,6 +199,7 @@ async def stream_board_memory(
board: Board = Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
is_chat: bool | None = Query(default=None),
) -> EventSourceResponse:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
@@ -199,7 +213,12 @@ async def stream_board_memory(
if await request.is_disconnected():
break
async with async_session_maker() as session:
memories = await _fetch_memory_events(session, board.id, last_seen)
memories = await _fetch_memory_events(
session,
board.id,
last_seen,
is_chat=is_chat,
)
for memory in memories:
if memory.created_at > last_seen:
last_seen = memory.created_at
@@ -231,6 +250,7 @@ async def create_board_memory(
board_id=board.id,
content=payload.content,
tags=payload.tags,
is_chat=is_chat,
source=source,
)
session.add(memory)

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import logging
import re
from datetime import datetime
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, status

View File

@@ -1,10 +1,11 @@
from __future__ import annotations
import re
from uuid import uuid4
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import delete
from sqlalchemy import func
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -12,6 +13,7 @@ from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, req
from app.core.auth import AuthContext
from app.core.time import utcnow
from app.db import crud
from app.db.pagination import paginate
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import (
@@ -31,6 +33,9 @@ from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.schemas.common import OkResponse
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.view_models import BoardSnapshot
from app.services.board_snapshot import build_board_snapshot
router = APIRouter(prefix="/boards", tags=["boards"])
@@ -149,12 +154,17 @@ async def _cleanup_agent_on_gateway(
)
@router.get("", response_model=list[BoardRead])
@router.get("", response_model=DefaultLimitOffsetPage[BoardRead])
async def list_boards(
gateway_id: UUID | None = Query(default=None),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Board]:
return list(await session.exec(select(Board)))
) -> DefaultLimitOffsetPage[BoardRead]:
statement = select(Board)
if gateway_id is not None:
statement = statement.where(col(Board.gateway_id) == gateway_id)
statement = statement.order_by(func.lower(col(Board.name)).asc(), col(Board.created_at).desc())
return await paginate(session, statement)
@router.post("", response_model=BoardRead)
@@ -175,6 +185,18 @@ def get_board(
return board
@router.get("/{board_id}/snapshot", response_model=BoardSnapshot)
async def get_board_snapshot(
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> BoardSnapshot:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return await build_board_snapshot(session, board)
@router.patch("/{board_id}", response_model=BoardRead)
async def update_board(
payload: BoardUpdate,

View File

@@ -1,15 +1,15 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import select
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext, get_auth_context
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
@@ -17,6 +17,7 @@ from app.models.agents import Agent
from app.models.gateways import Gateway
from app.schemas.common import OkResponse
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.agent_provisioning import DEFAULT_HEARTBEAT_CONFIG, provision_main_agent
router = APIRouter(prefix="/gateways", tags=["gateways"])
@@ -362,12 +363,13 @@ async def _send_skyll_disable_message(gateway: Gateway) -> None:
)
@router.get("", response_model=list[GatewayRead])
@router.get("", response_model=DefaultLimitOffsetPage[GatewayRead])
async def list_gateways(
session: AsyncSession = Depends(get_session),
auth: AuthContext = Depends(get_auth_context),
) -> list[Gateway]:
return list(await session.exec(select(Gateway)))
) -> DefaultLimitOffsetPage[GatewayRead]:
statement = select(Gateway).order_by(col(Gateway.created_at).desc())
return await paginate(session, statement)
@router.post("", response_model=GatewayRead)

View File

@@ -25,16 +25,19 @@ from app.api.deps import (
)
from app.core.auth import AuthContext
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
@@ -410,16 +413,18 @@ async def stream_tasks(
return EventSourceResponse(event_generator(), ping=15)
@router.get("", response_model=list[TaskRead])
@router.get("", response_model=DefaultLimitOffsetPage[TaskRead])
async def list_tasks(
status_filter: str | None = Query(default=None, alias="status"),
assigned_agent_id: UUID | None = None,
unassigned: bool | None = None,
limit: int | None = Query(default=None, ge=1, le=200),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Task]:
) -> DefaultLimitOffsetPage[TaskRead]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = select(Task).where(Task.board_id == board.id)
if status_filter:
statuses = [s.strip() for s in status_filter.split(",") if s.strip()]
@@ -434,9 +439,8 @@ async def list_tasks(
statement = statement.where(col(Task.assigned_agent_id) == assigned_agent_id)
if unassigned:
statement = statement.where(col(Task.assigned_agent_id).is_(None))
if limit is not None:
statement = statement.limit(limit)
return list(await session.exec(statement))
statement = statement.order_by(col(Task.created_at).desc())
return await paginate(session, statement)
@router.post("", response_model=TaskRead)
@@ -661,17 +665,18 @@ async def delete_task(
) -> OkResponse:
await session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id) == task.id))
await session.execute(delete(TaskFingerprint).where(col(TaskFingerprint.task_id) == task.id))
await session.execute(delete(Approval).where(col(Approval.task_id) == task.id))
await session.delete(task)
await session.commit()
return OkResponse()
@router.get("/{task_id}/comments", response_model=list[TaskCommentRead])
@router.get("/{task_id}/comments", response_model=DefaultLimitOffsetPage[TaskCommentRead])
async def list_task_comments(
task: Task = Depends(get_task_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[ActivityEvent]:
) -> DefaultLimitOffsetPage[TaskCommentRead]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and task.board_id and actor.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
@@ -681,7 +686,7 @@ async def list_task_comments(
.where(col(ActivityEvent.event_type) == "task.comment")
.order_by(asc(col(ActivityEvent.created_at)))
)
return list(await session.exec(statement))
return await paginate(session, statement)
@router.post("/{task_id}/comments", response_model=TaskCommentRead)

View File

@@ -1,11 +1,20 @@
from __future__ import annotations
from pathlib import Path
from typing import Self
from pydantic import model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
BACKEND_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_ENV_FILE = BACKEND_ROOT / ".env"
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
# Load `backend/.env` regardless of current working directory.
# (Important when running uvicorn from repo root or via a process manager.)
env_file=[DEFAULT_ENV_FILE, ".env"],
env_file_encoding="utf-8",
extra="ignore",
)
@@ -30,5 +39,13 @@ class Settings(BaseSettings):
log_format: str = "text"
log_use_utc: bool = False
@model_validator(mode="after")
def _defaults(self) -> Self:
# In dev, default to applying Alembic migrations at startup to avoid schema drift
# (e.g. missing newly-added columns).
if "db_auto_migrate" not in self.model_fields_set and self.environment == "dev":
self.db_auto_migrate = True
return self
settings = Settings()

View File

@@ -8,4 +8,3 @@ def utcnow() -> datetime:
# Keep naive UTC values for compatibility with existing DB schema/queries.
return datetime.now(UTC).replace(tzinfo=None)

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Sequence
from typing import Any, TypeVar, cast
from fastapi_pagination.ext.sqlalchemy import paginate as _paginate
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import Select, SelectOfScalar
from app.schemas.pagination import DefaultLimitOffsetPage
T = TypeVar("T")
Transformer = Callable[[Sequence[Any]], Sequence[Any] | Awaitable[Sequence[Any]]]
async def paginate(
session: AsyncSession,
statement: Select[Any] | SelectOfScalar[Any],
*,
transformer: Transformer | None = None,
) -> DefaultLimitOffsetPage[T]:
# fastapi-pagination is not fully typed (it returns Any), but response_model validation
# ensures runtime correctness. Centralize casts here to keep strict mypy clean.
return cast(
DefaultLimitOffsetPage[T],
await _paginate(session, statement, transformer=transformer),
)

View File

@@ -5,6 +5,7 @@ from contextlib import asynccontextmanager
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import add_pagination
from app.api.activity import router as activity_router
from app.api.agent import router as agent_router
@@ -75,3 +76,5 @@ api_v1.include_router(approvals_router)
api_v1.include_router(tasks_router)
api_v1.include_router(users_router)
app.include_router(api_v1)
add_pagination(app)

View File

@@ -14,6 +14,7 @@ class Approval(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
task_id: UUID | None = Field(default=None, foreign_key="tasks.id", index=True)
agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
action_type: str
payload: dict[str, object] | None = Field(default=None, sa_column=Column(JSON))

View File

@@ -16,5 +16,6 @@ class BoardMemory(SQLModel, table=True):
board_id: UUID = Field(foreign_key="boards.id", index=True)
content: str
tags: list[str] | None = Field(default=None, sa_column=Column(JSON))
is_chat: bool = Field(default=False, index=True)
source: str | None = None
created_at: datetime = Field(default_factory=utcnow)

View File

@@ -13,6 +13,7 @@ ApprovalStatus = Literal["pending", "approved", "rejected"]
class ApprovalBase(SQLModel):
action_type: str
task_id: UUID | None = None
payload: dict[str, object] | None = None
confidence: int
rubric_scores: dict[str, int] | None = None

View File

@@ -9,12 +9,18 @@ from app.schemas.common import NonEmptyStr
class BoardMemoryCreate(SQLModel):
# For writes, reject blank/whitespace-only content.
content: NonEmptyStr
tags: list[str] | None = None
source: str | None = None
class BoardMemoryRead(BoardMemoryCreate):
class BoardMemoryRead(SQLModel):
id: UUID
board_id: UUID
# For reads, allow legacy rows that may have empty content (avoid response validation 500s).
content: str
tags: list[str] | None = None
source: str | None = None
is_chat: bool = False
created_at: datetime

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from typing import TypeVar
from fastapi import Query
from fastapi_pagination.customization import CustomizedPage, UseParamsFields
from fastapi_pagination.limit_offset import LimitOffsetPage
T = TypeVar("T")
# Project-wide default pagination response model.
# - Keep `limit` / `offset` naming (matches existing API conventions).
# - Cap list endpoints to 200 items per request (matches prior route-level constraints).
DefaultLimitOffsetPage = CustomizedPage[
LimitOffsetPage[T],
UseParamsFields(
limit=Query(200, ge=1, le=200),
offset=Query(0, ge=0),
),
]

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from sqlmodel import SQLModel
from app.schemas.agents import AgentRead
from app.schemas.approvals import ApprovalRead
from app.schemas.board_memory import BoardMemoryRead
from app.schemas.boards import BoardRead
from app.schemas.tasks import TaskRead
class TaskCardRead(TaskRead):
assignee: str | None = None
approvals_count: int = 0
approvals_pending_count: int = 0
class BoardSnapshot(SQLModel):
board: BoardRead
tasks: list[TaskCardRead]
agents: list[AgentRead]
approvals: list[ApprovalRead]
chat_messages: list[BoardMemoryRead]
pending_approvals_count: int = 0

View File

@@ -0,0 +1,158 @@
from __future__ import annotations
from datetime import timedelta
from uuid import UUID
from sqlalchemy import case, func
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.time import utcnow
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.agents import AgentRead
from app.schemas.approvals import ApprovalRead
from app.schemas.board_memory import BoardMemoryRead
from app.schemas.boards import BoardRead
from app.schemas.view_models import BoardSnapshot, TaskCardRead
OFFLINE_AFTER = timedelta(minutes=10)
def _computed_agent_status(agent: Agent) -> str:
now = utcnow()
if agent.status in {"deleting", "updating"}:
return agent.status
if agent.last_seen_at is None:
return "provisioning"
if now - agent.last_seen_at > OFFLINE_AFTER:
return "offline"
return agent.status
async def _gateway_main_session_keys(session: AsyncSession) -> set[str]:
keys = (await session.exec(select(Gateway.main_session_key))).all()
return {key for key in keys if key}
def _agent_to_read(agent: Agent, main_session_keys: set[str]) -> AgentRead:
model = AgentRead.model_validate(agent, from_attributes=True)
computed_status = _computed_agent_status(agent)
is_gateway_main = bool(agent.openclaw_session_id and agent.openclaw_session_id in main_session_keys)
return model.model_copy(update={"status": computed_status, "is_gateway_main": is_gateway_main})
def _memory_to_read(memory: BoardMemory) -> BoardMemoryRead:
return BoardMemoryRead.model_validate(memory, from_attributes=True)
def _approval_to_read(approval: Approval) -> ApprovalRead:
return ApprovalRead.model_validate(approval, from_attributes=True)
def _task_to_card(
task: Task,
*,
agent_name_by_id: dict[UUID, str],
counts_by_task_id: dict[UUID, tuple[int, int]],
) -> TaskCardRead:
card = TaskCardRead.model_validate(task, from_attributes=True)
approvals_count, approvals_pending_count = counts_by_task_id.get(task.id, (0, 0))
assignee = (
agent_name_by_id.get(task.assigned_agent_id) if task.assigned_agent_id is not None else None
)
return card.model_copy(
update={
"assignee": assignee,
"approvals_count": approvals_count,
"approvals_pending_count": approvals_pending_count,
}
)
async def build_board_snapshot(session: AsyncSession, board: Board) -> BoardSnapshot:
board_read = BoardRead.model_validate(board, from_attributes=True)
tasks = list(
await session.exec(
select(Task).where(col(Task.board_id) == board.id).order_by(col(Task.created_at).desc())
)
)
main_session_keys = await _gateway_main_session_keys(session)
agents = list(
await session.exec(
select(Agent).where(col(Agent.board_id) == board.id).order_by(col(Agent.created_at).desc())
)
)
agent_reads = [_agent_to_read(agent, main_session_keys) for agent in agents]
agent_name_by_id = {agent.id: agent.name for agent in agents}
pending_approvals_count = int(
(await session.exec(
select(func.count(col(Approval.id)))
.where(col(Approval.board_id) == board.id)
.where(col(Approval.status) == "pending")
)).one()
)
approvals = list(
await session.exec(
select(Approval)
.where(col(Approval.board_id) == board.id)
.order_by(col(Approval.created_at).desc())
.limit(200)
)
)
approval_reads = [_approval_to_read(approval) for approval in approvals]
counts_by_task_id: dict[UUID, tuple[int, int]] = {}
rows = list(
await session.exec(
select(
col(Approval.task_id),
func.count(col(Approval.id)).label("total"),
func.sum(case((col(Approval.status) == "pending", 1), else_=0)).label("pending"),
)
.where(col(Approval.board_id) == board.id)
.where(col(Approval.task_id).is_not(None))
.group_by(col(Approval.task_id))
)
)
for task_id, total, pending in rows:
if task_id is None:
continue
counts_by_task_id[task_id] = (int(total or 0), int(pending or 0))
task_cards = [
_task_to_card(task, agent_name_by_id=agent_name_by_id, counts_by_task_id=counts_by_task_id)
for task in tasks
]
chat_messages = list(
await session.exec(
select(BoardMemory)
.where(col(BoardMemory.board_id) == board.id)
.where(col(BoardMemory.is_chat).is_(True))
# Old/invalid rows (empty/whitespace-only content) can exist; exclude them to
# satisfy the NonEmptyStr response schema.
.where(func.length(func.trim(col(BoardMemory.content))) > 0)
.order_by(col(BoardMemory.created_at).desc())
.limit(200)
)
)
chat_messages.sort(key=lambda item: item.created_at)
chat_reads = [_memory_to_read(memory) for memory in chat_messages]
return BoardSnapshot(
board=board_read,
tasks=task_cards,
agents=agent_reads,
approvals=approval_reads,
chat_messages=chat_reads,
pending_approvals_count=pending_approvals_count,
)