Skip to content

Commit d0fea42

Browse files
committed
feat: refactor and fix the client streaming flow; users can now send messages concurrently and seamlessly
1 parent d9bb7fb commit d0fea42

6 files changed

Lines changed: 69 additions & 39 deletions

File tree

apps/backend-convex/convex/_generated/api.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import type * as authInfo from "../authInfo.js";
1212
import type * as crons from "../crons.js";
13-
import type * as http_ai from "../http/ai.js";
13+
import type * as http_chat from "../http/chat.js";
1414
import type * as http from "../http.js";
1515
import type * as messages from "../messages.js";
1616
import type * as tasks from "../tasks.js";
@@ -33,7 +33,7 @@ import type {
3333
declare const fullApi: ApiFromModules<{
3434
authInfo: typeof authInfo;
3535
crons: typeof crons;
36-
"http/ai": typeof http_ai;
36+
"http/chat": typeof http_chat;
3737
http: typeof http;
3838
messages: typeof messages;
3939
tasks: typeof tasks;

apps/backend-convex/convex/http.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import type { HonoWithConvex } from 'convex-helpers/server/hono'
22
import type { ActionCtx } from './_generated/server'
33
import { HttpRouterWithHono } from 'convex-helpers/server/hono'
44
import { Hono } from 'hono'
5-
import { aiApp } from './http/ai'
5+
import { chatApp } from './http/chat'
66

77
const app: HonoWithConvex<ActionCtx> = new Hono()
8-
app.route('/api/ai', aiApp)
8+
app.route('/api/chat', chatApp)
99

1010
export default new HttpRouterWithHono(app)
Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,23 @@ const rateLimiter = new RateLimiter(components.rateLimiter, {
1919
aiChat: { kind: 'token bucket', rate: 10, period: MINUTE, capacity: 3 },
2020
})
2121

22-
export const aiApp: HonoWithConvex<ActionCtx> = new Hono()
23-
aiApp
22+
export const chatApp: HonoWithConvex<ActionCtx> = new Hono()
23+
chatApp
2424
.use(cors())
2525
.post(
26-
'/chat/stream',
26+
'/stream',
2727
zValidator('json', z.object({
2828
threadId: z.string(),
2929
provider: z.string(),
3030
model: z.string(),
3131
apiKey: z.optional(z.string()),
3232
content: z.optional(z.string()),
33+
// Optionally set the stream id when creating a new stream message for identification
34+
streamId: z.optional(z.string()),
3335
context: z.optional(z.object({
3436
from: z.optional(z.string()),
3537
})),
38+
// Bypasses `content` and `streamId`; resumes the stream identified by this id.
3639
resumeStreamId: z.optional(z.string()),
3740
finishOnly: z.optional(z.boolean()),
3841
lockerKey: z.optional(z.string()),
@@ -52,6 +55,7 @@ aiApp
5255
finishOnly,
5356
lockerKey,
5457
} = c.req.valid('json')
58+
let { streamId } = c.req.valid('json')
5559

5660
// getUserIdentity on HTTP Action will throw if not authenticated 🤦‍♂️
5761
const userIdentity = await c.env.auth.getUserIdentity().catch(() => null)
@@ -65,21 +69,19 @@ aiApp
6569
const threadId = _threadId as Id<'threads'>
6670
const thread = await c.env.runQuery(api.threads.get, { threadId, lockerKey })
6771

68-
let streamId: string
6972
let streamingMessageId: Id<'messages'>
7073
let existingMessage: Doc<'messages'> | null = null
7174

72-
// Disable SSE resume for now until we have pub sub
75+
// Disable SSE resume, if you want SSE resume, implement a pub-sub.
7376
if (resumeStreamId)
7477
throw new ConvexError('SSE stream resume is disabled')
7578

79+
// On SSE resume
7680
if (resumeStreamId) {
7781
streamId = resumeStreamId
7882

7983
// Check if there's an existing streaming message to resume
80-
existingMessage = await c.env.runQuery(internal.messages.getStreamingMessage, {
81-
streamId,
82-
})
84+
existingMessage = await c.env.runQuery(internal.messages.getStreamingMessage, { streamId })
8385

8486
if (!existingMessage) {
8587
// If no streaming message found, just return success
@@ -96,12 +98,17 @@ aiApp
9698
c.text('OK')
9799
}
98100
}
101+
// On new stream
99102
else if (content) {
100103
if (thread.frozen)
101104
throw new ConvexError(`Can't send new messages to frozen thread`)
102105

103-
// Create new streaming session
104-
streamId = `${Date.now()}_${randomStr(10)}`
106+
// If the user provides a streamId, check that it is not already in use
107+
if (streamId) {
108+
if (await c.env.runQuery(internal.messages.getStreamingMessage, { streamId }))
109+
throw new ConvexError('streamId is already in use')
110+
}
111+
streamId = streamId ?? `stream-${Date.now()}_${randomStr(4)}`
105112

106113
// Add user message to thread
107114
await c.env.runMutation(internal.messages.internalAdd, {
@@ -114,7 +121,7 @@ aiApp
114121
lockerKey,
115122
})
116123

117-
// Add assistant message to thread (initial streaming)
124+
// Add assistant message to thread
118125
streamingMessageId = await c.env.runMutation(internal.messages.internalAdd, {
119126
threadId,
120127
role: 'assistant',

apps/frontend/app/components/chat/ChatInterface.vue

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<script setup lang="ts">
33
import type { Doc, Id } from 'backend-convex/convex/_generated/dataModel'
44
import type Lenis from 'lenis'
5-
import { keyBy, randomStr, sleep, uniquePromise } from '@namesmt/utils'
5+
import { keyBy, objectPick, randomStr, sleep, uniquePromise } from '@namesmt/utils'
66
import { api } from 'backend-convex/convex/_generated/api'
77
import { useConvexClient } from 'convex-vue'
88
import { countdown, debounce, getInstance, throttle } from 'kontroll'
@@ -43,7 +43,7 @@ const cachedThreadsMessages: {
4343
} = {}
4444
const messages = ref<Array<CustomMessage>>([])
4545
const messagesKeyed = computed(() => keyBy(messages.value, 'id'))
46-
const streamingMessages = ref(0)
46+
const streamingMessagesMap = reactive<Record<string, true>>({ })
4747
const isFetching = ref(false)
4848
const chatInput = ref('')
4949
@@ -56,14 +56,25 @@ const { ignoreUpdates: ignorePathUpdate } = watchIgnorable(
5656
5757
messages.value = cachedThreadsMessages[threadId as string] ?? []
5858
59-
doScrollBottom({ smooth: false })
59+
nextTick(() => { doScrollBottom({ smooth: false, maybe: true }) })
6060
6161
if (threadId) {
6262
isFetching.value = true
6363
await convex.query(api.messages.listByThread, { threadId: threadId as Doc<'threads'>['_id'], lockerKey: getLockerKey(threadId) })
64-
.then((existingMessages) => {
64+
.then((messagesFromConvex) => {
6565
if (threadIdRef.value === threadId) {
66-
messages.value = existingMessages.map(customMessageTransform)
66+
if (threadId === oldThreadId) {
67+
for (const index in messagesFromConvex) {
68+
const m = messagesFromConvex[index]!
69+
if (m.streamId && !streamingMessagesMap[m.streamId]) {
70+
messages.value.push(customMessageTransform(messagesFromConvex[+index - 1]!))
71+
messages.value.push(customMessageTransform(m))
72+
}
73+
}
74+
}
75+
else {
76+
messages.value = messagesFromConvex.map(customMessageTransform)
77+
}
6778
}
6879
})
6980
.catch((e) => {
@@ -93,11 +104,12 @@ const { ignoreUpdates: ignorePathUpdate } = watchIgnorable(
93104
&& message.streamId
94105
) {
95106
console.log('Attempting to resume stream for session:', message.streamId)
96-
uniquePromise(message.streamId, () => resumeStreamProcess(message.streamId!, message.id))
107+
nextTick(() => { uniquePromise(message.streamId!, () => resumeStreamProcess(message.streamId!, message.id)) })
97108
}
98109
}
99110
100-
nextTick(() => doScrollBottom({ tries: 6 }))
111+
if (threadId !== oldThreadId)
112+
nextTick(() => doScrollBottom({ tries: 6 }))
101113
}
102114
},
103115
{ immediate: true },
@@ -140,6 +152,8 @@ async function handleSubmit({ input }: HandleSubmitArgs) {
140152
.then(() => { sleep(500).then(() => handleSubmit({ input })) })
141153
}
142154
155+
const streamId = `stream-${Date.now()}_${randomStr(4)}`
156+
143157
// Optimistically add the messages
144158
messages.value.push({
145159
id: `user-${Date.now()}_${randomStr(4)}`,
@@ -153,7 +167,7 @@ async function handleSubmit({ input }: HandleSubmitArgs) {
153167
model: chatContext.activeAgent.value.model,
154168
content: '',
155169
isStreaming: true,
156-
streamId: undefined,
170+
streamId,
157171
} as any as CustomMessage)
158172
159173
// For some reason creating object reference first does not work, so we push and then get last message
@@ -187,8 +201,8 @@ async function handleSubmit({ input }: HandleSubmitArgs) {
187201
// Wraps in a kontroller to make sure there is only one stream on the same message
188202
throttle(
189203
1,
190-
() => streamToMessage({ message: targetMessage, content: userInput }),
191-
{ key: `messageStream-${targetMessage.id}` },
204+
() => streamToMessage({ message: targetMessage, content: userInput, streamId }),
205+
{ key: `messageStream-${streamId}` },
192206
)
193207
}
194208
@@ -200,10 +214,10 @@ async function resumeStreamProcess(streamSessionId: string, messageId: string) {
200214
if (getInstance(threadIdRef.value))
201215
return console.warn('Trying to resume stream for message that is currently streaming:', messageId)
202216
203-
// Currently we doesn't support SSE resume yet
217+
// Currently SSE resume not implemented yet
204218
// await streamToMessage({ message, resumeStreamId: streamSessionId })
205219
206-
// Using custom convex polling instead
220+
// Using custom convex polling resume instead
207221
await pollToMessage({ message, resumeStreamId: streamSessionId })
208222
}
209223
@@ -218,22 +232,26 @@ async function pollToMessage({ message, resumeStreamId, threadId = threadIdRef.v
218232
return
219233
}
220234
235+
streamingMessagesMap[resumeStreamId] = true
236+
console.log(`Polling: ${message.id}`)
237+
221238
const messageFromConvex = await convex.query(api.messages.get, {
222239
messageId: message._id,
223240
lockerKey: getLockerKey(threadId),
224241
})
225-
Object.assign(message, customMessageTransform(messageFromConvex))
242+
Object.assign(message, objectPick(messageFromConvex, ['content', 'context', 'isStreaming']))
226243
227244
if (message.isStreaming) {
228245
// Wraps in a kontroller to make sure there is only one stream on the same message
229246
countdown(
230247
500,
231-
() => pollToMessage({ message, resumeStreamId, threadId }),
232-
{ key: `messageStream-${message.id}` },
248+
() => { nextTick(() => { pollToMessage({ message, resumeStreamId, threadId }) }) },
249+
{ key: `messageStream-${resumeStreamId}` },
233250
)
234251
}
235252
else {
236-
console.log('Poll completed')
253+
console.log(`Poll completed: ${message.id}`)
254+
delete streamingMessagesMap[resumeStreamId]
237255
}
238256
239257
nextTick(() => { doScrollBottom({ maybe: true }) })
@@ -242,17 +260,19 @@ async function pollToMessage({ message, resumeStreamId, threadId = threadIdRef.v
242260
interface StreamToMessageArgs {
243261
message: CustomMessage
244262
content?: string
263+
streamId?: string
245264
resumeStreamId?: string
246265
}
247-
async function streamToMessage({ message, content, resumeStreamId }: StreamToMessageArgs) {
266+
async function streamToMessage({ message, content, streamId, resumeStreamId }: StreamToMessageArgs) {
248267
try {
249-
++streamingMessages.value
268+
streamingMessagesMap[(streamId ?? resumeStreamId)!] = true
250269
251270
const currentThreadId = threadIdRef.value
252271
const { response, abortController } = await postChatStream({
253272
threadId: currentThreadId as Id<'threads'>,
254273
...chatContext.activeAgent.value,
255274
content,
275+
streamId,
256276
resumeStreamId,
257277
})
258278
@@ -315,6 +335,8 @@ async function streamToMessage({ message, content, resumeStreamId }: StreamToMes
315335
if (state.content)
316336
message.content += state.content
317337
338+
console.log({ chunk, write: state.content })
339+
318340
nextTick(() => { doScrollBottom({ maybe: true }) })
319341
}
320342
@@ -325,14 +347,14 @@ async function streamToMessage({ message, content, resumeStreamId }: StreamToMes
325347
message!.content += `\nError: ${(error as Error).message}`
326348
}
327349
finally {
328-
--streamingMessages.value
350+
delete streamingMessagesMap[(streamId ?? resumeStreamId)!]
329351
}
330352
331353
console.log('Stream completed')
332354
}
333355
334356
async function _branchThreadFromMessage({ messageId, lockerKey }: BranchThreadFromMessageArgs) {
335-
if (streamingMessages.value > 0)
357+
if (Object.keys(streamingMessagesMap).length > 0)
336358
throw new Error('Can not branch while streaming')
337359
338360
const messagesLte = messages.value.slice(0, messages.value.findIndex(m => m._id === messageId) + 1)
@@ -417,7 +439,7 @@ function doScrollBottom({ smooth = true, maybe = false, tries = 0, lastScrollTop
417439
</VueLenis>
418440

419441
<PrompterArea
420-
v-bind="{ nearTopBottom, lenisRef, streamingMessages }"
442+
v-bind="{ nearTopBottom, lenisRef, streamingMessagesMap }"
421443
v-model:chat-input="chatInput"
422444
@submit="(input) => handleSubmit({ input })"
423445
/>

apps/frontend/app/components/chat/interface/PrompterArea.vue

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type Lenis from 'lenis'
44
const props = defineProps<{
55
nearTopBottom: Array<null | boolean | number>
66
lenisRef: undefined | { $el: HTMLElement, lenis: Lenis }
7-
streamingMessages: number
7+
streamingMessagesMap: Record<string, true>
88
}>()
99
1010
const emit = defineEmits<{
@@ -28,7 +28,7 @@ function handleSubmit({ confirmMultiStream = false }) {
2828
if (!userInput)
2929
return
3030
31-
if (!confirmMultiStream && props.streamingMessages > 0) {
31+
if (!confirmMultiStream && Object.keys(props.streamingMessagesMap).length > 0) {
3232
multiStreamConfirmDialogOpen.value = true
3333
return
3434
}

apps/frontend/app/utils/chat.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ export interface PostChatStreamArgs {
8181
model: string
8282
apiKey?: string
8383
content?: string
84+
streamId?: string
8485
resumeStreamId?: string
8586
finishOnly?: boolean
8687
abortController?: AbortController
@@ -93,7 +94,7 @@ export async function postChatStream(args: PostChatStreamArgs) {
9394
const { convexApiUrl } = useRuntimeConfig().public
9495
const { $auth } = useNuxtApp()
9596

96-
const response = await fetch(`${convexApiUrl}/api/ai/chat/stream`, {
97+
const response = await fetch(`${convexApiUrl}/api/chat/stream`, {
9798
method: 'POST',
9899
headers: {
99100
'Content-Type': 'application/json',

0 commit comments

Comments
 (0)