feat(api): add agent message persistence and simplified event stream#202
feat(api): add agent message persistence and simplified event stream#202
Conversation
… single-agent chat For tinyoffice, instead of listening to the full chain_step_start → chain_step_done → response_ready lifecycle, clients can now subscribe to a single `agent_message` SSE event that fires once per agent response with full content. - Add `agent_messages` SQLite table to persist all agent chat history (user + assistant) - Emit `agent_message` event after each agent response (includes isTeamMessage flag) - Add REST API: GET /api/agent-messages, GET /api/agents/:id/messages - Update SSE-EVENTS.md with new event and API docs https://claude.ai/code/session_01GJsNQwdZvdVPrAkWswf5BJ
Greptile SummaryThis PR adds an Key issues found:
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client
participant Server as API Server (Hono)
participant Main as Queue Processor
participant DB as SQLite (agent_messages)
participant SSE as SSE Stream
Client->>Server: POST /api/messages (enqueue)
Server->>Main: queueEvents "message:enqueued"
Main->>Main: processMessage()
Main->>Main: invokeAgent()
Main->>DB: insertAgentMessage(role=user)
Main->>DB: insertAgentMessage(role=assistant)
Main->>SSE: emitEvent("agent_message", {content, agentId, ...})
SSE-->>Client: SSE: agent_message event
Client->>Server: GET /api/agents/:id/messages?since_id=42
Server->>DB: getAgentMessages(agentId, limit, sinceId)
DB-->>Server: rows[]
Server-->>Client: JSON response
Client->>Server: GET /api/agents/:id/messages?channel=whatsapp&since_id=42
Server->>DB: getAgentMessagesByChannel(agentId, channel, limit)
Note over Server,DB: ⚠️ since_id is silently dropped here
DB-->>Server: rows[] (from row 0, not since_id)
Server-->>Client: JSON response (pagination broken)
Last reviewed commit: 652d606 |
| if (channel) { | ||
| return c.json(getAgentMessagesByChannel(agentId, channel, limit)); | ||
| } |
There was a problem hiding this comment.
since_id silently dropped when channel filter is used
When a channel query param is provided, sinceId is parsed from the request but never passed to getAgentMessagesByChannel, which has no sinceId parameter. This means pagination cursors are silently discarded and clients will always receive messages starting from the beginning of the history, breaking the documented ?limit=100&since_id=0&channel=whatsapp interface.
getAgentMessagesByChannel in packages/core/src/queues.ts needs a sinceId parameter, and the query should include AND id > ? to honour the cursor:
| if (channel) { | |
| return c.json(getAgentMessagesByChannel(agentId, channel, limit)); | |
| } | |
| if (channel) { | |
| return c.json(getAgentMessagesByChannel(agentId, channel, limit, sinceId)); | |
| } |
And in queues.ts:
export function getAgentMessagesByChannel(agentId: string, channel: string, limit = 100, sinceId = 0): any[] {
return getDb().prepare(
`SELECT * FROM agent_messages WHERE agent_id=? AND channel=? AND id>? ORDER BY created_at DESC LIMIT ?`
).all(agentId, channel, sinceId, limit);
}| // ── Agent messages (per-agent chat history) ───────────────────────────────── | ||
|
|
||
| export function insertAgentMessage(data: { | ||
| agentId: string; role: 'user' | 'assistant'; | ||
| channel: string; sender: string; messageId: string; content: string; | ||
| }): number { | ||
| return getDb().prepare( | ||
| `INSERT INTO agent_messages (agent_id,role,channel,sender,message_id,content,created_at) VALUES (?,?,?,?,?,?,?)` | ||
| ).run(data.agentId, data.role, data.channel, data.sender, data.messageId, data.content, Date.now()).lastInsertRowid as number; | ||
| } | ||
|
|
||
| export function getAgentMessages(agentId: string, limit = 100, sinceId = 0): any[] { | ||
| return getDb().prepare( | ||
| `SELECT * FROM agent_messages WHERE agent_id=? AND id>? ORDER BY created_at DESC LIMIT ?` | ||
| ).all(agentId, sinceId, limit); | ||
| } | ||
|
|
||
| export function getAgentMessagesByChannel(agentId: string, channel: string, limit = 100): any[] { | ||
| return getDb().prepare( | ||
| `SELECT * FROM agent_messages WHERE agent_id=? AND channel=? ORDER BY created_at DESC LIMIT ?` | ||
| ).all(agentId, channel, limit); | ||
| } | ||
|
|
||
| export function getAllAgentMessages(limit = 100, sinceId = 0): any[] { | ||
| return getDb().prepare( | ||
| `SELECT * FROM agent_messages WHERE id>? ORDER BY created_at DESC LIMIT ?` | ||
| ).all(sinceId, limit); | ||
| } | ||
|
|
There was a problem hiding this comment.
agent_messages table has no pruning function
Every other table in this file has a corresponding pruning function (pruneCompletedMessages, pruneAckedResponses), and the maintenance loop in packages/main/src/index.ts calls both every 60 seconds. agent_messages has no equivalent, so the table will grow without bound in long-running deployments.
Consider adding a pruning function and wiring it into the maintenance interval:
export function pruneAgentMessages(olderThanMs = 7 * 86400000): number {
return getDb().prepare(`DELETE FROM agent_messages WHERE created_at<?`).run(Date.now() - olderThanMs).changes;
}| // ── Persist & emit simplified agent_message event ──────────────────── | ||
| if (!isInternal) { | ||
| insertAgentMessage({ agentId, role: 'user', channel, sender, messageId, content: rawMessage }); | ||
| } | ||
| insertAgentMessage({ agentId, role: 'assistant', channel, sender, messageId, content: response }); | ||
| emitEvent('agent_message', { | ||
| agentId, agentName: agent.name, role: 'assistant', | ||
| channel, sender, messageId, | ||
| content: response, | ||
| isTeamMessage: isInternal || isTeamRouted, | ||
| }); |
There was a problem hiding this comment.
agent_message event / persistence fires for error-fallback responses
When invokeAgent throws, the catch block (lines 105–110) sets response to a static error string ("Sorry, I encountered an error..."). The new persistence and SSE emit code runs unconditionally after chain_step_done, so:
- The error string gets written to
agent_messagesas an'assistant'message. - An
agent_messageSSE event is emitted with the same error string ascontent.
Clients subscribing to agent_message as the "simplified" interface will receive (and store, if they're building their own UI) fabricated assistant messages. Consider skipping insertion and emission when the response originated from the error handler, e.g. by introducing a flag:
let responseIsError = false;
try {
response = await invokeAgent(...);
} catch (error) {
...
response = "Sorry, I encountered an error processing your request.";
responseIsError = true;
}
if (!responseIsError) {
if (!isInternal) {
insertAgentMessage({ agentId, role: 'user', channel, sender, messageId, content: rawMessage });
}
insertAgentMessage({ agentId, role: 'assistant', channel, sender, messageId, content: response });
emitEvent('agent_message', { ... });
}…e/simplify-agent-events-QbDpg
Description
Add a new
agent_messagestable and REST API endpoints to persist and retrieve agent chat history. This enables simplified single-agent chat workflows by providing a dedicatedagent_messageSSE event that fires once per agent response, eliminating the need to listen to the fullchain_step_start → chain_step_done → response_readylifecycle.Changes
agent_messagestable with indexed queries for agent-specific and channel-filtered message retrievalinsertAgentMessage()and three query functions (getAgentMessages,getAgentMessagesByChannel,getAllAgentMessages)agent_messageSSE event after each agent response; persist both user inputs and assistant responses to theagent_messagestable/api/agent-messagesand/api/agents/:id/messagesendpoints withlimitandsince_idpaginationagent_messageevent schema and simplified event flow diagramsTesting
Checklist
https://claude.ai/code/session_01GJsNQwdZvdVPrAkWswf5BJ