@@ -4,61 +4,32 @@ import type { ActionCtx } from '../_generated/server'
 import { createOpenAI } from '@ai-sdk/openai'
 import RateLimiter, { MINUTE } from '@convex-dev/rate-limiter'
 import { zValidator } from '@hono/zod-validator'
-import { randomStr, sample, sleep } from '@namesmt/utils'
+import { randomStr, sleep } from '@namesmt/utils'
 import { createOpenRouter } from '@openrouter/ai-sdk-provider'
 import { streamText } from 'ai'
 import { ConvexError } from 'convex/values'
 import { Hono } from 'hono'
 import { cors } from 'hono/cors'
 import { throttle } from 'kontroll'
 import { z } from 'zod'
+import { getAgentModel } from '../../utils/agent'
 import { buildAiSdkMessage } from '../../utils/message'
 import { api, components, internal } from '../_generated/api'
 
-const orModels = process.env.AI_MODELS_LIST?.split(',') ?? ['qwen/qwen3-32b:free']
-const openrouter = createOpenRouter({
-  apiKey: process.env.OPENROUTER_API_KEY,
-})
-
-const aiMessagesSchema = z.array(
-  z.object({
-    role: z.enum(['user', 'assistant']),
-    content: z.string(),
-  }),
-)
-
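+// Token bucket: accrues 10 tokens per MINUTE, holding at most 3 at once (burst limit)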
 const rateLimiter = new RateLimiter(components.rateLimiter, {
   aiChat: { kind: 'token bucket', rate: 10, period: MINUTE, capacity: 3 },
 })
 
 export const aiApp: HonoWithConvex<ActionCtx> = new Hono()
 aiApp
   .use(cors())
-  // Old endpoint for minimal testing; consumers should use `/chat/stream` instead
-  .post('/chat', zValidator('json', z.object({ messages: aiMessagesSchema })), async (c) => {
-    const userIdentity = await c.env.auth.getUserIdentity()
-    if (userIdentity === null)
-      throw new ConvexError({ msg: 'Not authenticated' })
-
-    await rateLimiter.limit(c.env, 'aiChat', { key: userIdentity.subject, throws: true })
-
-    const { messages } = c.req.valid('json')
-
-    const result = streamText({
-      model: openrouter(sample(orModels, 1)[0]),
-      messages,
-      system: 'Return the response in markdown format; remember to respond cleanly with linebreaks instead of endless paragraphs.',
-    })
-
-    return result.toDataStreamResponse()
-  })
   .post(
     '/chat/stream',
     zValidator('json', z.object({
       threadId: z.string(),
       provider: z.string(),
       model: z.string(),
-      apiKey: z.string(),
+      apiKey: z.optional(z.string()),
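+      // apiKey is now optional; presumably getAgentModel falls back to a
+      // server-configured key when the client does not bring its own (BYOK).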
       content: z.optional(z.string()),
       context: z.optional(z.object({
         from: z.optional(z.string()),
@@ -174,18 +145,46 @@ aiApp
       // Get conversation history
       const messages = await c.env.runQuery(api.messages.listByThread, { threadId, lockerKey })
 
-      // Prepare messages for AI API (exclude the streaming messages)
+      // Prepare messages for AI API
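+      // Filtering by _id instead of isStreaming, presumably so that other agents'
+      // concurrently streaming messages stay in context; only this request's own
+      // placeholder message is excluded.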
       const messagesContext = messages
-        .filter(msg => !msg.isStreaming)
+        .filter(msg => msg._id !== streamingMessageId)
         .map(buildAiSdkMessage)
 
+      let aiResponse = existingMessage ? existingMessage.content : ''
+
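+      // Throttled persistence: `throttle` (from kontroll) coalesces calls so at
+      // most one updateStreamingMessage mutation runs per 500ms window (trailing
+      // edge), saving partial content without issuing a Convex write per token.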
+      let pendingSave = false
+      function doSave() {
+        pendingSave = true
+        throttle(
+          500,
+          async () => {
+            await c.env.runMutation(internal.messages.updateStreamingMessage, {
+              messageId: streamingMessageId,
+              content: aiResponse,
+              isStreaming: true,
+              lockerKey,
+            }).finally(() => {
+              pendingSave = false
+            })
+          },
+          { trailing: true },
+        )
+      }
+
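+      // Best-effort flush: waits up to ~6s for the last throttled save to land,
+      // logging instead of blocking forever if the mutation appears stuck.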
+      async function waitForSave() {
+        if (pendingSave)
+          await sleep(1000)
+        if (pendingSave)
+          await sleep(5000)
+        if (pendingSave)
+          console.error('Save was stuck')
+      }
+
       // Create streaming response
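+      // Wire format (as used below): `o: {json}\n` lines carry metadata/errors,
+      // and `t: <chunk>` frames carry raw text deltas for the consumer to append.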
       const encoder = new TextEncoder()
       const stream = new ReadableStream({
         async start(controller) {
           try {
-            let aiResponse = existingMessage ? existingMessage.content : ''
-
             // Send session ID first
             controller.enqueue(encoder.encode(`o: ${JSON.stringify({
               messageId: streamingMessageId,
@@ -201,45 +200,27 @@ aiApp
              })}\n`))
            }
 
-            // Call AI provider
-
-            // Using hosted provider and model for now, switch to user configured BYOK when UI is ready for it
-            const providerStream = streamText({
-              model: openrouter(sample(orModels, 1)[0]),
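+            // getAgentModel (../../utils/agent) resolves the AI SDK model from the
+            // request's { provider, model, apiKey }, i.e. user-configured BYOK.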
+            const aiStream = streamText({
+              model: getAgentModel({ provider, model, apiKey }),
+              system: 'You are inside a chat room with multiple users and multiple agents. Each message has an auto-added `System Context` block, which contains important information about that message, for example which agent or user sent it; you can use it for context.\nIMPORTANT: NEVER add or include the `System Context` block in your response yourself; it will be added automatically later.',
               messages: messagesContext,
-              onError: (e) => {
-                throw e.error
+              onError: (ev) => {
+                console.error(ev.error)
+                throw ev.error
               },
             })
 
-            let pendingSave = false
-            for await (const textDelta of providerStream.textStream) {
+            for await (const textDelta of aiStream.textStream) {
               if (textDelta) {
                 aiResponse += textDelta
                 controller.enqueue(encoder.encode(`t: ${textDelta}`))
 
-                pendingSave = true
-                throttle(
-                  500,
-                  async () => {
-                    await c.env.runMutation(internal.messages.updateStreamingMessage, {
-                      messageId: streamingMessageId,
-                      content: aiResponse,
-                      isStreaming: true,
-                      lockerKey,
-                    }).finally(() => {
-                      pendingSave = false
-                    })
-                  },
-                  { trailing: true },
-                )
+                doSave()
               }
             }
-            // eslint-disable-next-line no-unmodified-loop-condition
-            while (pendingSave)
-              await sleep(1000)
 
             // Finish streaming
+            await waitForSave()
             await c.env.runMutation(internal.messages.finishStreaming, { streamId })
             await c.env.runMutation(internal.threads.updateThreadInfo, { threadId, timestamp: Date.now() })
 
@@ -250,7 +231,12 @@ aiApp
             controller.close()
           }
           catch (error: any) {
-            // Mark streaming as finished on error
+            console.error(error)
+
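+            // Append a visible notice to the persisted message so readers can see
+            // the stream ended abnormally, then flush the pending save before
+            // marking the stream finished.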
+            aiResponse += `\n\nError encountered, stream stopped`
+
+            doSave()
+            await waitForSave()
             await c.env.runMutation(internal.messages.finishStreaming, { streamId })
 
             controller.enqueue(encoder.encode(`o: ${JSON.stringify({ error: error.message })}\n`))