diff --git a/core/config/model_config.go b/core/config/model_config.go index 41664f2a3dd4..e7b4567c10af 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/downloader" "github.com/mudler/LocalAI/pkg/functions" + "github.com/mudler/cogito" "gopkg.in/yaml.v3" ) @@ -668,3 +669,40 @@ func (c *ModelConfig) GuessUsecases(u ModelConfigUsecases) bool { return true } + +// BuildCogitoOptions generates cogito options from the model configuration +// It accepts a context, MCP sessions, and optional callback functions for status, reasoning, tool calls, and tool results +func (c *ModelConfig) BuildCogitoOptions() []cogito.Option { + cogitoOpts := []cogito.Option{ + cogito.WithIterations(3), // default to 3 iterations + cogito.WithMaxAttempts(3), // default to 3 attempts + cogito.WithForceReasoning(), + } + + // Apply agent configuration options + if c.Agent.EnableReasoning { + cogitoOpts = append(cogitoOpts, cogito.EnableToolReasoner) + } + + if c.Agent.EnablePlanning { + cogitoOpts = append(cogitoOpts, cogito.EnableAutoPlan) + } + + if c.Agent.EnableMCPPrompts { + cogitoOpts = append(cogitoOpts, cogito.EnableMCPPrompts) + } + + if c.Agent.EnablePlanReEvaluator { + cogitoOpts = append(cogitoOpts, cogito.EnableAutoPlanReEvaluator) + } + + if c.Agent.MaxIterations != 0 { + cogitoOpts = append(cogitoOpts, cogito.WithIterations(c.Agent.MaxIterations)) + } + + if c.Agent.MaxAttempts != 0 { + cogitoOpts = append(cogitoOpts, cogito.WithMaxAttempts(c.Agent.MaxAttempts)) + } + + return cogitoOpts +} diff --git a/core/http/app.go b/core/http/app.go index 731e69df565c..7497a5d611fa 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -205,7 +205,7 @@ func API(application *application.Application) (*echo.Echo, error) { opcache = services.NewOpCache(application.GalleryService()) } - routes.RegisterLocalAIRoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache) + routes.RegisterLocalAIRoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application.TemplatesEvaluator()) routes.RegisterOpenAIRoutes(e, requestExtractor, application) if !application.ApplicationConfig().DisableWebUI { routes.RegisterUIAPIRoutes(e, application.ModelConfigLoader(), application.ApplicationConfig(), application.GalleryService(), opcache) diff --git a/core/http/endpoints/localai/mcp.go b/core/http/endpoints/localai/mcp.go new file mode 100644 index 000000000000..c13a388064f1 --- /dev/null +++ b/core/http/endpoints/localai/mcp.go @@ -0,0 +1,323 @@ +package localai + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/labstack/echo/v4" + "github.com/mudler/LocalAI/core/config" + mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp" + "github.com/mudler/LocalAI/core/http/middleware" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/templates" + "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/cogito" + "github.com/rs/zerolog/log" +) + +// MCP SSE Event Types +type MCPReasoningEvent struct { + Type string 
`json:"type"` + Content string `json:"content"` +} + +type MCPToolCallEvent struct { + Type string `json:"type"` + Name string `json:"name"` + Arguments map[string]interface{} `json:"arguments"` + Reasoning string `json:"reasoning"` +} + +type MCPToolResultEvent struct { + Type string `json:"type"` + Name string `json:"name"` + Result string `json:"result"` +} + +type MCPStatusEvent struct { + Type string `json:"type"` + Message string `json:"message"` +} + +type MCPAssistantEvent struct { + Type string `json:"type"` + Content string `json:"content"` +} + +type MCPErrorEvent struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// MCPStreamEndpoint is the SSE streaming endpoint for MCP chat completions +// @Summary Stream MCP chat completions with reasoning, tool calls, and results +// @Param request body schema.OpenAIRequest true "query params" +// @Success 200 {object} schema.OpenAIResponse "Response" +// @Router /v1/mcp/chat/completions [post] +func MCPStreamEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator *templates.Evaluator, appConfig *config.ApplicationConfig) echo.HandlerFunc { + return func(c echo.Context) error { + ctx := c.Request().Context() + created := int(time.Now().Unix()) + + // Handle Correlation + id := c.Request().Header.Get("X-Correlation-ID") + if id == "" { + id = fmt.Sprintf("mcp-%d", time.Now().UnixNano()) + } + + input, ok := c.Get(middleware.CONTEXT_LOCALS_KEY_LOCALAI_REQUEST).(*schema.OpenAIRequest) + if !ok || input.Model == "" { + return echo.ErrBadRequest + } + + config, ok := c.Get(middleware.CONTEXT_LOCALS_KEY_MODEL_CONFIG).(*config.ModelConfig) + if !ok || config == nil { + return echo.ErrBadRequest + } + + if config.MCP.Servers == "" && config.MCP.Stdio == "" { + return fmt.Errorf("no MCP servers configured") + } + + // Get MCP config from model config + remote, stdio, err := config.MCP.MCPConfigFromYAML() + if err != nil { + return fmt.Errorf("failed to get MCP config: %w", err) + } + + // Check if we have tools in cache, or we have to have an initial connection + sessions, err := mcpTools.SessionsFromMCPConfig(config.Name, remote, stdio) + if err != nil { + return fmt.Errorf("failed to get MCP sessions: %w", err) + } + + if len(sessions) == 0 { + return fmt.Errorf("no working MCP servers found") + } + + // Build fragment from messages + fragment := cogito.NewEmptyFragment() + for _, message := range input.Messages { + fragment = fragment.AddMessage(message.Role, message.StringContent) + } + + port := appConfig.APIAddress[strings.LastIndex(appConfig.APIAddress, ":")+1:] + apiKey := "" + if len(appConfig.ApiKeys) > 0 { + apiKey = appConfig.ApiKeys[0] + } + + ctxWithCancellation, cancel := context.WithCancel(ctx) + defer cancel() + + // TODO: instead of connecting to the API, we should just wire this internally + // and act like completion.go. 
+ // We can do this as cogito expects an interface and we can create one that + // we satisfy to just call internally ComputeChoices + defaultLLM := cogito.NewOpenAILLM(config.Name, apiKey, "http://127.0.0.1:"+port) + + // Build cogito options using the consolidated method + cogitoOpts := config.BuildCogitoOptions() + cogitoOpts = append( + cogitoOpts, + cogito.WithContext(ctxWithCancellation), + cogito.WithMCPs(sessions...), + ) + // Check if streaming is requested + toStream := input.Stream + + if !toStream { + // Non-streaming mode: execute synchronously and return JSON response + cogitoOpts = append( + cogitoOpts, + cogito.WithStatusCallback(func(s string) { + log.Debug().Msgf("[model agent] [model: %s] Status: %s", config.Name, s) + }), + cogito.WithReasoningCallback(func(s string) { + log.Debug().Msgf("[model agent] [model: %s] Reasoning: %s", config.Name, s) + }), + cogito.WithToolCallBack(func(t *cogito.ToolChoice) bool { + log.Debug().Str("model", config.Name).Str("tool", t.Name).Str("reasoning", t.Reasoning).Interface("arguments", t.Arguments).Msg("[model agent] Tool call") + return true + }), + cogito.WithToolCallResultCallback(func(t cogito.ToolStatus) { + log.Debug().Str("model", config.Name).Str("tool", t.Name).Str("result", t.Result).Interface("tool_arguments", t.ToolArguments).Msg("[model agent] Tool call result") + }), + ) + + f, err := cogito.ExecuteTools( + defaultLLM, fragment, + cogitoOpts..., + ) + if err != nil && !errors.Is(err, cogito.ErrNoToolSelected) { + return err + } + + f, err = defaultLLM.Ask(ctxWithCancellation, f) + if err != nil { + return err + } + + resp := &schema.OpenAIResponse{ + ID: id, + Created: created, + Model: input.Model, // we have to return what the user sent here, due to OpenAI spec. + Choices: []schema.Choice{{Message: &schema.Message{Role: "assistant", Content: &f.LastMessage().Content}}}, + Object: "chat.completion", + } + + jsonResult, _ := json.Marshal(resp) + log.Debug().Msgf("Response: %s", jsonResult) + + // Return the prediction in the response body + return c.JSON(200, resp) + } + + // Streaming mode: use SSE + // Set up SSE headers + c.Response().Header().Set("Content-Type", "text/event-stream") + c.Response().Header().Set("Cache-Control", "no-cache") + c.Response().Header().Set("Connection", "keep-alive") + c.Response().Header().Set("X-Correlation-ID", id) + + // Create channel for streaming events + events := make(chan interface{}) + ended := make(chan error, 1) + + // Set up callbacks for streaming + statusCallback := func(s string) { + events <- MCPStatusEvent{ + Type: "status", + Message: s, + } + } + + reasoningCallback := func(s string) { + events <- MCPReasoningEvent{ + Type: "reasoning", + Content: s, + } + } + + toolCallCallback := func(t *cogito.ToolChoice) bool { + events <- MCPToolCallEvent{ + Type: "tool_call", + Name: t.Name, + Arguments: t.Arguments, + Reasoning: t.Reasoning, + } + return true + } + + toolCallResultCallback := func(t cogito.ToolStatus) { + events <- MCPToolResultEvent{ + Type: "tool_result", + Name: t.Name, + Result: t.Result, + } + } + + cogitoOpts = append(cogitoOpts, + cogito.WithStatusCallback(statusCallback), + cogito.WithReasoningCallback(reasoningCallback), + cogito.WithToolCallBack(toolCallCallback), + cogito.WithToolCallResultCallback(toolCallResultCallback), + ) + + // Execute tools in a goroutine + go func() { + defer close(events) + + f, err := cogito.ExecuteTools( + defaultLLM, fragment, + cogitoOpts..., + ) + if err != nil && !errors.Is(err, cogito.ErrNoToolSelected) { + events <- 
MCPErrorEvent{ + Type: "error", + Message: fmt.Sprintf("Failed to execute tools: %v", err), + } + ended <- err + return + } + + // Get final response + f, err = defaultLLM.Ask(ctxWithCancellation, f) + if err != nil { + events <- MCPErrorEvent{ + Type: "error", + Message: fmt.Sprintf("Failed to get response: %v", err), + } + ended <- err + return + } + + // Stream final assistant response + content := f.LastMessage().Content + events <- MCPAssistantEvent{ + Type: "assistant", + Content: content, + } + + ended <- nil + }() + + // Stream events to client + LOOP: + for { + select { + case <-ctx.Done(): + // Context was cancelled (client disconnected or request cancelled) + log.Debug().Msgf("Request context cancelled, stopping stream") + cancel() + break LOOP + case event := <-events: + if event == nil { + // Channel closed + break LOOP + } + eventData, err := json.Marshal(event) + if err != nil { + log.Debug().Msgf("Failed to marshal event: %v", err) + continue + } + log.Debug().Msgf("Sending event: %s", string(eventData)) + _, err = fmt.Fprintf(c.Response().Writer, "data: %s\n\n", string(eventData)) + if err != nil { + log.Debug().Msgf("Sending event failed: %v", err) + cancel() + return err + } + c.Response().Flush() + case err := <-ended: + if err == nil { + // Send done signal + fmt.Fprintf(c.Response().Writer, "data: [DONE]\n\n") + c.Response().Flush() + break LOOP + } + log.Error().Msgf("Stream ended with error: %v", err) + errorEvent := MCPErrorEvent{ + Type: "error", + Message: err.Error(), + } + errorData, marshalErr := json.Marshal(errorEvent) + if marshalErr != nil { + fmt.Fprintf(c.Response().Writer, "data: {\"type\":\"error\",\"message\":\"Internal error\"}\n\n") + } else { + fmt.Fprintf(c.Response().Writer, "data: %s\n\n", string(errorData)) + } + fmt.Fprintf(c.Response().Writer, "data: [DONE]\n\n") + c.Response().Flush() + return nil + } + } + + log.Debug().Msgf("Stream ended") + return nil + } +} diff --git a/core/http/endpoints/openai/mcp.go b/core/http/endpoints/openai/mcp.go index a91706f51d10..264403c31d87 100644 --- a/core/http/endpoints/openai/mcp.go +++ b/core/http/endpoints/openai/mcp.go @@ -90,40 +90,27 @@ func MCPCompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, // we satisfy to just call internally ComputeChoices defaultLLM := cogito.NewOpenAILLM(config.Name, apiKey, "http://127.0.0.1:"+port) - cogitoOpts := []cogito.Option{ + // Build cogito options using the consolidated method + cogitoOpts := config.BuildCogitoOptions() + + cogitoOpts = append( + cogitoOpts, + cogito.WithContext(ctxWithCancellation), + cogito.WithMCPs(sessions...), cogito.WithStatusCallback(func(s string) { log.Debug().Msgf("[model agent] [model: %s] Status: %s", config.Name, s) }), - cogito.WithContext(ctxWithCancellation), - cogito.WithMCPs(sessions...), - cogito.WithIterations(3), // default to 3 iterations - cogito.WithMaxAttempts(3), // default to 3 attempts - cogito.WithForceReasoning(), - } - - if config.Agent.EnableReasoning { - cogitoOpts = append(cogitoOpts, cogito.EnableToolReasoner) - } - - if config.Agent.EnablePlanning { - cogitoOpts = append(cogitoOpts, cogito.EnableAutoPlan) - } - - if config.Agent.EnableMCPPrompts { - cogitoOpts = append(cogitoOpts, cogito.EnableMCPPrompts) - } - - if config.Agent.EnablePlanReEvaluator { - cogitoOpts = append(cogitoOpts, cogito.EnableAutoPlanReEvaluator) - } - - if config.Agent.MaxIterations != 0 { - cogitoOpts = append(cogitoOpts, cogito.WithIterations(config.Agent.MaxIterations)) - } - - if config.Agent.MaxAttempts != 0 { 
- cogitoOpts = append(cogitoOpts, cogito.WithMaxAttempts(config.Agent.MaxAttempts)) - } + cogito.WithReasoningCallback(func(s string) { + log.Debug().Msgf("[model agent] [model: %s] Reasoning: %s", config.Name, s) + }), + cogito.WithToolCallBack(func(t *cogito.ToolChoice) bool { + log.Debug().Msgf("[model agent] [model: %s] Tool call: %s, reasoning: %s, arguments: %+v", t.Name, t.Reasoning, t.Arguments) + return true + }), + cogito.WithToolCallResultCallback(func(t cogito.ToolStatus) { + log.Debug().Msgf("[model agent] [model: %s] Tool call result: %s, tool arguments: %+v", t.Name, t.Result, t.ToolArguments) + }), + ) f, err := cogito.ExecuteTools( defaultLLM, fragment, diff --git a/core/http/routes/localai.go b/core/http/routes/localai.go index 7b1c003ca021..bf8a7bfb8f16 100644 --- a/core/http/routes/localai.go +++ b/core/http/routes/localai.go @@ -7,6 +7,7 @@ import ( "github.com/mudler/LocalAI/core/http/middleware" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/core/services" + "github.com/mudler/LocalAI/core/templates" "github.com/mudler/LocalAI/internal" "github.com/mudler/LocalAI/pkg/model" echoswagger "github.com/swaggo/echo-swagger" @@ -18,7 +19,8 @@ func RegisterLocalAIRoutes(router *echo.Echo, ml *model.ModelLoader, appConfig *config.ApplicationConfig, galleryService *services.GalleryService, - opcache *services.OpCache) { + opcache *services.OpCache, + evaluator *templates.Evaluator) { router.GET("/swagger/*", echoswagger.WrapHandler) // default @@ -133,4 +135,23 @@ func RegisterLocalAIRoutes(router *echo.Echo, requestExtractor.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_TOKENIZE)), requestExtractor.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.TokenizeRequest) })) + // MCP Stream endpoint + if evaluator != nil { + mcpStreamHandler := localai.MCPStreamEndpoint(cl, ml, evaluator, appConfig) + mcpStreamMiddleware := []echo.MiddlewareFunc{ + requestExtractor.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_CHAT)), + requestExtractor.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), + func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + if err := requestExtractor.SetOpenAIRequest(c); err != nil { + return err + } + return next(c) + } + }, + } + router.POST("/v1/mcp/chat/completions", mcpStreamHandler, mcpStreamMiddleware...) + router.POST("/mcp/v1/chat/completions", mcpStreamHandler, mcpStreamMiddleware...) 
+ } + } diff --git a/core/http/static/chat.js b/core/http/static/chat.js index 993c956ac91a..1307b1b5486c 100644 --- a/core/http/static/chat.js +++ b/core/http/static/chat.js @@ -267,7 +267,15 @@ function processAndSendMessage(inputValue) { const input = document.getElementById("input"); if (input) input.value = ""; const systemPrompt = localStorage.getItem("system_prompt"); - Alpine.nextTick(() => { document.getElementById('messages').scrollIntoView(false); }); + Alpine.nextTick(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }); // Reset token tracking before starting new request requestStartTime = Date.now(); @@ -379,16 +387,14 @@ async function promptGPT(systemPrompt, input) { document.getElementById("fileName").innerHTML = ""; // Choose endpoint based on MCP mode - const endpoint = mcpMode ? "mcp/v1/chat/completions" : "v1/chat/completions"; + const endpoint = mcpMode ? "v1/mcp/chat/completions" : "v1/chat/completions"; const requestBody = { model: model, messages: messages, }; - // Only add stream parameter for regular chat (MCP doesn't support streaming) - if (!mcpMode) { - requestBody.stream = true; - } + // Add stream parameter for both regular chat and MCP (MCP now supports SSE streaming) + requestBody.stream = true; let response; try { @@ -444,64 +450,441 @@ async function promptGPT(systemPrompt, input) { return; } + // Handle streaming response (both regular and MCP mode now use SSE) if (mcpMode) { - // Handle MCP non-streaming response + // Handle MCP SSE streaming with new event types + const reader = response.body + ?.pipeThrough(new TextDecoderStream()) + .getReader(); + + if (!reader) { + Alpine.store("chat").add( + "assistant", + `Error: Failed to decode MCP API response`, + ); + toggleLoader(false); + return; + } + + // Store reader globally so stop button can cancel it + currentReader = reader; + + let buffer = ""; + let assistantContent = ""; + let assistantContentBuffer = []; + let thinkingContent = ""; + let isThinking = false; + let lastAssistantMessageIndex = -1; + let lastThinkingMessageIndex = -1; + let lastThinkingScrollTime = 0; + const THINKING_SCROLL_THROTTLE = 200; // Throttle scrolling to every 200ms + try { - const data = await response.json(); - - // Update token usage if present - if (data.usage) { - Alpine.store("chat").updateTokenUsage(data.usage); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += value; + + let lines = buffer.split("\n"); + buffer = lines.pop(); // Retain any incomplete line in the buffer + + lines.forEach((line) => { + if (line.length === 0 || line.startsWith(":")) return; + if (line === "data: [DONE]") { + return; + } + + if (line.startsWith("data: ")) { + try { + const eventData = JSON.parse(line.substring(6)); + + // Handle different event types + switch (eventData.type) { + case "reasoning": + if (eventData.content) { + const chatStore = Alpine.store("chat"); + // Insert reasoning before assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "reasoning", + content: eventData.content, + html: DOMPurify.sanitize(marked.parse(eventData.content)), + image: [], + audio: [], + expanded: false // Reasoning is always collapsed + }); + lastAssistantMessageIndex++; // Adjust index since we inserted + // Scroll 
smoothly after adding reasoning + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + } else { + // No assistant message yet, just add normally + chatStore.add("reasoning", eventData.content); + } + } + break; + + case "tool_call": + if (eventData.name) { + // Store as JSON for better formatting + const toolCallData = { + name: eventData.name, + arguments: eventData.arguments || {}, + reasoning: eventData.reasoning || "" + }; + Alpine.store("chat").add("tool_call", JSON.stringify(toolCallData, null, 2)); + // Scroll smoothly after adding tool call + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + } + break; + + case "tool_result": + if (eventData.name) { + // Store as JSON for better formatting + const toolResultData = { + name: eventData.name, + result: eventData.result || "" + }; + Alpine.store("chat").add("tool_result", JSON.stringify(toolResultData, null, 2)); + // Scroll smoothly after adding tool result + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + } + break; + + case "status": + // Status messages can be logged but not necessarily displayed + console.log("[MCP Status]", eventData.message); + break; + + case "assistant": + if (eventData.content) { + assistantContent += eventData.content; + const contentChunk = eventData.content; + + // Count tokens for rate calculation + tokensReceived += Math.ceil(contentChunk.length / 4); + updateTokensPerSecond(); + + // Check for thinking tags in the chunk (incremental detection) + if (contentChunk.includes("") || contentChunk.includes("")) { + isThinking = true; + thinkingContent = ""; + lastThinkingMessageIndex = -1; + } + + if (contentChunk.includes("") || contentChunk.includes("")) { + isThinking = false; + // When closing tag is detected, process the accumulated thinking content + if (thinkingContent.trim()) { + // Extract just the thinking part from the accumulated content + const thinkingMatch = thinkingContent.match(/<(?:thinking|redacted_reasoning)>(.*?)<\/(?:thinking|redacted_reasoning)>/s); + if (thinkingMatch && thinkingMatch[1]) { + const extractedThinking = thinkingMatch[1]; + const chatStore = Alpine.store("chat"); + const isMCPMode = chatStore.mcpMode || false; + const shouldExpand = !isMCPMode; // Expanded in non-MCP mode, collapsed in MCP mode + if (lastThinkingMessageIndex === -1) { + // Insert thinking before the last assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + // Insert before assistant message + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "thinking", + content: extractedThinking, + html: DOMPurify.sanitize(marked.parse(extractedThinking)), + image: [], + audio: [], + expanded: shouldExpand + }); + lastThinkingMessageIndex = lastAssistantMessageIndex; + lastAssistantMessageIndex++; // Adjust index since we inserted + } else { + // No assistant message yet, just add normally + chatStore.add("thinking", extractedThinking); + lastThinkingMessageIndex = chatStore.history.length - 1; + } + } else { + // Update existing thinking message + const 
lastMessage = chatStore.history[lastThinkingMessageIndex]; + if (lastMessage && lastMessage.role === "thinking") { + lastMessage.content = extractedThinking; + lastMessage.html = DOMPurify.sanitize(marked.parse(extractedThinking)); + } + } + // Scroll when thinking is finalized in non-MCP mode + if (!isMCPMode) { + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 50); + } + } + thinkingContent = ""; + } + } + + // Handle content based on thinking state + if (isThinking) { + thinkingContent += contentChunk; + const chatStore = Alpine.store("chat"); + const isMCPMode = chatStore.mcpMode || false; + const shouldExpand = !isMCPMode; // Expanded in non-MCP mode, collapsed in MCP mode + // Update the last thinking message or create a new one (incremental) + if (lastThinkingMessageIndex === -1) { + // Insert thinking before the last assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + // Insert before assistant message + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "thinking", + content: thinkingContent, + html: DOMPurify.sanitize(marked.parse(thinkingContent)), + image: [], + audio: [], + expanded: shouldExpand + }); + lastThinkingMessageIndex = lastAssistantMessageIndex; + lastAssistantMessageIndex++; // Adjust index since we inserted + } else { + // No assistant message yet, just add normally + chatStore.add("thinking", thinkingContent); + lastThinkingMessageIndex = chatStore.history.length - 1; + } + } else { + // Update existing thinking message + const lastMessage = chatStore.history[lastThinkingMessageIndex]; + if (lastMessage && lastMessage.role === "thinking") { + lastMessage.content = thinkingContent; + lastMessage.html = DOMPurify.sanitize(marked.parse(thinkingContent)); + } + } + // Scroll when thinking is updated in non-MCP mode (throttled) + if (!isMCPMode) { + const now = Date.now(); + if (now - lastThinkingScrollTime > THINKING_SCROLL_THROTTLE) { + lastThinkingScrollTime = now; + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + } + } + } else { + // Regular assistant content - buffer it for batch processing + assistantContentBuffer.push(contentChunk); + } + } + break; + + case "error": + Alpine.store("chat").add( + "assistant", + `MCP Error: ${eventData.message}`, + ); + break; + } + } catch (error) { + console.error("Failed to parse MCP event:", line, error); + } + } + }); + + // Efficiently update assistant message in batch + if (assistantContentBuffer.length > 0) { + const regularContent = assistantContentBuffer.join(""); + + // Process any thinking tags that might be in the accumulated content + // This handles cases where tags are split across chunks + const { regularContent: processedRegular, thinkingContent: processedThinking } = processThinkingTags(regularContent); + + // Update or create assistant message with processed regular content + if (lastAssistantMessageIndex === -1) { + if (processedRegular && processedRegular.trim()) { + Alpine.store("chat").add("assistant", processedRegular); + lastAssistantMessageIndex = Alpine.store("chat").history.length - 1; + } + } else { + const chatStore = Alpine.store("chat"); + const lastMessage = 
chatStore.history[lastAssistantMessageIndex]; + if (lastMessage && lastMessage.role === "assistant") { + lastMessage.content = (lastMessage.content || "") + (processedRegular || ""); + lastMessage.html = DOMPurify.sanitize(marked.parse(lastMessage.content)); + } + } + + // Add any extracted thinking content from the processed buffer BEFORE assistant message + if (processedThinking && processedThinking.trim()) { + const chatStore = Alpine.store("chat"); + const isMCPMode = chatStore.mcpMode || false; + const shouldExpand = !isMCPMode; // Expanded in non-MCP mode, collapsed in MCP mode + // Insert thinking before assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "thinking", + content: processedThinking, + html: DOMPurify.sanitize(marked.parse(processedThinking)), + image: [], + audio: [], + expanded: shouldExpand + }); + lastAssistantMessageIndex++; // Adjust index since we inserted + } else { + // No assistant message yet, just add normally + chatStore.add("thinking", processedThinking); + } + } + + assistantContentBuffer = []; + } } - - // MCP endpoint returns content in choices[0].message.content (chat completion format) - // Fallback to choices[0].text for backward compatibility (completion format) - const content = data.choices[0]?.message?.content || data.choices[0]?.text || ""; - - if (!content && (!data.choices || data.choices.length === 0)) { - Alpine.store("chat").add( - "assistant", - `Error: Empty response from MCP endpoint`, - ); - toggleLoader(false); - return; + + // Final assistant content flush if any data remains + if (assistantContentBuffer.length > 0) { + const regularContent = assistantContentBuffer.join(""); + // Process any remaining thinking tags that might be in the buffer + const { regularContent: processedRegular, thinkingContent: processedThinking } = processThinkingTags(regularContent); + + const chatStore = Alpine.store("chat"); + + // First, add any extracted thinking content BEFORE assistant message + if (processedThinking && processedThinking.trim()) { + const isMCPMode = chatStore.mcpMode || false; + const shouldExpand = !isMCPMode; // Expanded in non-MCP mode, collapsed in MCP mode + // Insert thinking before assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "thinking", + content: processedThinking, + html: DOMPurify.sanitize(marked.parse(processedThinking)), + image: [], + audio: [], + expanded: shouldExpand + }); + lastAssistantMessageIndex++; // Adjust index since we inserted + } else { + // No assistant message yet, just add normally + chatStore.add("thinking", processedThinking); + } + } + + // Then update or create assistant message + if (lastAssistantMessageIndex !== -1) { + const lastMessage = chatStore.history[lastAssistantMessageIndex]; + if (lastMessage && lastMessage.role === "assistant") { + lastMessage.content = (lastMessage.content || "") + (processedRegular || ""); + lastMessage.html = DOMPurify.sanitize(marked.parse(lastMessage.content)); + } + } else if (processedRegular && processedRegular.trim()) { + chatStore.add("assistant", processedRegular); + lastAssistantMessageIndex = chatStore.history.length - 1; + } } - if (content) { - // Count tokens for rate calculation (MCP mode - full content at once) - // Prefer actual token 
count from API if available - if (data.usage && data.usage.completion_tokens) { - tokensReceived = data.usage.completion_tokens; + // Final thinking content flush if any data remains (from incremental detection) + if (thinkingContent.trim() && lastThinkingMessageIndex === -1) { + // Extract thinking content if tags are present + const thinkingMatch = thinkingContent.match(/<(?:thinking|redacted_reasoning)>(.*?)<\/(?:thinking|redacted_reasoning)>/s); + if (thinkingMatch && thinkingMatch[1]) { + const chatStore = Alpine.store("chat"); + const isMCPMode = chatStore.mcpMode || false; + const shouldExpand = !isMCPMode; // Expanded in non-MCP mode, collapsed in MCP mode + // Insert thinking before assistant message if it exists + if (lastAssistantMessageIndex >= 0 && chatStore.history[lastAssistantMessageIndex]?.role === "assistant") { + chatStore.history.splice(lastAssistantMessageIndex, 0, { + role: "thinking", + content: thinkingMatch[1], + html: DOMPurify.sanitize(marked.parse(thinkingMatch[1])), + image: [], + audio: [], + expanded: shouldExpand + }); + } else { + // No assistant message yet, just add normally + chatStore.add("thinking", thinkingMatch[1]); + } } else { - tokensReceived += Math.ceil(content.length / 4); + Alpine.store("chat").add("thinking", thinkingContent); } - updateTokensPerSecond(); - - // Process thinking tags using shared function - const { regularContent, thinkingContent } = processThinkingTags(content); + } + + // Final pass: process the entire assistantContent to catch any missed thinking tags + // This ensures we don't miss tags that were split across chunks + if (assistantContent.trim()) { + const { regularContent: finalRegular, thinkingContent: finalThinking } = processThinkingTags(assistantContent); - // Add thinking content if present - if (thinkingContent) { - Alpine.store("chat").add("thinking", thinkingContent); + // Update assistant message with final processed content (without thinking tags) + if (finalRegular && finalRegular.trim()) { + if (lastAssistantMessageIndex !== -1) { + const chatStore = Alpine.store("chat"); + const lastMessage = chatStore.history[lastAssistantMessageIndex]; + if (lastMessage && lastMessage.role === "assistant") { + lastMessage.content = finalRegular; + lastMessage.html = DOMPurify.sanitize(marked.parse(lastMessage.content)); + } + } else { + Alpine.store("chat").add("assistant", finalRegular); + } } - // Add regular content if present - if (regularContent) { - Alpine.store("chat").add("assistant", regularContent); + // Add any extracted thinking content (only if not already added) + if (finalThinking && finalThinking.trim()) { + const hasThinking = Alpine.store("chat").history.some(msg => + msg.role === "thinking" && msg.content.trim() === finalThinking.trim() + ); + if (!hasThinking) { + Alpine.store("chat").add("thinking", finalThinking); + } } } - - // Highlight all code blocks + + // Highlight all code blocks once at the end hljs.highlightAll(); } catch (error) { // Don't show error if request was aborted by user - if (error.name !== 'AbortError' || currentAbortController) { + if (error.name !== 'AbortError' || !currentAbortController) { Alpine.store("chat").add( "assistant", - `Error: Failed to parse MCP response`, + `Error: Failed to process MCP stream`, ); } } finally { + // Perform any cleanup if necessary + if (reader) { + reader.releaseLock(); + } + currentReader = null; currentAbortController = null; } } else { @@ -539,6 +922,8 @@ async function promptGPT(systemPrompt, input) { let thinkingContent = ""; let 
isThinking = false; let lastThinkingMessageIndex = -1; + let lastThinkingScrollTime = 0; + const THINKING_SCROLL_THROTTLE = 200; // Throttle scrolling to every 200ms try { while (true) { @@ -606,6 +991,20 @@ async function promptGPT(systemPrompt, input) { lastMessage.html = DOMPurify.sanitize(marked.parse(thinkingContent)); } } + // Scroll when thinking is updated (throttled) + const now = Date.now(); + if (now - lastThinkingScrollTime > THINKING_SCROLL_THROTTLE) { + lastThinkingScrollTime = now; + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + } } else { contentBuffer.push(token); } @@ -620,6 +1019,16 @@ async function promptGPT(systemPrompt, input) { if (contentBuffer.length > 0) { addToChat(contentBuffer.join("")); contentBuffer = []; + // Scroll when assistant content is updated (this will also show thinking messages above) + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 50); } } @@ -654,8 +1063,17 @@ async function promptGPT(systemPrompt, input) { // Remove class "loader" from the element with "loader" id toggleLoader(false); - // scroll to the bottom of the chat - document.getElementById('messages').scrollIntoView(false) + // scroll to the bottom of the chat consistently + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); + // set focus to the input document.getElementById("input").focus(); } @@ -784,7 +1202,13 @@ document.addEventListener("alpine:init", () => { audio: audio || [] }); } - document.getElementById('messages').scrollIntoView(false); + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } const parser = new DOMParser(); const html = parser.parseFromString( this.history[this.history.length - 1].html, @@ -812,3 +1236,4 @@ document.addEventListener("alpine:init", () => { }); } }); + diff --git a/core/http/views/chat.html b/core/http/views/chat.html index 5aa45e3e30c5..7e042edd6776 100644 --- a/core/http/views/chat.html +++ b/core/http/views/chat.html @@ -111,14 +111,36 @@ }, add(role, content, image, audio) { const N = this.history.length - 1; - // For thinking messages, always create a new message - if (role === "thinking") { + // For thinking, reasoning, tool_call, and tool_result messages, always create a new message + if (role === "thinking" || role === "reasoning" || role === "tool_call" || role === "tool_result") { let c = ""; - const lines = content.split("\n"); - lines.forEach((line) => { - c += DOMPurify.sanitize(marked.parse(line)); - }); - this.history.push({ role, content, html: c, image, audio }); + if (role === "tool_call" || role === "tool_result") { + // For tool calls and results, try to parse as JSON and format nicely + try { + const parsed = typeof content === 'string' ? JSON.parse(content) : content; + // Format JSON with proper indentation + const formatted = JSON.stringify(parsed, null, 2); + c = DOMPurify.sanitize('
<pre><code class="language-json">' + formatted + '</code></pre>
'); + } catch (e) { + // If not JSON, treat as markdown + const lines = content.split("\n"); + lines.forEach((line) => { + c += DOMPurify.sanitize(marked.parse(line)); + }); + } + } else { + // For thinking and reasoning, format as markdown + const lines = content.split("\n"); + lines.forEach((line) => { + c += DOMPurify.sanitize(marked.parse(line)); + }); + } + // Set expanded state: thinking is expanded by default in non-MCP mode, collapsed in MCP mode + // Reasoning, tool_call, and tool_result are always collapsed by default + const isMCPMode = this.mcpMode || false; + const shouldExpand = (role === "thinking" && !isMCPMode) || false; + this.history.push({ role, content, html: c, image, audio, expanded: shouldExpand }); + } // For other messages, merge if same role else if (this.history.length && this.history[N].role === role) { @@ -147,7 +169,16 @@ audio: audio || [] }); } - document.getElementById('messages').scrollIntoView(false); + // Scroll to bottom consistently for all messages (use #chat as it's the scrollable container) + setTimeout(() => { + const chatContainer = document.getElementById('chat'); + if (chatContainer) { + chatContainer.scrollTo({ + top: chatContainer.scrollHeight, + behavior: 'smooth' + }); + } + }, 100); const parser = new DOMParser(); const html = parser.parseFromString( this.history[this.history.length - 1].html, @@ -160,9 +191,33 @@ if (this.languages.includes(language)) return; const script = document.createElement("script"); script.src = `https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.8.0/build/languages/${language}.min.js`; + script.onload = () => { + // Re-highlight after language script loads + if (window.hljs) { + const container = document.getElementById('messages'); + if (container) { + container.querySelectorAll('pre code.language-json').forEach(block => { + window.hljs.highlightElement(block); + }); + } + } + }; document.head.appendChild(script); this.languages.push(language); }); + // Highlight code blocks immediately if hljs is available + if (window.hljs) { + setTimeout(() => { + const container = document.getElementById('messages'); + if (container) { + container.querySelectorAll('pre code.language-json').forEach(block => { + if (!block.classList.contains('hljs')) { + window.hljs.highlightElement(block); + } + }); + } + }, 100); + } }, messages() { return this.history.map((message) => ({ @@ -484,9 +539,108 @@

  • To send a text, markdown or PDF file, click the icon.
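
A minimal client sketch for the new SSE endpoint introduced above. It is not part of the diff: it assumes a LocalAI instance listening on localhost:8080 with no API key configured, and a hypothetical model name "my-agent"; the event fields mirror the MCP*Event structs defined in core/http/endpoints/localai/mcp.go, and the stream ends with a literal "data: [DONE]" line as in the handler.

package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// mcpEvent covers the union of the typed SSE payloads emitted by
// /v1/mcp/chat/completions (status, reasoning, tool_call, tool_result,
// assistant, error); fields not used by a given event type stay empty.
type mcpEvent struct {
	Type      string                 `json:"type"`
	Content   string                 `json:"content,omitempty"`
	Message   string                 `json:"message,omitempty"`
	Name      string                 `json:"name,omitempty"`
	Result    string                 `json:"result,omitempty"`
	Reasoning string                 `json:"reasoning,omitempty"`
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}

func main() {
	// "my-agent" is a placeholder: use a model whose config has MCP servers set up.
	reqBody := []byte(`{"model":"my-agent","stream":true,"messages":[{"role":"user","content":"What is the weather in Rome?"}]}`)

	resp, err := http.Post("http://localhost:8080/v1/mcp/chat/completions", "application/json", bytes.NewReader(reqBody))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue // skip blank lines between events
		}
		payload := strings.TrimPrefix(line, "data: ")
		if payload == "[DONE]" {
			break // sentinel sent by the handler when the stream ends
		}
		var ev mcpEvent
		if err := json.Unmarshal([]byte(payload), &ev); err != nil {
			continue // ignore malformed frames
		}
		switch ev.Type {
		case "status", "reasoning":
			fmt.Printf("[%s] %s%s\n", ev.Type, ev.Message, ev.Content)
		case "tool_call":
			fmt.Printf("[tool_call] %s args=%v reasoning=%s\n", ev.Name, ev.Arguments, ev.Reasoning)
		case "tool_result":
			fmt.Printf("[tool_result] %s -> %s\n", ev.Name, ev.Result)
		case "assistant":
			fmt.Printf("[assistant] %s\n", ev.Content)
		case "error":
			fmt.Printf("[error] %s\n", ev.Message)
		}
	}
}

With "stream" set to false the same endpoint returns a single chat-completion JSON object instead, matching the non-streaming branch of the handler. If API keys are enabled in the application config, the request will also need the usual bearer token; the handler itself reuses the first configured key for its internal loopback call to the completion API (see the defaultLLM setup above).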