fix(notify): isolate background notifications in new DB session + reduce timeout spam

This commit is contained in:
Abhimanyu Saharan
2026-02-02 21:53:47 +05:30
parent 4b31b4ada8
commit c0b0f26984
2 changed files with 170 additions and 168 deletions

View File

@@ -99,8 +99,7 @@ def create_task(
session.refresh(task) session.refresh(task)
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session, NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task_id=task.id),
NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task=task),
) )
# Explicitly return a serializable payload (guards against empty {} responses) # Explicitly return a serializable payload (guards against empty {} responses)
return Task.model_validate(task) return Task.model_validate(task)
@@ -143,7 +142,7 @@ def dispatch_task(
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session, session,
NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task=task), NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task_id=task.id),
) )
return {"ok": True} return {"ok": True}
@@ -228,11 +227,10 @@ def review_task(
if before_status != task.status: if before_status != task.status:
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session,
NotifyContext( NotifyContext(
event="status.changed", event="status.changed",
actor_employee_id=actor_employee_id, actor_employee_id=actor_employee_id,
task=task, task_id=task.id,
changed_fields={"status": {"from": before_status, "to": task.status}}, changed_fields={"status": {"from": before_status, "to": task.status}},
), ),
) )
@@ -321,11 +319,10 @@ def update_task(
} }
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session,
NotifyContext( NotifyContext(
event="task.assigned", event="task.assigned",
actor_employee_id=actor_employee_id, actor_employee_id=actor_employee_id,
task=task, task_id=task.id,
changed_fields=changed, changed_fields=changed,
), ),
) )
@@ -333,22 +330,20 @@ def update_task(
changed["status"] = {"from": before.get("status"), "to": task.status} changed["status"] = {"from": before.get("status"), "to": task.status}
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session,
NotifyContext( NotifyContext(
event="status.changed", event="status.changed",
actor_employee_id=actor_employee_id, actor_employee_id=actor_employee_id,
task=task, task_id=task.id,
changed_fields=changed, changed_fields=changed,
), ),
) )
if not changed and data: if not changed and data:
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session,
NotifyContext( NotifyContext(
event="task.updated", event="task.updated",
actor_employee_id=actor_employee_id, actor_employee_id=actor_employee_id,
task=task, task_id=task.id,
changed_fields=data, changed_fields=data,
), ),
) )
@@ -426,9 +421,11 @@ def create_task_comment(
if task is not None: if task is not None:
background.add_task( background.add_task(
notify_openclaw, notify_openclaw,
session,
NotifyContext( NotifyContext(
event="comment.created", actor_employee_id=actor_employee_id, task=task, comment=c event="comment.created",
actor_employee_id=actor_employee_id,
task_id=task.id,
comment_id=c.id,
), ),
) )
return TaskComment.model_validate(c) return TaskComment.model_validate(c)

View File

@@ -6,6 +6,7 @@ from typing import Iterable
from sqlmodel import Session, select from sqlmodel import Session, select
from app.db.session import engine
from app.integrations.openclaw import OpenClawClient from app.integrations.openclaw import OpenClawClient
from app.models.org import Employee from app.models.org import Employee
from app.models.projects import ProjectMember from app.models.projects import ProjectMember
@@ -14,6 +15,83 @@ from app.models.work import Task, TaskComment
logger = logging.getLogger("app.notify") logger = logging.getLogger("app.notify")
@dataclass(frozen=True)
class NotifyContext:
"""Notification context.
IMPORTANT: this is passed into FastAPI BackgroundTasks.
Do not store live SQLAlchemy/SQLModel objects here; only ids/primitive data.
"""
event: str # task.created | task.updated | task.assigned | comment.created | status.changed
actor_employee_id: int
task_id: int
comment_id: int | None = None
changed_fields: dict | None = None
def _employees_with_session_keys(session: Session, employee_ids: Iterable[int]) -> list[Employee]:
ids = sorted({i for i in employee_ids if i is not None})
if not ids:
return []
emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all()
out: list[Employee] = []
for e in emps:
if not getattr(e, "notify_enabled", True):
continue
if getattr(e, "openclaw_session_key", None):
out.append(e)
return out
def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]:
pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all()
pm_ids: set[int] = set()
for m in pms:
role = (m.role or "").lower()
if role in {"pm", "product", "product_manager", "manager"}:
pm_ids.add(m.employee_id)
return pm_ids
def resolve_recipients(
session: Session, ctx: NotifyContext, task: Task, comment: TaskComment | None
) -> set[int]:
recipients: set[int] = set()
if ctx.event == "task.created":
if task.assignee_employee_id:
recipients.add(task.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, task.project_id)
elif ctx.event == "task.assigned":
if task.assignee_employee_id:
recipients.add(task.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, task.project_id)
elif ctx.event == "comment.created":
if task.assignee_employee_id:
recipients.add(task.assignee_employee_id)
if task.reviewer_employee_id:
recipients.add(task.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, task.project_id)
if comment and comment.author_employee_id:
recipients.discard(comment.author_employee_id)
elif ctx.event == "status.changed":
new_status = (getattr(task, "status", None) or "").lower()
if new_status in {"review", "ready_for_review"} and task.reviewer_employee_id:
recipients.add(task.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, task.project_id)
elif ctx.event == "task.updated":
recipients |= _project_pm_employee_ids(session, task.project_id)
recipients.discard(ctx.actor_employee_id)
return recipients
def ensure_employee_provisioned(session: Session, employee_id: int) -> None: def ensure_employee_provisioned(session: Session, employee_id: int) -> None:
"""Best-effort provisioning of a reviewer/manager so notifications can be delivered.""" """Best-effort provisioning of a reviewer/manager so notifications can be delivered."""
@@ -32,7 +110,6 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None:
) )
return return
# Deterministic minimal prompt; agents already have detailed guidance in their own provisioning prompt.
prompt = ( prompt = (
f"You are {emp.name} (employee_id={emp.id}).\n" f"You are {emp.name} (employee_id={emp.id}).\n"
"You are a reviewer/manager in Mission Control.\n" "You are a reviewer/manager in Mission Control.\n"
@@ -45,7 +122,6 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None:
{"task": prompt, "label": f"employee:{emp.id}:{emp.name}"}, {"task": prompt, "label": f"employee:{emp.id}:{emp.name}"},
timeout_s=20.0, timeout_s=20.0,
) )
# Extract childSessionKey
details = (res.get("result") or {}).get("details") or {} details = (res.get("result") or {}).get("details") or {}
sk = details.get("childSessionKey") or details.get("sessionKey") sk = details.get("childSessionKey") or details.get("sessionKey")
if sk: if sk:
@@ -61,196 +137,125 @@ def ensure_employee_provisioned(session: Session, employee_id: int) -> None:
logger.exception("ensure_employee_provisioned: failed", extra={"employee_id": employee_id}) logger.exception("ensure_employee_provisioned: failed", extra={"employee_id": employee_id})
@dataclass(frozen=True) def build_message(
class NotifyContext: *,
event: str # task.created | task.updated | task.assigned | comment.created | status.changed ctx: NotifyContext,
actor_employee_id: int task: Task,
task: Task comment: TaskComment | None,
comment: TaskComment | None = None recipient: Employee,
changed_fields: dict | None = None base_url: str,
) -> str:
base = f"Task #{task.id}: {task.title}" if task.id is not None else f"Task: {task.title}"
def _employees_with_session_keys(session: Session, employee_ids: Iterable[int]) -> list[Employee]:
ids = sorted({i for i in employee_ids if i is not None})
if not ids:
return []
emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all()
out: list[Employee] = []
for e in emps:
if not getattr(e, "notify_enabled", True):
continue
if getattr(e, "openclaw_session_key", None):
out.append(e)
return out
def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]:
# Generic, data-driven: PMs are determined by project_members.role.
pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all()
pm_ids: set[int] = set()
for m in pms:
role = (m.role or "").lower()
if role in {"pm", "product", "product_manager", "manager"}:
pm_ids.add(m.employee_id)
return pm_ids
def resolve_recipients(session: Session, ctx: NotifyContext) -> set[int]:
t = ctx.task
recipients: set[int] = set()
if ctx.event == "task.created":
# notify assignee + PMs
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "task.assigned":
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "comment.created":
# notify assignee + reviewer + PMs, excluding author
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
if t.reviewer_employee_id:
recipients.add(t.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
if ctx.comment and ctx.comment.author_employee_id:
recipients.discard(ctx.comment.author_employee_id)
elif ctx.event == "status.changed":
new_status = (getattr(t, "status", None) or "").lower()
if new_status in {"review", "ready_for_review"} and t.reviewer_employee_id:
recipients.add(t.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "task.updated":
# conservative: PMs only
recipients |= _project_pm_employee_ids(session, t.project_id)
recipients.discard(ctx.actor_employee_id)
return recipients
def build_message(ctx: NotifyContext, recipient: Employee) -> str:
t = ctx.task
base = f"Task #{t.id}: {t.title}" if t.id is not None else f"Task: {t.title}"
# Agent-specific dispatch instructions. These notifications should result in the agent
# taking concrete actions in Mission Control, not just acknowledging.
if ctx.event in {"task.created", "task.assigned"} and recipient.employee_type == "agent": if ctx.event in {"task.created", "task.assigned"} and recipient.employee_type == "agent":
desc = (t.description or "").strip() desc = (task.description or "").strip()
if len(desc) > 500: if len(desc) > 500:
desc = desc[:497] + "..." desc = desc[:497] + "..."
desc_block = f"\n\nDescription:\n{desc}" if desc else "" desc_block = f"\n\nDescription:\n{desc}" if desc else ""
# Keep this deterministic: agents already have base URL + header guidance in their prompt.
base_url = __import__(
"app.core.urls", fromlist=["public_api_base_url"]
).public_api_base_url()
return ( return (
f"{base}\n\n" f"{base}\n\n"
f"Set BASE={base_url}\n\n" f"Set BASE={base_url}\n\n"
"You are the assignee. Start NOW (use the exec tool to run these curl commands):\n" "You are the assignee. Start NOW (use the exec tool to run these curl commands):\n"
f"1) curl -sS -X PATCH $BASE/tasks/{t.id} -H 'X-Actor-Employee-Id: {recipient.id}' " f"1) curl -sS -X PATCH $BASE/tasks/{task.id} -H 'X-Actor-Employee-Id: {recipient.id}' -H 'Content-Type: application/json' -d '{{\"status\":\"in_progress\"}}'\n"
"-H 'Content-Type: application/json' -d '{\"status\":\"in_progress\"}'\n" f"2) curl -sS -X POST $BASE/task-comments -H 'X-Actor-Employee-Id: {recipient.id}' -H 'Content-Type: application/json' -d '{{\"task_id\":{task.id},\"body\":\"Plan: ... Next: ...\"}}'\n"
f"2) curl -sS -X POST $BASE/task-comments -H 'X-Actor-Employee-Id: {recipient.id}' "
f'-H \'Content-Type: application/json\' -d \'{{"task_id":{t.id},"body":"Plan: ... Next: ..."}}\'\n'
"3) Do the work\n" "3) Do the work\n"
f"4) Post progress updates via POST $BASE/task-comments (same headers)\n" "4) Post progress updates via POST $BASE/task-comments (same headers)\n"
f"5) When complete: curl -sS -X PATCH $BASE/tasks/{t.id} -H 'X-Actor-Employee-Id: {recipient.id}' " f"5) When complete: set status=review (assignee cannot set done) and wait for manager approval\n"
"-H 'Content-Type: application/json' -d '{\"status\":\"done\"}' and post a final summary comment"
f"{desc_block}" f"{desc_block}"
) )
if ctx.event == "task.assigned":
return (
f"Assigned: {base}.\n"
"Work ONE task only; update Mission Control with a comment when you make progress."
)
if ctx.event == "comment.created": if ctx.event == "comment.created":
snippet = "" snippet = ""
if ctx.comment and ctx.comment.body: if comment and comment.body:
snippet = ctx.comment.body.strip().replace("\n", " ") snippet = comment.body.strip().replace("\n", " ")
if len(snippet) > 180: if len(snippet) > 180:
snippet = snippet[:177] + "..." snippet = snippet[:177] + "..."
snippet = f"\nComment: {snippet}" snippet = f"\nComment: {snippet}"
return ( return f"New comment on {base}.{snippet}\nPlease review and respond in Mission Control."
f"New comment on {base}.{snippet}\nWork ONE task only; reply/update in Mission Control."
)
if ctx.event == "status.changed": if ctx.event == "status.changed":
return ( return f"Status changed on {base}{task.status}.\nPlease review and respond in Mission Control."
f"Status changed on {base}{t.status}.\n"
"Work ONE task only; update Mission Control with next step."
)
if ctx.event == "task.created": if ctx.event == "task.created":
return ( return f"New task created: {base}.\nPlease review and respond in Mission Control."
f"New task created: {base}.\n"
"Work ONE task only; add acceptance criteria / next step in Mission Control."
)
return f"Update on {base}.\nWork ONE task only; update Mission Control." if ctx.event == "task.assigned":
return f"Assigned: {base}.\nPlease review and respond in Mission Control."
return f"Update on {base}.\nPlease review and respond in Mission Control."
def notify_openclaw(session: Session, ctx: NotifyContext) -> None: def notify_openclaw(ctx: NotifyContext) -> None:
"""Send OpenClaw notifications.
Runs in BackgroundTasks; opens its own DB session for safety.
"""
client = OpenClawClient.from_env() client = OpenClawClient.from_env()
logger.info( logger.info(
"notify_openclaw: start", "notify_openclaw: start",
extra={ extra={"event": ctx.event, "task_id": ctx.task_id, "actor": ctx.actor_employee_id},
"event": ctx.event,
"task_id": getattr(ctx.task, "id", None),
"actor": ctx.actor_employee_id,
},
) )
if client is None: if client is None:
logger.warning("notify_openclaw: skipped (missing OpenClaw env)") logger.warning("notify_openclaw: skipped (missing OpenClaw env)")
return return
recipient_ids = resolve_recipients(session, ctx) with Session(engine) as session:
# If the event requires notifying the reviewer, ensure they are provisioned so the notification is deliverable. task = session.get(Task, ctx.task_id)
if ctx.event == "status.changed": if task is None:
new_status = (getattr(ctx.task, "status", None) or "").lower() logger.warning("notify_openclaw: task not found", extra={"task_id": ctx.task_id})
if new_status in {"review", "ready_for_review"} and getattr( return
ctx.task, "reviewer_employee_id", None
):
ensure_employee_provisioned(session, int(ctx.task.reviewer_employee_id))
logger.info( comment = session.get(TaskComment, ctx.comment_id) if ctx.comment_id else None
"notify_openclaw: recipients resolved", extra={"recipient_ids": sorted(recipient_ids)}
)
recipients = _employees_with_session_keys(session, recipient_ids)
if not recipients:
logger.info("notify_openclaw: no recipients with session keys")
return
for e in recipients: if ctx.event == "status.changed":
new_status = (getattr(task, "status", None) or "").lower()
if new_status in {"review", "ready_for_review"} and task.reviewer_employee_id:
ensure_employee_provisioned(session, int(task.reviewer_employee_id))
recipient_ids = resolve_recipients(session, ctx, task, comment)
logger.info( logger.info(
"notify_openclaw: sending", "notify_openclaw: recipients resolved", extra={"recipient_ids": sorted(recipient_ids)}
extra={
"to_employee_id": getattr(e, "id", None),
"session_key": getattr(e, "openclaw_session_key", None),
"event": ctx.event,
},
) )
sk = getattr(e, "openclaw_session_key", None) recipients = _employees_with_session_keys(session, recipient_ids)
if not sk: if not recipients:
continue logger.info("notify_openclaw: no recipients with session keys")
return
message = build_message(ctx, recipient=e) # base URL used in agent messages
try: base_url = __import__(
client.tools_invoke( "app.core.urls", fromlist=["public_api_base_url"]
"sessions_send", ).public_api_base_url()
{"sessionKey": sk, "message": message},
timeout_s=15.0, for e in recipients:
sk = getattr(e, "openclaw_session_key", None)
if not sk:
continue
message = build_message(
ctx=ctx,
task=task,
comment=comment,
recipient=e,
base_url=base_url,
) )
except Exception:
logger.exception("notify_openclaw: sessions_send failed") try:
# best-effort; never break Mission Control writes client.tools_invoke(
continue "sessions_send",
{"sessionKey": sk, "message": message},
timeout_s=15.0,
)
except Exception:
# keep the log, but avoid giant stack spam unless debugging
logger.warning(
"notify_openclaw: sessions_send failed",
extra={
"event": ctx.event,
"task_id": ctx.task_id,
"to_employee_id": getattr(e, "id", None),
"session_key": sk,
},
)
continue