diff --git a/backend/app/api/work.py b/backend/app/api/work.py index 8f66cae..8f96936 100644 --- a/backend/app/api/work.py +++ b/backend/app/api/work.py @@ -99,8 +99,7 @@ def create_task( session.refresh(task) background.add_task( notify_openclaw, - session, - NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task=task), + NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task_id=task.id), ) # Explicitly return a serializable payload (guards against empty {} responses) return Task.model_validate(task) @@ -143,7 +142,7 @@ def dispatch_task( background.add_task( notify_openclaw, session, - NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task=task), + NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task_id=task.id), ) return {"ok": True} @@ -228,11 +227,10 @@ def review_task( if before_status != task.status: background.add_task( notify_openclaw, - session, NotifyContext( event="status.changed", actor_employee_id=actor_employee_id, - task=task, + task_id=task.id, changed_fields={"status": {"from": before_status, "to": task.status}}, ), ) @@ -321,11 +319,10 @@ def update_task( } background.add_task( notify_openclaw, - session, NotifyContext( event="task.assigned", actor_employee_id=actor_employee_id, - task=task, + task_id=task.id, changed_fields=changed, ), ) @@ -333,22 +330,20 @@ def update_task( changed["status"] = {"from": before.get("status"), "to": task.status} background.add_task( notify_openclaw, - session, NotifyContext( event="status.changed", actor_employee_id=actor_employee_id, - task=task, + task_id=task.id, changed_fields=changed, ), ) if not changed and data: background.add_task( notify_openclaw, - session, NotifyContext( event="task.updated", actor_employee_id=actor_employee_id, - task=task, + task_id=task.id, changed_fields=data, ), ) @@ -426,9 +421,11 @@ def create_task_comment( if task is not None: background.add_task( notify_openclaw, - session, NotifyContext( - event="comment.created", actor_employee_id=actor_employee_id, task=task, comment=c + event="comment.created", + actor_employee_id=actor_employee_id, + task_id=task.id, + comment_id=c.id, ), ) return TaskComment.model_validate(c) diff --git a/backend/app/integrations/notify.py b/backend/app/integrations/notify.py index 2eef5a7..2ee70c6 100644 --- a/backend/app/integrations/notify.py +++ b/backend/app/integrations/notify.py @@ -6,6 +6,7 @@ from typing import Iterable from sqlmodel import Session, select +from app.db.session import engine from app.integrations.openclaw import OpenClawClient from app.models.org import Employee from app.models.projects import ProjectMember @@ -14,6 +15,83 @@ from app.models.work import Task, TaskComment logger = logging.getLogger("app.notify") +@dataclass(frozen=True) +class NotifyContext: + """Notification context. + + IMPORTANT: this is passed into FastAPI BackgroundTasks. + Do not store live SQLAlchemy/SQLModel objects here; only ids/primitive data. + """ + + event: str # task.created | task.updated | task.assigned | comment.created | status.changed + actor_employee_id: int + task_id: int + comment_id: int | None = None + changed_fields: dict | None = None + + +def _employees_with_session_keys(session: Session, employee_ids: Iterable[int]) -> list[Employee]: + ids = sorted({i for i in employee_ids if i is not None}) + if not ids: + return [] + + emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all() + out: list[Employee] = [] + for e in emps: + if not getattr(e, "notify_enabled", True): + continue + if getattr(e, "openclaw_session_key", None): + out.append(e) + return out + + +def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]: + pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all() + pm_ids: set[int] = set() + for m in pms: + role = (m.role or "").lower() + if role in {"pm", "product", "product_manager", "manager"}: + pm_ids.add(m.employee_id) + return pm_ids + + +def resolve_recipients( + session: Session, ctx: NotifyContext, task: Task, comment: TaskComment | None +) -> set[int]: + recipients: set[int] = set() + + if ctx.event == "task.created": + if task.assignee_employee_id: + recipients.add(task.assignee_employee_id) + recipients |= _project_pm_employee_ids(session, task.project_id) + + elif ctx.event == "task.assigned": + if task.assignee_employee_id: + recipients.add(task.assignee_employee_id) + recipients |= _project_pm_employee_ids(session, task.project_id) + + elif ctx.event == "comment.created": + if task.assignee_employee_id: + recipients.add(task.assignee_employee_id) + if task.reviewer_employee_id: + recipients.add(task.reviewer_employee_id) + recipients |= _project_pm_employee_ids(session, task.project_id) + if comment and comment.author_employee_id: + recipients.discard(comment.author_employee_id) + + elif ctx.event == "status.changed": + new_status = (getattr(task, "status", None) or "").lower() + if new_status in {"review", "ready_for_review"} and task.reviewer_employee_id: + recipients.add(task.reviewer_employee_id) + recipients |= _project_pm_employee_ids(session, task.project_id) + + elif ctx.event == "task.updated": + recipients |= _project_pm_employee_ids(session, task.project_id) + + recipients.discard(ctx.actor_employee_id) + return recipients + + def ensure_employee_provisioned(session: Session, employee_id: int) -> None: """Best-effort provisioning of a reviewer/manager so notifications can be delivered.""" @@ -32,7 +110,6 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None: ) return - # Deterministic minimal prompt; agents already have detailed guidance in their own provisioning prompt. prompt = ( f"You are {emp.name} (employee_id={emp.id}).\n" "You are a reviewer/manager in Mission Control.\n" @@ -45,7 +122,6 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None: {"task": prompt, "label": f"employee:{emp.id}:{emp.name}"}, timeout_s=20.0, ) - # Extract childSessionKey details = (res.get("result") or {}).get("details") or {} sk = details.get("childSessionKey") or details.get("sessionKey") if sk: @@ -61,196 +137,125 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None: logger.exception("ensure_employee_provisioned: failed", extra={"employee_id": employee_id}) -@dataclass(frozen=True) -class NotifyContext: - event: str # task.created | task.updated | task.assigned | comment.created | status.changed - actor_employee_id: int - task: Task - comment: TaskComment | None = None - changed_fields: dict | None = None +def build_message( + *, + ctx: NotifyContext, + task: Task, + comment: TaskComment | None, + recipient: Employee, + base_url: str, +) -> str: + base = f"Task #{task.id}: {task.title}" if task.id is not None else f"Task: {task.title}" - -def _employees_with_session_keys(session: Session, employee_ids: Iterable[int]) -> list[Employee]: - ids = sorted({i for i in employee_ids if i is not None}) - if not ids: - return [] - - emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all() - out: list[Employee] = [] - for e in emps: - if not getattr(e, "notify_enabled", True): - continue - if getattr(e, "openclaw_session_key", None): - out.append(e) - return out - - -def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]: - # Generic, data-driven: PMs are determined by project_members.role. - pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all() - pm_ids: set[int] = set() - for m in pms: - role = (m.role or "").lower() - if role in {"pm", "product", "product_manager", "manager"}: - pm_ids.add(m.employee_id) - return pm_ids - - -def resolve_recipients(session: Session, ctx: NotifyContext) -> set[int]: - t = ctx.task - recipients: set[int] = set() - - if ctx.event == "task.created": - # notify assignee + PMs - if t.assignee_employee_id: - recipients.add(t.assignee_employee_id) - recipients |= _project_pm_employee_ids(session, t.project_id) - - elif ctx.event == "task.assigned": - if t.assignee_employee_id: - recipients.add(t.assignee_employee_id) - recipients |= _project_pm_employee_ids(session, t.project_id) - - elif ctx.event == "comment.created": - # notify assignee + reviewer + PMs, excluding author - if t.assignee_employee_id: - recipients.add(t.assignee_employee_id) - if t.reviewer_employee_id: - recipients.add(t.reviewer_employee_id) - recipients |= _project_pm_employee_ids(session, t.project_id) - if ctx.comment and ctx.comment.author_employee_id: - recipients.discard(ctx.comment.author_employee_id) - - elif ctx.event == "status.changed": - new_status = (getattr(t, "status", None) or "").lower() - if new_status in {"review", "ready_for_review"} and t.reviewer_employee_id: - recipients.add(t.reviewer_employee_id) - recipients |= _project_pm_employee_ids(session, t.project_id) - - elif ctx.event == "task.updated": - # conservative: PMs only - recipients |= _project_pm_employee_ids(session, t.project_id) - - recipients.discard(ctx.actor_employee_id) - return recipients - - -def build_message(ctx: NotifyContext, recipient: Employee) -> str: - t = ctx.task - base = f"Task #{t.id}: {t.title}" if t.id is not None else f"Task: {t.title}" - - # Agent-specific dispatch instructions. These notifications should result in the agent - # taking concrete actions in Mission Control, not just acknowledging. if ctx.event in {"task.created", "task.assigned"} and recipient.employee_type == "agent": - desc = (t.description or "").strip() + desc = (task.description or "").strip() if len(desc) > 500: desc = desc[:497] + "..." desc_block = f"\n\nDescription:\n{desc}" if desc else "" - # Keep this deterministic: agents already have base URL + header guidance in their prompt. - base_url = __import__( - "app.core.urls", fromlist=["public_api_base_url"] - ).public_api_base_url() - return ( f"{base}\n\n" f"Set BASE={base_url}\n\n" "You are the assignee. Start NOW (use the exec tool to run these curl commands):\n" - f"1) curl -sS -X PATCH $BASE/tasks/{t.id} -H 'X-Actor-Employee-Id: {recipient.id}' " - "-H 'Content-Type: application/json' -d '{\"status\":\"in_progress\"}'\n" - f"2) curl -sS -X POST $BASE/task-comments -H 'X-Actor-Employee-Id: {recipient.id}' " - f'-H \'Content-Type: application/json\' -d \'{{"task_id":{t.id},"body":"Plan: ... Next: ..."}}\'\n' + f"1) curl -sS -X PATCH $BASE/tasks/{task.id} -H 'X-Actor-Employee-Id: {recipient.id}' -H 'Content-Type: application/json' -d '{{\"status\":\"in_progress\"}}'\n" + f"2) curl -sS -X POST $BASE/task-comments -H 'X-Actor-Employee-Id: {recipient.id}' -H 'Content-Type: application/json' -d '{{\"task_id\":{task.id},\"body\":\"Plan: ... Next: ...\"}}'\n" "3) Do the work\n" - f"4) Post progress updates via POST $BASE/task-comments (same headers)\n" - f"5) When complete: curl -sS -X PATCH $BASE/tasks/{t.id} -H 'X-Actor-Employee-Id: {recipient.id}' " - "-H 'Content-Type: application/json' -d '{\"status\":\"done\"}' and post a final summary comment" + "4) Post progress updates via POST $BASE/task-comments (same headers)\n" + f"5) When complete: set status=review (assignee cannot set done) and wait for manager approval\n" f"{desc_block}" ) - if ctx.event == "task.assigned": - return ( - f"Assigned: {base}.\n" - "Work ONE task only; update Mission Control with a comment when you make progress." - ) - if ctx.event == "comment.created": snippet = "" - if ctx.comment and ctx.comment.body: - snippet = ctx.comment.body.strip().replace("\n", " ") + if comment and comment.body: + snippet = comment.body.strip().replace("\n", " ") if len(snippet) > 180: snippet = snippet[:177] + "..." snippet = f"\nComment: {snippet}" - return ( - f"New comment on {base}.{snippet}\nWork ONE task only; reply/update in Mission Control." - ) + return f"New comment on {base}.{snippet}\nPlease review and respond in Mission Control." if ctx.event == "status.changed": - return ( - f"Status changed on {base} → {t.status}.\n" - "Work ONE task only; update Mission Control with next step." - ) + return f"Status changed on {base} → {task.status}.\nPlease review and respond in Mission Control." if ctx.event == "task.created": - return ( - f"New task created: {base}.\n" - "Work ONE task only; add acceptance criteria / next step in Mission Control." - ) + return f"New task created: {base}.\nPlease review and respond in Mission Control." - return f"Update on {base}.\nWork ONE task only; update Mission Control." + if ctx.event == "task.assigned": + return f"Assigned: {base}.\nPlease review and respond in Mission Control." + + return f"Update on {base}.\nPlease review and respond in Mission Control." -def notify_openclaw(session: Session, ctx: NotifyContext) -> None: +def notify_openclaw(ctx: NotifyContext) -> None: + """Send OpenClaw notifications. + + Runs in BackgroundTasks; opens its own DB session for safety. + """ + client = OpenClawClient.from_env() logger.info( "notify_openclaw: start", - extra={ - "event": ctx.event, - "task_id": getattr(ctx.task, "id", None), - "actor": ctx.actor_employee_id, - }, + extra={"event": ctx.event, "task_id": ctx.task_id, "actor": ctx.actor_employee_id}, ) if client is None: logger.warning("notify_openclaw: skipped (missing OpenClaw env)") return - recipient_ids = resolve_recipients(session, ctx) - # If the event requires notifying the reviewer, ensure they are provisioned so the notification is deliverable. - if ctx.event == "status.changed": - new_status = (getattr(ctx.task, "status", None) or "").lower() - if new_status in {"review", "ready_for_review"} and getattr( - ctx.task, "reviewer_employee_id", None - ): - ensure_employee_provisioned(session, int(ctx.task.reviewer_employee_id)) + with Session(engine) as session: + task = session.get(Task, ctx.task_id) + if task is None: + logger.warning("notify_openclaw: task not found", extra={"task_id": ctx.task_id}) + return - logger.info( - "notify_openclaw: recipients resolved", extra={"recipient_ids": sorted(recipient_ids)} - ) - recipients = _employees_with_session_keys(session, recipient_ids) - if not recipients: - logger.info("notify_openclaw: no recipients with session keys") - return + comment = session.get(TaskComment, ctx.comment_id) if ctx.comment_id else None - for e in recipients: + if ctx.event == "status.changed": + new_status = (getattr(task, "status", None) or "").lower() + if new_status in {"review", "ready_for_review"} and task.reviewer_employee_id: + ensure_employee_provisioned(session, int(task.reviewer_employee_id)) + + recipient_ids = resolve_recipients(session, ctx, task, comment) logger.info( - "notify_openclaw: sending", - extra={ - "to_employee_id": getattr(e, "id", None), - "session_key": getattr(e, "openclaw_session_key", None), - "event": ctx.event, - }, + "notify_openclaw: recipients resolved", extra={"recipient_ids": sorted(recipient_ids)} ) - sk = getattr(e, "openclaw_session_key", None) - if not sk: - continue + recipients = _employees_with_session_keys(session, recipient_ids) + if not recipients: + logger.info("notify_openclaw: no recipients with session keys") + return - message = build_message(ctx, recipient=e) - try: - client.tools_invoke( - "sessions_send", - {"sessionKey": sk, "message": message}, - timeout_s=15.0, + # base URL used in agent messages + base_url = __import__( + "app.core.urls", fromlist=["public_api_base_url"] + ).public_api_base_url() + + for e in recipients: + sk = getattr(e, "openclaw_session_key", None) + if not sk: + continue + + message = build_message( + ctx=ctx, + task=task, + comment=comment, + recipient=e, + base_url=base_url, ) - except Exception: - logger.exception("notify_openclaw: sessions_send failed") - # best-effort; never break Mission Control writes - continue + + try: + client.tools_invoke( + "sessions_send", + {"sessionKey": sk, "message": message}, + timeout_s=15.0, + ) + except Exception: + # keep the log, but avoid giant stack spam unless debugging + logger.warning( + "notify_openclaw: sessions_send failed", + extra={ + "event": ctx.event, + "task_id": ctx.task_id, + "to_employee_id": getattr(e, "id", None), + "session_key": sk, + }, + ) + continue