diff --git a/framework/fel/java/plugins/tool-mcp-server/README.md b/framework/fel/java/plugins/tool-mcp-server/README.md new file mode 100644 index 000000000..b9222ebdd --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/README.md @@ -0,0 +1,322 @@ +# FitMcpStreamableServerTransportProvider类维护文档 + +## 文档概述 + +本文档用于记录 `FitMcpStreamableServerTransportProvider` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的 +`HttpServletStreamableServerTransportProvider` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。 + +**原始参考类**: MCP SDK 中的 `HttpServletStreamableServerTransportProvider` + +**创建时间**: 2025-11-04 + +--- + +## 类的作用和职责 + +`FitMcpStreamableServerTransportProvider` 是 MCP 服务端传输层的核心实现类,负责: + +1. **HTTP 端点处理**: 处理 GET、POST、DELETE 请求,实现 MCP 协议的 HTTP 传输层 +2. **会话管理**: 管理客户端会话的生命周期(创建、维护、销毁) +3. **SSE 通信**: 通过 Server-Sent Events (SSE) 实现服务端到客户端的实时消息推送 +4. **消息序列化**: 处理 JSON-RPC 消息的序列化和反序列化 +5. **连接保活**: 支持可选的 Keep-Alive 机制 +6. **优雅关闭**: 支持服务的优雅关闭和资源清理 + +--- + +## 类结构概览 + +### 主要成员变量 + +| 变量名 | 类型 | 来源 | 说明 | +|----------------------|----------------------------------------------------------|------------|---------------------------------| +| `MESSAGE_ENDPOINT` | `String` | SDK 原始 | 消息端点路径 `/mcp/streamable` | +| `disallowDelete` | `boolean` | SDK 原始 | 是否禁用 DELETE 请求 | +| `jsonMapper` | `McpJsonMapper` | SDK 原始 | JSON 序列化器 | +| `contextExtractor` | `McpTransportContextExtractor` | **FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) | +| `keepAliveScheduler` | `KeepAliveScheduler` | SDK 原始 | Keep-Alive 调度器 | +| `sessionFactory` | `McpStreamableServerSession.Factory` | SDK 原始 | 会话工厂 | +| `sessions` | `Map` | SDK 原始 | 活跃会话映射表 | +| `isClosing` | `volatile boolean` | SDK 原始 | 关闭标志 | + +### 主要方法 + +| 方法名 | 来源 | 说明 | +| --------------------- | ------------ | ------------------------------- | +| `protocolVersions()` | SDK 原始 | 返回支持的 MCP 协议版本 | +| `setSessionFactory()` | SDK 原始 | 设置会话工厂 | +| `notifyClients()` | SDK 原始 | 广播通知到所有客户端 | +| `closeGracefully()` | SDK 原始 | 优雅关闭传输层 | +| `handleGet()` | **FIT 改造** | 处理 GET 请求(SSE 连接) | +| `handlePost()` | **FIT 改造** | 处理 POST 请求(JSON-RPC 消息) | +| `handleDelete()` | **FIT 改造** | 处理 DELETE 请求(会话删除) | + +### 重构后的辅助方法 + +为提高代码可读性和可维护性,从原本的 `handleGet()`、`handlePost()`、`handleDelete()` 方法中抽取了以下辅助方法: + +#### 验证请求合法性的方法 + +| 方法名 | 说明 | +|-------------------------------|----------------------------------------------------------| +| `validateGetAcceptHeaders()` | 验证 GET 请求的 Accept 头,确保包含 `text/event-stream` | +| `validatePostAcceptHeaders()` | 验证 POST 请求的 Accept 头,确保包含 `text/event-stream` 和 `application/json` | +| `validateRequestSessionId()` | 验证请求的 `mcp-session-id` 头是否存在,以及对应的会话是否存在 | + +#### 根据请求类型调用处理逻辑的方法 + +| 方法名 | 处理的请求类型 | 说明 | +|---------------------------------|---------|------------------------------------------| +| `handleReplaySseRequest()` | GET | 处理 SSE 消息重放请求,用于断线重连后恢复错过的消息 | +| `handleEstablishSseRequest()` | GET | 处理 SSE 连接建立请求,创建新的持久化 SSE 监听流 | +| `handleInitializeRequest()` | POST | 处理客户端初始化连接请求,创建新的 MCP 会话 | +| `handleJsonRpcMessage()` | POST | 把非Initialize的客户端消息分流给下面三个方法,包含Session验证。 | +| `handleJsonRpcResponse()` | POST | 处理 JSON-RPC 响应消息(如 Elicitation 中的客户端响应) | +| `handleJsonRpcNotification()` | POST | 处理 JSON-RPC 通知消息(客户端单向通知) | +| `handleJsonRpcRequest()` | POST | 处理 JSON-RPC 请求消息,返回 SSE 流式响应 | + +### 内部类 + +| 类名 | 来源 | 说明 | +|------------------------------------|------------|-----------------------------| +| `FitStreamableMcpSessionTransport` | **FIT 改造** | 用于SSE 会话`sendMessage()`传输实现 | +| `Builder` | SDK 原始 | 构建器模式 | + +--- + +## SDK 原始逻辑 + +以下是从 MCP SDK 的 `HttpServletStreamableServerTransportProvider` 类保留的原始逻辑: + +### 1. 会话管理核心逻辑 + +```java +private final Map sessions = new ConcurrentHashMap<>(); +``` + +- 使用 `ConcurrentHashMap` 存储活跃会话 +- 会话以 `mcp-session-id` 作为键 + +### 2. 会话工厂设置 + +```java +public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; +} +``` + +- 由外部设置会话工厂,用于创建新会话 + +### 3. 客户端通知 + +```java +public Mono notifyClients(String method, Object params) { + // ... 广播逻辑 +} +``` + +- 向所有活跃会话并行发送通知 +- 使用 `parallelStream()` 提高效率 +- 单个会话失败不影响其他会话 + +### 4. 关闭逻辑 + +```java +public Mono closeGracefully() { + this.isClosing = true; + // ... 关闭所有会话 + // ... 关闭 keep-alive 调度器 +} +``` + +- 设置关闭标志 +- 关闭所有活跃会话 +- 清理资源 + +## FIT 框架改造核心逻辑 + +以下是为适配 FIT 框架而新增或改造的部分: + +### 1. HTTP 端点处理核心流程(核心改造) + +- 请求/响应对象类型变更: + - `HttpServletRequest` → `HttpClassicServerRequest` + - `HttpServletResponse` → `HttpClassicServerResponse` +- 返回类型改为通用的 `Object`,支持多种返回形式 + +#### a. GET 请求处理流程 + +1. 检查服务器是否正在关闭 +2. **调用 `validateGetAcceptHeaders()`** - 验证 Accept 头是否包含 `text/event-stream` +3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头是否存在及对应会话是否存在 +4. 提取 `transportContext` 上下文 +5. 获取会话 ID 和会话对象 +6. 检查是否是重放请求(`Last-Event-ID` 头): + - 如果是,**调用 `handleReplaySseRequest()`** - 重放错过的消息 + - 如果否,**调用 `handleEstablishSseRequest()`** - 建立新的 SSE 监听流 + +#### b. POST 请求处理流程 + +1. 检查服务器是否正在关闭 +2. **调用 `validatePostAcceptHeaders()`** - 验证 Accept 头包含 `text/event-stream` 和 `application/json` +3. 提取 `transportContext` 上下文 +4. 反序列化 JSON-RPC 消息 +5. 判断是否为初始化请求(`initialize` 方法): + - 如果是,**调用 `handleInitializeRequest()`** - 创建新会话并返回初始化结果 +6. **调用 `validateRequestSessionId()`** - 验证会话(仅非初始化请求) +7. 获取会话 ID 和会话对象 +8. 根据消息类型分发处理: + - `JSONRPCResponse` → **调用 `handleJsonRpcResponse()`** + - `JSONRPCNotification` → **调用 `handleJsonRpcNotification()`** + - `JSONRPCRequest` → **调用 `handleJsonRpcRequest()`** + +#### c. DELETE 请求处理流程 + +1. 检查服务器是否正在关闭 +2. 检查是否禁用 DELETE 操作 +3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头及会话存在性 +4. 提取 `transportContext` 上下文 +5. 获取会话 ID 和会话对象 +6. 删除会话并从会话映射表中移除 + +### 2. SSE 实现改造(核心改造) + +**原始 SDK**: + +```java +SseEmitter sseEmitter = new SseEmitter(); +sseEmitter.send(SseEmitter.event() + .id(messageId) + .name("message") + .data(jsonText)); +sseEmitter.complete(); +``` + +**FIT 框架改造**: + +```java +// 使用 Choir 和 Emitter 实现 SSE +Choir.create(emitter -> { + // 创建sessionTransport类,用于调用emitter发送消息 + FitStreamableMcpSessionTransport sessionTransport = + new FitStreamableMcpSessionTransport(sessionId, emitter, response); + + // session的逻辑是SDK原有的,里面会调用sessionTransport发送事件流 + session.responseStream(jsonrpcRequest, sessionTransport) + .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) + .block(); + + // 监听 Emitter 的生命周期 + emitter.observe(new Emitter.Observer() { + @Override + public void onEmittedData(TextEvent data) { + // 数据发送完成 + } + + @Override + public void onCompleted() { + // SSE 流正常结束 + listeningStream.close(); + } + + @Override + public void onFailed(Exception cause) { + // SSE 流异常结束 + listeningStream.close(); + } + }); +}); +``` + +**关键变化**: + +- 使用 `Choir` 返回事件流 +- 使用 `Emitter` 替代 `SseEmitter` 的发送方法 +- 使用 `Emitter.Observer` 监听 SSE 生命周期事件 + +### 3. HTTP 响应处理改造 + +**FIT 特有的响应方式**: + +#### 返回纯文本 + +```java +response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); +return Entity.createText(response, "Session ID required in mcp-session-id header"); +``` + +#### 返回 JSON 对象 + +```java +response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode()); +return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS) + .message("Session not found: "+sessionId) + .build()); +``` + +#### 返回 SSE 流(重要改造) + +```java +return Choir. create(emitter ->{ + // emitter封装在sessionTransport中,被session调用 + emitter.emit(textEvent); +}); +``` + +### 4. HTTP 头处理改造 + +**FIT 框架的 Headers API**: + +```java +// 获取 Header +String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); +boolean hasSessionId = request.headers().contains(HttpHeaders.MCP_SESSION_ID); +String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); + +// 设置 Header +response.headers().set("Content-Type",MimeType.APPLICATION_JSON.value()); +response.headers().set(HttpHeaders.MCP_SESSION_ID, sessionId); + +// 设置状态码 +response.statusCode(HttpResponseStatus.OK.statusCode()); +``` + +**变化**: + +- 使用 `request.headers().first(name).orElse(default)` 获取单个 Header +- 使用 `request.headers().contains(name)` 检查 Header 是否存在 +- 使用 FIT 的 `MessageHeaderNames` 和 `MimeType` 常量 +- 使用 `HttpResponseStatus` 枚举设置状态码 + +### 5. 内部类 Transport 实现 + +`FitStreamableMcpSessionTransport` 类的核心职责是发送SSE事件: + +- `sendmessage()`方法通过`Emitter` 发送SSE消息到客户端 +- 保存了当前会话的事件的`Emitter`,负责close时关闭`Emitter` + +- SSE的`Emitter`感知不到GET连接是否断开,因此在`sendmessage()`发送前检查GET连接是否活跃 + +```java +// 在发送消息前检查连接是否仍然活跃 +if(!this.response.isActive()){ + logger.warn("[SSE] Connection inactive detected while sending message for session: {}", + this.sessionId); + this.close(); + return; +} +``` + +## 参考资源 + +### MCP 协议文档 + +- MCP 协议规范:[https://spec.modelcontextprotocol.io/](https://spec.modelcontextprotocol.io/) +- MCP SDK GitHub: [https://github.com/modelcontextprotocol/](https://github.com/modelcontextprotocol/) + +### 更新记录 + +| 日期 | 更新内容 | 负责人 | +|----------|---------------------------------|-----| +| 2025-11-04 | 初始版本,从 SDK 改造为 FIT 框架实现 | 黄可欣 | +| 2025-11-05 | 代码重构,提取9个辅助方法提高可读性和可维护性 | 黄可欣 | \ No newline at end of file diff --git a/framework/fel/java/plugins/tool-mcp-server/pom.xml b/framework/fel/java/plugins/tool-mcp-server/pom.xml index 514874710..b6072ea4a 100644 --- a/framework/fel/java/plugins/tool-mcp-server/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-server/pom.xml @@ -41,6 +41,11 @@ org.fitframework.fel tool-mcp-common + + io.modelcontextprotocol.sdk + mcp + 0.14.1 + diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java index d62c13b8b..7febd4ddd 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java @@ -6,11 +6,9 @@ package modelengine.fel.tool.mcp.server; -import modelengine.fel.tool.mcp.entity.ServerSchema; import modelengine.fel.tool.mcp.entity.Tool; import java.util.List; -import java.util.Map; /** * Represents the MCP Server. @@ -19,13 +17,6 @@ * @since 2025-05-15 */ public interface McpServer { - /** - * Gets MCP server schema. - * - * @return The MCP server schema as a {@link ServerSchema}. - */ - ServerSchema getSchema(); - /** * Gets MCP server tools. * @@ -33,15 +24,6 @@ public interface McpServer { */ List getTools(); - /** - * Calls MCP server tool. - * - * @param name The tool name as a {@link String}. - * @param arguments The tool arguments as a {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}. - * @return The tool result as a {@link Object}. - */ - Object callTool(String name, Map arguments); - /** * Registers MCP server tools changed observer. * diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java new file mode 100644 index 000000000..880bc4599 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerConfig.java @@ -0,0 +1,42 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server; + +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fel.tool.mcp.server.transport.FitMcpStreamableServerTransportProvider; +import modelengine.fitframework.annotation.Bean; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.annotation.Value; + +import java.time.Duration; + +/** + * Mcp Server Bean implemented with MCP SDK. + * + * @author 黄可欣 + * @since 2025-10-22 + */ +@Component +public class McpServerConfig { + @Bean + public FitMcpStreamableServerTransportProvider fitMcpStreamableServerTransportProvider() { + return FitMcpStreamableServerTransportProvider.builder().jsonMapper(McpJsonMapper.getDefault()).build(); + } + + @Bean + public McpSyncServer mcpSyncServer(FitMcpStreamableServerTransportProvider transportProvider, + @Value("${mcp.server.request.timeout-seconds}") int requestTimeoutSeconds) { + return McpServer.sync(transportProvider) + .serverInfo("FIT Store MCP Server", "3.6.0-SNAPSHOT") + .capabilities(McpSchema.ServerCapabilities.builder().tools(true).logging().build()) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .build(); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java deleted file mode 100644 index 3585020db..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java +++ /dev/null @@ -1,182 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -import static modelengine.fitframework.inspection.Validation.notNull; -import static modelengine.fitframework.util.ObjectUtils.cast; - -import modelengine.fel.tool.mcp.entity.Event; -import modelengine.fel.tool.mcp.entity.JsonRpc; -import modelengine.fel.tool.mcp.entity.Method; -import modelengine.fel.tool.mcp.server.handler.InitializeHandler; -import modelengine.fel.tool.mcp.server.handler.PingHandler; -import modelengine.fel.tool.mcp.server.handler.ToolCallHandler; -import modelengine.fel.tool.mcp.server.handler.ToolListHandler; -import modelengine.fel.tool.mcp.server.handler.LoggingSetLevelHandler; -import modelengine.fel.tool.mcp.server.handler.UnsupportedMethodHandler; -import modelengine.fit.http.annotation.GetMapping; -import modelengine.fit.http.annotation.PostMapping; -import modelengine.fit.http.annotation.RequestBody; -import modelengine.fit.http.annotation.RequestQuery; -import modelengine.fit.http.entity.TextEvent; -import modelengine.fit.http.server.HttpClassicServerResponse; -import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.annotation.Fit; -import modelengine.fitframework.flowable.Choir; -import modelengine.fitframework.flowable.Emitter; -import modelengine.fitframework.log.Logger; -import modelengine.fitframework.schedule.ExecutePolicy; -import modelengine.fitframework.schedule.Task; -import modelengine.fitframework.schedule.ThreadPoolScheduler; -import modelengine.fitframework.serialization.ObjectSerializer; -import modelengine.fitframework.util.CollectionUtils; -import modelengine.fitframework.util.MapUtils; -import modelengine.fitframework.util.StringUtils; -import modelengine.fitframework.util.UuidUtils; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * FIT MCP Server controller. - * - * @author 季聿阶 - * @since 2025-05-13 - */ -@Component -public class McpServerController implements McpServer.ToolsChangedObserver { - private static final Logger log = Logger.get(McpServerController.class); - private static final String MESSAGE_PATH = "/mcp/message"; - private static final String RESPONSE_OK = StringUtils.EMPTY; - - private final Map> emitters = new ConcurrentHashMap<>(); - private final Map responses = new ConcurrentHashMap<>(); - private final Map methodHandlers = new HashMap<>(); - private final MessageHandler unsupportedMethodHandler = new UnsupportedMethodHandler(); - private final ObjectSerializer serializer; - - /** - * Constructs a new instance of the McpController class. - * - * @param serializer The JSON serializer used to serialize and deserialize RPC messages, as an - * {@link ObjectSerializer}. - * @param mcpServer The MCP server instance used to handle tool operations such as initialization, - * listing tools, and calling tools, as a {@link McpServer}. - */ - public McpServerController(@Fit(alias = "json") ObjectSerializer serializer, McpServer mcpServer) { - this.serializer = notNull(serializer, "The json serializer cannot be null."); - notNull(mcpServer, "The MCP server cannot be null."); - mcpServer.registerToolsChangedObserver(this); - - this.methodHandlers.put(Method.INITIALIZE.code(), new InitializeHandler(mcpServer)); - this.methodHandlers.put(Method.PING.code(), new PingHandler()); - this.methodHandlers.put(Method.TOOLS_LIST.code(), new ToolListHandler(mcpServer)); - this.methodHandlers.put(Method.TOOLS_CALL.code(), new ToolCallHandler(mcpServer, this.serializer)); - this.methodHandlers.put(Method.LOGGING_SET_LEVEL.code(), new LoggingSetLevelHandler()); - - ThreadPoolScheduler channelDetectorScheduler = ThreadPoolScheduler.custom() - .corePoolSize(1) - .isDaemonThread(true) - .threadPoolName("mcp-server-channel-detector") - .build(); - channelDetectorScheduler.schedule(Task.builder().policy(ExecutePolicy.fixedDelay(10000)).runnable(() -> { - if (MapUtils.isEmpty(this.responses)) { - return; - } - List obsoleteSessionIds = new ArrayList<>(); - for (Map.Entry entry : this.responses.entrySet()) { - if (entry.getValue().isActive()) { - continue; - } - obsoleteSessionIds.add(entry.getKey()); - } - if (CollectionUtils.isEmpty(obsoleteSessionIds)) { - return; - } - obsoleteSessionIds.forEach(this.responses::remove); - for (String obsoleteSessionId : obsoleteSessionIds) { - Emitter removed = this.emitters.remove(obsoleteSessionId); - removed.complete(); - } - log.info("Channels are inactive, remove emitters and responses. [sessionIds={}]", obsoleteSessionIds); - }).build()); - } - - /** - * Creates a Server-Sent Events (SSE) channel for real-time communication with the client. - * - *

This method generates a unique session ID and registers an emitter to send events.

- * - * @param response The HTTP server response object used to manage the SSE connection as a - * {@link HttpClassicServerResponse}. - * @return A {@link Choir}{@code <}{@link TextEvent}{@code >} object that emits text events to the connected client. - */ - @GetMapping(path = "/sse") - public Choir createSse(HttpClassicServerResponse response) { - String sessionId = UuidUtils.randomUuidString(); - this.responses.put(sessionId, response); - log.info("New SSE channel for MCP server created. [sessionId={}]", sessionId); - return Choir.create(emitter -> { - emitters.put(sessionId, emitter); - String data = MESSAGE_PATH + "?session_id=" + sessionId; - TextEvent textEvent = TextEvent.custom().id(sessionId).event(Event.ENDPOINT.code()).data(data).build(); - emitter.emit(textEvent); - log.info("Send MCP endpoint. [endpoint={}]", data); - }); - } - - /** - * Receives and processes an MCP message via HTTP POST request. - * - *

This method handles incoming JSON-RPC requests, routes them to the appropriate handler, - * and returns a response via the associated event emitter.

- * - * @param sessionId The session ID used to identify the current client session. - * @param request The JSON-RPC request entity containing the method name and parameters. - * @return Always returns an empty string ({@value #RESPONSE_OK}) to indicate success. - */ - @PostMapping(path = MESSAGE_PATH) - public Object receiveMcpMessage(@RequestQuery(name = "session_id") String sessionId, - @RequestBody Map request) { - log.info("Receive MCP message. [sessionId={}, message={}]", sessionId, request); - Object id = request.get("id"); - if (id == null) { - // Request without an ID indicates a notification message, ignore. - return RESPONSE_OK; - } - String method = cast(request.getOrDefault("method", StringUtils.EMPTY)); - MessageHandler handler = this.methodHandlers.getOrDefault(method, this.unsupportedMethodHandler); - JsonRpc.Response response; - try { - Object result = handler.handle(cast(request.get("params"))); - response = JsonRpc.createResponse(id, result); - } catch (Exception e) { - log.error("Failed to handle MCP message.", e); - response = JsonRpc.createResponseWithError(id, e.getMessage()); - } - String serialized = this.serializer.serialize(response); - TextEvent textEvent = TextEvent.custom().id(sessionId).event(Event.MESSAGE.code()).data(serialized).build(); - Emitter emitter = this.emitters.get(sessionId); - emitter.emit(textEvent); - log.info("Send MCP message. [message={}]", serialized); - return RESPONSE_OK; - } - - @Override - public void onToolsChanged() { - JsonRpc.Notification notification = JsonRpc.createNotification(Method.NOTIFICATION_TOOLS_CHANGED.code()); - String serialized = this.serializer.serialize(notification); - this.emitters.forEach((sessionId, emitter) -> { - TextEvent textEvent = TextEvent.custom().id(sessionId).event(Event.MESSAGE.code()).data(serialized).build(); - emitter.emit(textEvent); - log.info("Send MCP notification: tools changed. [sessionId={}]", sessionId); - }); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageHandler.java deleted file mode 100644 index 458a17ef6..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -import java.util.Map; - -/** - * A functional interface for handling messages in the MCP server. - * Implementations of this interface are responsible for processing incoming message requests - * and returning an appropriate response object. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public interface MessageHandler { - /** - * Handles the given message request. - * - * @param request A map containing the request parameters and data as a - * {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}. - * @return The result of processing the request as an {@link Object}, which can be any type of object. - */ - Object handle(Map request); -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageRequest.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageRequest.java deleted file mode 100644 index b850f672c..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageRequest.java +++ /dev/null @@ -1,17 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -/** - * A base class for all message request types in the MCP server. - * This class serves as a common ancestor for specific message request classes, - * providing a shared structure and type for message handling in the system. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class MessageRequest {} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageResponse.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageResponse.java deleted file mode 100644 index c32634c0a..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/MessageResponse.java +++ /dev/null @@ -1,17 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -/** - * A base class for all message response types in the MCP server. - * This class serves as a common ancestor for specific message response classes, - * providing a shared structure and type for returning results after message processing. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class MessageResponse {} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/AbstractMessageHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/AbstractMessageHandler.java deleted file mode 100644 index 770489c11..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/AbstractMessageHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import static modelengine.fitframework.inspection.Validation.notNull; - -import modelengine.fel.tool.mcp.server.MessageHandler; -import modelengine.fel.tool.mcp.server.MessageRequest; -import modelengine.fitframework.util.ObjectUtils; - -import java.util.Map; - -/** - * The abstract parent class of {@link MessageHandler}. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public abstract class AbstractMessageHandler implements MessageHandler { - private final Class requestClass; - - AbstractMessageHandler(Class requestClass) { - this.requestClass = notNull(requestClass, "The request class cannot be null."); - } - - @Override - public Object handle(Map request) { - Req req = ObjectUtils.toCustomObject(request, this.requestClass); - return this.handle(req); - } - - /** - * Handles the request. - * - * @param request The request as a {@link Req}. - * @return The response as a {@link Object}. - */ - abstract Object handle(Req request); -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/InitializeHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/InitializeHandler.java deleted file mode 100644 index 8cd9ecd8f..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/InitializeHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import static modelengine.fitframework.inspection.Validation.notNull; - -import modelengine.fel.tool.mcp.server.McpServer; -import modelengine.fel.tool.mcp.server.MessageRequest; - -/** - * A handler for processing initialization requests in the MCP server. - * This class extends {@link AbstractMessageHandler} and is responsible for handling - * {@link InitializeRequest} messages by retrieving server information via the associated {@link McpServer}. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class InitializeHandler extends AbstractMessageHandler { - private final McpServer mcpServer; - - /** - * Constructs a new instance of the InitializeHandler class. - * - * @param mcpServer The MCP server instance used to retrieve server information during request handling. - * @throws IllegalArgumentException If {@code mcpServer} is null. - */ - public InitializeHandler(McpServer mcpServer) { - super(InitializeRequest.class); - this.mcpServer = notNull(mcpServer, "The MCP server cannot be null."); - } - - @Override - protected Object handle(InitializeRequest request) { - return this.mcpServer.getSchema(); - } - - /** - * Represents an initialization request in the MCP server. - * This request is handled by {@link InitializeHandler} to retrieve server information. - * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class InitializeRequest extends MessageRequest {} -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/LoggingSetLevelHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/LoggingSetLevelHandler.java deleted file mode 100644 index 1cc2fb440..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/LoggingSetLevelHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import modelengine.fel.tool.mcp.entity.LoggingLevel; -import modelengine.fel.tool.mcp.server.MessageRequest; -import modelengine.fitframework.util.StringUtils; - -import java.util.Collections; -import java.util.Map; - -/** - * A handler for processing logging set level requests in the MCP server. - * This class extends {@link AbstractMessageHandler} and is responsible for handling - * {@link LoggingSetLevelRequest} messages. - * - * @author 黄可欣 - * @since 2025-09-10 - */ -public class LoggingSetLevelHandler extends AbstractMessageHandler { - private static final Map SET_LEVEL_RESULT = Collections.emptyMap(); - - /** - * Constructs a new instance of the LoggingSetLevelHandler class. - */ - public LoggingSetLevelHandler() { - super(LoggingSetLevelHandler.LoggingSetLevelRequest.class); - } - - @Override - public Object handle(LoggingSetLevelHandler.LoggingSetLevelRequest request) { - if (request == null) { - throw new IllegalStateException("No logging set level request."); - } - if (StringUtils.isBlank(request.getLevel())) { - throw new IllegalStateException("No logging level in request."); - } - String loggingLevelString = request.getLevel(); - LoggingLevel loggingLevel = LoggingLevel.fromCode(loggingLevelString); - // TODO change the logging level of corresponding session. - return SET_LEVEL_RESULT; - } - - /** - * Represents a request to set the logging level in the MCP server. - * This request is handled by {@link LoggingSetLevelHandler} to set the logging level in the MCP server. - * - * @since 2025-09-10 - */ - public static class LoggingSetLevelRequest extends MessageRequest { - private String level; - - /** - * Gets the level of server logging. - * - * @return The level of server logging as a {@link String}. - */ - public String getLevel() { - return this.level; - } - - /** - * Sets the level of server logging . - * - * @param level The level of server logging as a {@link String}. - */ - public void setLevel(String level) { - this.level = level; - } - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/PingHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/PingHandler.java deleted file mode 100644 index 5e0f1fd2e..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/PingHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import modelengine.fel.tool.mcp.server.MessageRequest; - -import java.util.Collections; -import java.util.Map; - -/** - * @author 季聿阶 - * @since 2025-05-15 - */ -public class PingHandler extends AbstractMessageHandler { - private static final Map PING_RESULT = Collections.emptyMap(); - - /** - * Constructs a new instance of the PingHandler class. - */ - public PingHandler() { - super(PingRequest.class); - } - - @Override - public Object handle(PingRequest request) { - return PING_RESULT; - } - - /** - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class PingRequest extends MessageRequest {} -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolCallHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolCallHandler.java deleted file mode 100644 index 291f6e70b..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolCallHandler.java +++ /dev/null @@ -1,232 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import static modelengine.fitframework.inspection.Validation.notNull; -import static modelengine.fitframework.util.ObjectUtils.cast; - -import modelengine.fel.tool.mcp.server.McpServer; -import modelengine.fel.tool.mcp.server.MessageRequest; -import modelengine.fel.tool.mcp.server.MessageResponse; -import modelengine.fitframework.annotation.Property; -import modelengine.fitframework.serialization.ObjectSerializer; -import modelengine.fitframework.util.StringUtils; - -import java.util.List; -import java.util.Map; - -/** - * A handler for processing tool call requests in the MCP server. - * This class extends {@link AbstractMessageHandler} and is responsible for handling - * {@link ToolCallRequest} messages by invoking the specified tool via the associated {@link McpServer}. - * It serializes the result using the provided {@link ObjectSerializer} and returns a structured - * response through the {@link ToolCallResponse} class. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class ToolCallHandler extends AbstractMessageHandler { - private final McpServer mcpServer; - private final ObjectSerializer jsonSerializer; - - /** - * Constructs a new instance of the ToolCallHandler class. - * - * @param mcpServer The MCP server instance used to invoke tools during request handling. - * @param jsonSerializer The serializer used to convert non-string results into JSON strings. - * @throws IllegalArgumentException If {@code mcpServer} or {@code jsonSerializer} is null. - */ - public ToolCallHandler(McpServer mcpServer, ObjectSerializer jsonSerializer) { - super(ToolCallRequest.class); - this.mcpServer = notNull(mcpServer, "The MCP server cannot be null."); - this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); - } - - @Override - protected Object handle(ToolCallRequest request) { - if (request == null) { - throw new IllegalStateException("No tool call request."); - } - if (StringUtils.isBlank(request.getName())) { - throw new IllegalStateException("No tool name to call."); - } - ToolCallResponse response = new ToolCallResponse(); - ToolCallResponse.Content content = new ToolCallResponse.Content(); - response.setContents(List.of(content)); - content.setType("text"); - try { - Object result = this.mcpServer.callTool(request.getName(), request.getArguments()); - if (result instanceof String) { - content.setText(cast(result)); - } else { - content.setText(this.jsonSerializer.serialize(result)); - } - response.setError(false); - } catch (Exception e) { - content.setText(e.getMessage()); - response.setError(true); - } - return response; - } - - /** - * Represents a tool call request in the MCP server. - * This request contains the name of the tool to be invoked and a map of arguments - * to be passed to the tool. It is handled by {@link ToolCallHandler} to execute the tool - * and return the result. - * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class ToolCallRequest extends MessageRequest { - private String name; - private Map arguments; - - /** - * Gets the name of the tool to be called. - * - * @return The name of the tool as a {@link String}. - */ - public String getName() { - return this.name; - } - - /** - * Sets the name of the tool to be called. - * - * @param name The name of the tool as a {@link String}. - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets the arguments to be passed to the tool. - * - * @return A map containing the arguments as a {@link Map}{@code <}{@link String}{@code , - * }{@link Object}{@code >}. - */ - public Map getArguments() { - return this.arguments; - } - - /** - * Sets the arguments to be passed to the tool. - * - * @param arguments A map containing the arguments as a - * {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}. - */ - public void setArguments(Map arguments) { - this.arguments = arguments; - } - } - - /** - * Represents the structured response returned after executing a tool call. - * This class includes a list of content items and an error flag indicating - * whether the execution was successful. - * - *

Each content item has a type and text value, which can be used to represent - * the result or error message from the tool execution.

- * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class ToolCallResponse extends MessageResponse { - @Property(name = "content") - private List contents; - private boolean isError; - - /** - * Gets the list of content items included in the response. - * - * @return A list of content items as a {@link List}{@code <}{@link Content}{@code >}. - */ - public List getContents() { - return this.contents; - } - - /** - * Sets the list of content items included in the response. - * - * @param contents A list of content items as a {@link List}{@code <}{@link Content}{@code >}. - */ - public void setContents(List contents) { - this.contents = contents; - } - - /** - * Checks whether the tool execution resulted in an error. - * - * @return true if an error occurred; false otherwise. - */ - public boolean isError() { - return this.isError; - } - - /** - * Sets the error flag indicating whether the tool execution resulted in an error. - * - * @param error true if an error occurred; false otherwise. - */ - public void setError(boolean error) { - this.isError = error; - } - - /** - * Represents a single content item within the tool call response. - * Each content item has a type (e.g., "text", "json") and a text value, - * typically used to describe the result or error message from the tool execution. - * - *

This class supports multiple content formats, allowing flexible representation - * of the tool's output.

- * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class Content { - private String type; - private String text; - - /** - * Gets the type of the content item. - * - * @return The type of the content as a {@link String}. - */ - public String getType() { - return this.type; - } - - /** - * Sets the type of the content item. - * - * @param type The type of the content as a {@link String}. - */ - public void setType(String type) { - this.type = type; - } - - /** - * Gets the text value of the content item. - * - * @return The text value as a {@link String}. - */ - public String getText() { - return this.text; - } - - /** - * Sets the text value of the content item. - * - * @param text The text value as a {@link String}. - */ - public void setText(String text) { - this.text = text; - } - } - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolListHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolListHandler.java deleted file mode 100644 index be8ac2760..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/ToolListHandler.java +++ /dev/null @@ -1,52 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import static modelengine.fitframework.inspection.Validation.notNull; - -import modelengine.fel.tool.mcp.server.McpServer; -import modelengine.fel.tool.mcp.server.MessageRequest; -import modelengine.fitframework.util.MapBuilder; - -/** - * A handler for processing tool list requests in the MCP server. - * This class extends {@link AbstractMessageHandler} and is responsible for handling - * {@link ToolListRequest} messages by retrieving the list of tools from the associated {@link McpServer} - * and returning them in a structured map format. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class ToolListHandler extends AbstractMessageHandler { - private final McpServer mcpServer; - - /** - * Constructs a new instance of the ToolListHandler class. - * - * @param mcpServer The MCP server instance used to retrieve the list of tools during request handling. - * @throws IllegalArgumentException If {@code mcpServer} is null. - */ - public ToolListHandler(McpServer mcpServer) { - super(ToolListRequest.class); - this.mcpServer = notNull(mcpServer, "The MCP server cannot be null."); - } - - @Override - public Object handle(ToolListRequest request) { - return MapBuilder.get().put("tools", this.mcpServer.getTools()).build(); - } - - /** - * Represents a tool list request in the MCP server. - * This request is handled by {@link ToolListHandler} to retrieve the list of available tools - * from the server and return them in a structured format. - * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class ToolListRequest extends MessageRequest {} -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/UnsupportedMethodHandler.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/UnsupportedMethodHandler.java deleted file mode 100644 index a83dd2c04..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/handler/UnsupportedMethodHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.handler; - -import modelengine.fel.tool.mcp.server.MessageRequest; -import modelengine.fel.tool.mcp.server.MessageResponse; - -/** - * Represents a request for an unsupported method in the MCP server. - * This request is handled by {@link UnsupportedMethodHandler} to indicate that the - * corresponding operation is not implemented or supported. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -public class UnsupportedMethodHandler - extends AbstractMessageHandler { - /** - * Constructs a new instance of the UnsupportedMethodHandler class. - * - *

This handler is used to handle requests for methods that are not supported or implemented.

- */ - public UnsupportedMethodHandler() { - super(UnsupportedMethodRequest.class); - } - - @Override - public MessageResponse handle(UnsupportedMethodRequest request) { - throw new UnsupportedOperationException("Not supported request method."); - } - - /** - * Represents a request for an operation that is not supported by the current handler. - * This class is used in conjunction with {@link UnsupportedMethodHandler} to signal - * that the requested method has no implementation. - * - * @author 季聿阶 - * @since 2025-05-15 - */ - public static class UnsupportedMethodRequest extends MessageRequest {} -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServer.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServer.java deleted file mode 100644 index 4287ecda7..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServer.java +++ /dev/null @@ -1,112 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server.support; - -import static modelengine.fitframework.inspection.Validation.notNull; - -import modelengine.fel.tool.mcp.entity.ServerSchema; -import modelengine.fel.tool.mcp.entity.Tool; -import modelengine.fel.tool.mcp.server.McpServer; -import modelengine.fel.tool.service.ToolChangedObserver; -import modelengine.fel.tool.service.ToolExecuteService; -import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.log.Logger; -import modelengine.fitframework.util.MapUtils; -import modelengine.fitframework.util.StringUtils; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The default implementation of {@link McpServer}. - * - * @author 季聿阶 - * @since 2025-05-15 - */ -@Component -public class DefaultMcpServer implements McpServer, ToolChangedObserver { - private static final Logger log = Logger.get(DefaultMcpServer.class); - - private final ToolExecuteService toolExecuteService; - private final Map tools = new ConcurrentHashMap<>(); - private final List toolsChangedObservers = new ArrayList<>(); - - /** - * Constructs a new instance of the DefaultMcpServer class. - * - * @param toolExecuteService The service used to execute tools when handling tool call requests. - * @throws IllegalArgumentException If {@code toolExecuteService} is null. - */ - public DefaultMcpServer(ToolExecuteService toolExecuteService) { - this.toolExecuteService = notNull(toolExecuteService, "The tool execute service cannot be null."); - } - - @Override - public ServerSchema getSchema() { - ServerSchema.Info info = new ServerSchema.Info("FIT Store MCP Server", "3.6.0-SNAPSHOT"); - ServerSchema.Capabilities.Logging logging = new ServerSchema.Capabilities.Logging(); - ServerSchema.Capabilities.Tools tools = new ServerSchema.Capabilities.Tools(true); - ServerSchema.Capabilities capabilities = new ServerSchema.Capabilities(logging, tools); - return new ServerSchema("2024-11-05", capabilities, info); - } - - @Override - public List getTools() { - return List.copyOf(this.tools.values()); - } - - @Override - public Object callTool(String name, Map arguments) { - log.info("Calling tool. [toolName={}, arguments={}]", name, arguments); - String result = this.toolExecuteService.execute(name, arguments); - log.info("Tool called. [result={}]", result); - return result; - } - - @Override - public void registerToolsChangedObserver(ToolsChangedObserver observer) { - if (observer != null) { - this.toolsChangedObservers.add(observer); - } - } - - @Override - public void onToolAdded(String name, String description, Map parameters) { - if (StringUtils.isBlank(name)) { - log.warn("Tool addition is ignored: tool name is blank."); - return; - } - if (StringUtils.isBlank(description)) { - log.warn("Tool addition is ignored: tool description is blank. [toolName={}]", name); - return; - } - if (MapUtils.isEmpty(parameters)) { - log.warn("Tool addition is ignored: tool schema is null or empty. [toolName={}]", name); - return; - } - Tool tool = new Tool(); - tool.setName(name); - tool.setDescription(description); - tool.setInputSchema(parameters); - this.tools.put(name, tool); - log.info("Tool added to MCP server. [toolName={}, description={}, schema={}]", name, description, parameters); - this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); - } - - @Override - public void onToolRemoved(String name) { - if (StringUtils.isBlank(name)) { - log.warn("Tool removal is ignored: tool name is blank."); - return; - } - this.tools.remove(name); - log.info("Tool removed from MCP server. [toolName={}]", name); - this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java new file mode 100644 index 000000000..f3de70277 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServer.java @@ -0,0 +1,217 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server.support; + +import static modelengine.fel.tool.info.schema.PluginSchema.TYPE; +import static modelengine.fel.tool.info.schema.ToolsSchema.PROPERTIES; +import static modelengine.fel.tool.info.schema.ToolsSchema.REQUIRED; +import static modelengine.fitframework.inspection.Validation.notNull; + +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fel.tool.mcp.entity.Tool; +import modelengine.fel.tool.mcp.server.McpServer; +import modelengine.fel.tool.service.ToolChangedObserver; +import modelengine.fel.tool.service.ToolExecuteService; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.MapUtils; +import modelengine.fitframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Mcp Server implementing interface {@link McpServer}, {@link ToolChangedObserver} + * with MCP Server Bean {@link McpSyncServer}. + * + * @author 季聿阶 + * @since 2025-05-15 + */ +@Component +public class DefaultMcpStreamableServer implements McpServer, ToolChangedObserver { + private static final Logger log = Logger.get(DefaultMcpStreamableServer.class); + private final McpSyncServer mcpSyncServer; + + private final ToolExecuteService toolExecuteService; + private final List toolsChangedObservers = new ArrayList<>(); + + /** + * Constructs a new instance of the DefaultMcpServer class. + * + * @param toolExecuteService The service used to execute tools when handling tool call requests. + * @throws IllegalArgumentException If {@code toolExecuteService} is null. + */ + public DefaultMcpStreamableServer(ToolExecuteService toolExecuteService, McpSyncServer mcpSyncServer) { + this.toolExecuteService = notNull(toolExecuteService, "The tool execute service cannot be null."); + this.mcpSyncServer = mcpSyncServer; + } + + @Override + public List getTools() { + return this.mcpSyncServer.listTools().stream().map(this::convertToFelTool).collect(Collectors.toList()); + } + + @Override + public void registerToolsChangedObserver(ToolsChangedObserver observer) { + if (observer != null) { + this.toolsChangedObservers.add(observer); + } + } + + @Override + public void onToolAdded(String name, String description, Map parameters) { + if (StringUtils.isBlank(name)) { + log.warn("Tool addition is ignored: tool name is blank."); + return; + } + if (StringUtils.isBlank(description)) { + log.warn("Tool addition is ignored: tool description is blank. [toolName={}]", name); + return; + } + if (MapUtils.isEmpty(parameters)) { + log.warn("Tool addition is ignored: tool schema is null or empty. [toolName={}]", name); + return; + } + if (!isValidParameterSchema(parameters)) { + log.warn("Invalid parameter schema. [toolName={}]", name); + return; + } + + McpServerFeatures.SyncToolSpecification toolSpecification = + createToolSpecification(name, description, parameters); + + this.mcpSyncServer.addTool(toolSpecification); + log.info("Tool added to MCP server. [toolName={}, description={}, schema={}]", name, description, parameters); + this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); + } + + @Override + public void onToolRemoved(String name) { + if (StringUtils.isBlank(name)) { + log.warn("Tool removal is ignored: tool name is blank."); + return; + } + this.mcpSyncServer.removeTool(name); + log.info("Tool removed from MCP server. [toolName={}]", name); + this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged); + } + + /** + * Creates a tool specification for the MCP server. + *

+ * This method constructs a {@link McpServerFeatures.SyncToolSpecification} that includes: + *

    + *
  • Tool metadata (name, description, input schema)
  • + *
  • Call handler that executes the tool and handles exceptions
  • + *
+ * + * @param name The name of the tool. + * @param description The description of the tool. + * @param parameters The parameter schema containing type, properties, and required fields. + * @return A fully configured {@link McpServerFeatures.SyncToolSpecification}. + */ + private McpServerFeatures.SyncToolSpecification createToolSpecification(String name, String description, + Map parameters) { + @SuppressWarnings("unchecked") McpSchema.JsonSchema inputSchema = + new McpSchema.JsonSchema((String) parameters.get(TYPE), + (Map) parameters.get(PROPERTIES), + (List) parameters.get(REQUIRED), + null, + null, + null); + + return McpServerFeatures.SyncToolSpecification.builder() + .tool(McpSchema.Tool.builder().name(name).description(description).inputSchema(inputSchema).build()) + .callHandler((exchange, request) -> executeToolWithErrorHandling(name, request)) + .build(); + } + + /** + * Executes a tool and handles any exceptions that may occur. + *

+ * This method handles two types of exceptions: + *

    + *
  • {@link IllegalArgumentException}: Invalid tool arguments (logged as warning)
  • + *
  • {@link Exception}: Any other execution failure (logged as error)
  • + *
+ * + * @param toolName The name of the tool to execute. + * @param request The tool call request containing arguments. + * @return A {@link McpSchema.CallToolResult} with the execution result or error message. + */ + private McpSchema.CallToolResult executeToolWithErrorHandling(String toolName, McpSchema.CallToolRequest request) { + try { + Map args = request.arguments(); + String result = this.toolExecuteService.execute(toolName, args); + return new McpSchema.CallToolResult(result, false); + } catch (IllegalArgumentException e) { + log.warn("Invalid arguments for tool execution. [toolName={}, error={}]", toolName, e.getMessage()); + return new McpSchema.CallToolResult("Error: Invalid arguments - " + e.getMessage(), true); + } catch (Exception e) { + log.error("Failed to execute tool. [toolName={}]", toolName, e); + return new McpSchema.CallToolResult("Error: Tool execution failed - " + e.getMessage(), true); + } + } + + /** + * Converts an MCP SDK Tool to a FEL Tool entity. + * + * @param mcpTool The MCP SDK tool to convert. + * @return A FEL Tool entity with the corresponding name, description, and input schema. + */ + private Tool convertToFelTool(McpSchema.Tool mcpTool) { + Tool tool = new Tool(); + tool.setName(mcpTool.name()); + tool.setDescription(mcpTool.description()); + + // Convert JsonSchema to Map + McpSchema.JsonSchema inputSchema = mcpTool.inputSchema(); + Map schemaMap = new HashMap<>(); + schemaMap.put(TYPE, inputSchema.type()); + if (inputSchema.properties() != null) { + schemaMap.put(PROPERTIES, inputSchema.properties()); + } + if (inputSchema.required() != null) { + schemaMap.put(REQUIRED, inputSchema.required()); + } + tool.setInputSchema(schemaMap); + + return tool; + } + + /** + * Validates the structure of the parameter schema to ensure it conforms to the expected format. + * + * @param parameters The parameter schema to validate, represented as a Map with String keys and Object values. + * @return {@code true} if the parameter schema is valid; {@code false} otherwise. + */ + private boolean isValidParameterSchema(Map parameters) { + Object type = parameters.get(TYPE); + if (!(type instanceof String)) { + return false; + } + + Object props = parameters.get(PROPERTIES); + if (!(props instanceof Map propsMap)) { + return false; + } + if (propsMap.keySet().stream().anyMatch(k -> !(k instanceof String))) { + return false; + } + + Object reqs = parameters.get(REQUIRED); + if (!(reqs instanceof List reqsList)) { + return false; + } + return reqsList.stream().allMatch(v -> v instanceof String); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java new file mode 100644 index 000000000..324c427d1 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java @@ -0,0 +1,845 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.server.transport; + +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.server.McpTransportContextExtractor; +import io.modelcontextprotocol.spec.HttpHeaders; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpStreamableServerSession; +import io.modelcontextprotocol.spec.McpStreamableServerTransport; +import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider; +import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.util.KeepAliveScheduler; +import modelengine.fel.tool.mcp.entity.Event; +import modelengine.fit.http.annotation.DeleteMapping; +import modelengine.fit.http.annotation.GetMapping; +import modelengine.fit.http.annotation.PostMapping; +import modelengine.fit.http.entity.Entity; +import modelengine.fit.http.entity.TextEvent; +import modelengine.fit.http.protocol.HttpResponseStatus; +import modelengine.fit.http.protocol.MessageHeaderNames; +import modelengine.fit.http.protocol.MimeType; +import modelengine.fit.http.server.HttpClassicServerRequest; +import modelengine.fit.http.server.HttpClassicServerResponse; +import modelengine.fitframework.flowable.Choir; +import modelengine.fitframework.flowable.Emitter; +import modelengine.fitframework.inspection.Validation; +import modelengine.fitframework.log.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The default implementation of {@link McpStreamableServerTransportProvider}. + * The FIT transport provider for MCP Server, according to {@code HttpServletStreamableServerTransportProvider} in MCP + * SDK. + * + * @author 黄可欣 + * @since 2025-09-30 + */ +public class FitMcpStreamableServerTransportProvider implements McpStreamableServerTransportProvider { + private static final Logger logger = Logger.get(FitMcpStreamableServerTransportProvider.class); + + private static final String MESSAGE_ENDPOINT = "/mcp/streamable"; + + /** + * Flag indicating whether DELETE requests are disallowed on the endpoint. + */ + private final boolean disallowDelete; + private final McpJsonMapper jsonMapper; + private final McpTransportContextExtractor contextExtractor; + private KeepAliveScheduler keepAliveScheduler; + + private McpStreamableServerSession.Factory sessionFactory; + + /** + * Map of active client sessions, keyed by mcp-session-id. + */ + private final Map sessions = new ConcurrentHashMap<>(); + + /** + * Flag indicating if the transport is shutting down. + */ + private volatile boolean isClosing = false; + + /** + * Constructs a new FitMcpStreamableServerTransportProvider instance, + * for {@link FitMcpStreamableServerTransportProvider.Builder}. + * + * @param jsonMapper The jsonMapper to use for JSON serialization/deserialization + * of messages. + * @param disallowDelete Whether to disallow DELETE requests on the endpoint. + * @param contextExtractor The context extractor to fill in a {@link McpTransportContext}. + * @param keepAliveInterval The interval for sending keep-alive messages to clients. + * @throws IllegalArgumentException if any parameter is null + */ + private FitMcpStreamableServerTransportProvider(McpJsonMapper jsonMapper, boolean disallowDelete, + McpTransportContextExtractor contextExtractor, Duration keepAliveInterval) { + Validation.notNull(jsonMapper, "jsonMapper must not be null"); + Validation.notNull(contextExtractor, "McpTransportContextExtractor must not be null"); + + this.jsonMapper = jsonMapper; + this.disallowDelete = disallowDelete; + this.contextExtractor = contextExtractor; + + if (keepAliveInterval != null) { + this.keepAliveScheduler = KeepAliveScheduler.builder(() -> (isClosing) + ? Flux.empty() + : Flux.fromIterable(this.sessions.values())) + .initialDelay(keepAliveInterval) + .interval(keepAliveInterval) + .build(); + + this.keepAliveScheduler.start(); + } + } + + @Override + public List protocolVersions() { + return List.of(ProtocolVersions.MCP_2024_11_05, + ProtocolVersions.MCP_2025_03_26, + ProtocolVersions.MCP_2025_06_18); + } + + @Override + public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + /** + * Broadcasts a notification to all connected clients through their SSE connections. + * If any errors occur during sending to a particular client, they are logged but + * don't prevent sending to other clients. + * + * @param method The method name for the notification + * @param params The parameters for the notification + * @return A Mono that completes when the broadcast attempt is finished + */ + @Override + public Mono notifyClients(String method, Object params) { + if (this.sessions.isEmpty()) { + logger.debug("No active sessions to broadcast message to"); + return Mono.empty(); + } + + logger.info("Attempting to broadcast message to {} active sessions", this.sessions.size()); + + return Mono.fromRunnable(() -> { + this.sessions.values().parallelStream().forEach(session -> { + try { + session.sendNotification(method, params).block(); + } catch (Exception e) { + logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage(), e); + } + }); + }); + } + + /** + * Initiates a graceful shutdown of the transport. + * + * @return A Mono that completes when all cleanup operations are finished + */ + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(() -> { + this.isClosing = true; + logger.info("Initiating graceful shutdown with {} active sessions", this.sessions.size()); + + this.sessions.values().parallelStream().forEach(session -> { + try { + session.closeGracefully().block(); + } catch (Exception e) { + logger.error("Failed to close session {}: {}", session.getId(), e.getMessage(), e); + } + }); + + this.sessions.clear(); + logger.info("Graceful shutdown completed"); + }).then().doOnSuccess(v -> { + if (this.keepAliveScheduler != null) { + this.keepAliveScheduler.shutdown(); + } + }); + } + + /** + * Set up the listening SSE connections and message replay. + * + * @param request The incoming server request + * @param response The HTTP response + * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} object + */ + @GetMapping(path = MESSAGE_ENDPOINT) + public Object handleGet(HttpClassicServerRequest request, HttpClassicServerResponse response) { + if (this.isClosing) { + response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); + return Entity.createText(response, "Server is shutting down"); + } + + Object headerError = validateGetAcceptHeaders(request, response); + if (headerError != null) { + return headerError; + } + + // Get session ID and session + Object sessionError = validateRequestSessionId(request, response); + if (sessionError != null) { + return sessionError; + } + String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); + McpStreamableServerSession session = this.sessions.get(sessionId); + logger.info("[GET] Handling GET request for session: {}", sessionId); + + McpTransportContext transportContext = this.contextExtractor.extract(request); + try { + return Choir.create(emitter -> { + FitStreamableMcpSessionTransport sessionTransport = + new FitStreamableMcpSessionTransport(sessionId, emitter, response); + + // Handle building SSE, and check if this is a replay request + if (request.headers().contains(HttpHeaders.LAST_EVENT_ID)) { + handleReplaySseRequest(request, transportContext, sessionId, session, sessionTransport, emitter); + } else { + handleEstablishSseRequest(sessionId, session, sessionTransport, emitter); + } + }); + } catch (Exception e) { + logger.error("Failed to handle GET request for session {}: {}", sessionId, e.getMessage(), e); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return null; + } + } + + /** + * Handles POST requests for incoming JSON-RPC messages from clients. + * + * @param request The incoming server request containing the JSON-RPC message + * @param response The HTTP response + * @return Return the HTTP response body {@link Entity} or a {@link Choir}{@code <}{@link TextEvent}{@code >} object + */ + @PostMapping(path = MESSAGE_ENDPOINT) + public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResponse response) { + if (this.isClosing) { + response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); + return Entity.createText(response, "Server is shutting down"); + } + Object headerError = validatePostAcceptHeaders(request, response); + if (headerError != null) { + return headerError; + } + + McpTransportContext transportContext = this.contextExtractor.extract(request); + try { + String requestBody = new String(request.entityBytes(), StandardCharsets.UTF_8); + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, requestBody); + + // Handle JSONRPCMessage + if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest && jsonrpcRequest.method() + .equals(McpSchema.METHOD_INITIALIZE)) { + logger.info("[POST] Handling initialize method, with receiving message: {}", requestBody); + return handleInitializeRequest(request, response, jsonrpcRequest); + } else { + return handleJsonRpcMessage(message, request, requestBody, transportContext, response); + } + } catch (IllegalArgumentException | IOException e) { + logger.error("Failed to deserialize message: {}", e.getMessage(), e); + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build()); + } catch (Exception e) { + logger.error("Error handling message: {}", e.getMessage(), e); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); + } + } + + /** + * Handles DELETE requests for session deletion. + * + * @param request The incoming server request + * @param response The HTTP response + * @return Return HTTP response body {@link Entity}. + */ + @DeleteMapping(path = MESSAGE_ENDPOINT) + public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerResponse response) { + if (this.isClosing) { + response.statusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.statusCode()); + return Entity.createText(response, "Server is shutting down"); + } + if (this.disallowDelete) { + response.statusCode(HttpResponseStatus.METHOD_NOT_ALLOWED.statusCode()); + return null; + } + + // Get session ID and session + Object sessionError = validateRequestSessionId(request, response); + if (sessionError != null) { + return sessionError; + } + String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); + McpStreamableServerSession session = this.sessions.get(sessionId); + logger.info("[DELETE] Receiving delete request from session: {}", sessionId); + + McpTransportContext transportContext = this.contextExtractor.extract(request); + try { + session.delete().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)).block(); + this.sessions.remove(sessionId); + response.statusCode(HttpResponseStatus.OK.statusCode()); + return null; + } catch (Exception e) { + logger.error("Failed to delete session {}: {}", sessionId, e.getMessage(), e); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); + } + } + + /** + * Validates the Accept header for SSE (Server-Sent Events) connections in GET requests. + * Checks if the request contains the required {@code text/event-stream} content type. + * + * @param request The incoming {@link HttpClassicServerRequest} + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails + * @return An error {@link Entity} if validation fails, {@code null} if validation succeeds + */ + private Object validateGetAcceptHeaders(HttpClassicServerRequest request, HttpClassicServerResponse response) { + String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); + if (!acceptHeaders.contains(MimeType.TEXT_EVENT_STREAM.value())) { + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createText(response, "Invalid Accept header. Expected TEXT_EVENT_STREAM"); + } + return null; + } + + /** + * Validates the Accept headers for POST requests. + * Checks if the request contains both {@code text/event-stream} and {@code application/json} content types, + * as POST requests may return either SSE streams or JSON responses. + * + * @param request The incoming {@link HttpClassicServerRequest} + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails + * @return An error {@link Entity} with {@link McpError} if validation fails, {@code null} if validation succeeds + */ + private Object validatePostAcceptHeaders(HttpClassicServerRequest request, HttpClassicServerResponse response) { + String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse(""); + if (!acceptHeaders.contains(MimeType.TEXT_EVENT_STREAM.value()) + || !acceptHeaders.contains(MimeType.APPLICATION_JSON.value())) { + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST) + .message("Invalid Accept headers. Expected TEXT_EVENT_STREAM and APPLICATION_JSON") + .build()); + } + return null; + } + + /** + * Validates the MCP session ID in the request headers and verifies the session exists. + * This method checks both the presence of the {@code mcp-session-id} header and + * the existence of the corresponding session in the active sessions map. + * + * @param request The incoming {@link HttpClassicServerRequest} containing the session ID header + * @param response The {@link HttpClassicServerResponse} to set status code if validation fails + * @return An error {@link Entity} if validation fails (either missing session ID or session not found), + * {@code null} if validation succeeds + */ + private Object validateRequestSessionId(HttpClassicServerRequest request, HttpClassicServerResponse response) { + if (!request.headers().contains(HttpHeaders.MCP_SESSION_ID)) { + response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); + return Entity.createText(response, "Session ID required in mcp-session-id header"); + } + String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); + if (this.sessions.get(sessionId) == null) { + response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS) + .message("Session not found: " + sessionId) + .build()); + } + return null; + } + + /** + * Handles message replay requests for SSE connections. + * Replays previously sent messages starting from the last received event ID, + * allowing clients to recover missed messages after reconnection. + * + * @param request The incoming {@link HttpClassicServerRequest} containing the {@code Last-Event-ID} header + * @param transportContext The {@link McpTransportContext} for request context propagation + * @param sessionId The MCP session identifier + * @param session The {@link McpStreamableServerSession} to replay messages from + * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for sending replayed messages + * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client + */ + private void handleReplaySseRequest(HttpClassicServerRequest request, McpTransportContext transportContext, + String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, + Emitter emitter) { + String lastId = request.headers().first(HttpHeaders.LAST_EVENT_ID).orElse("0"); + logger.info("[GET] Receiving replay request from session: {}", sessionId); + + try { + session.replay(lastId) + .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) + .toIterable() + .forEach(message -> { + try { + sessionTransport.sendMessage(message) + .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) + .block(); + } catch (Exception e) { + logger.error("Failed to replay message: {}", e.getMessage(), e); + emitter.fail(e); + } + }); + } catch (Exception e) { + logger.error("Failed to replay messages: {}", e.getMessage(), e); + emitter.fail(e); + } + } + + /** + * Establishes a new SSE listening stream for real-time message delivery. + * Creates a persistent connection that allows the server to push messages to the client + * as they become available. The stream remains open until explicitly closed or an error occurs. + * + * @param sessionId The MCP session identifier + * @param session The {@link McpStreamableServerSession} to establish the listening stream for + * @param sessionTransport The {@link FitStreamableMcpSessionTransport} for bidirectional communication + * @param emitter The SSE {@link Emitter} to send {@link TextEvent} to the client + */ + private void handleEstablishSseRequest(String sessionId, McpStreamableServerSession session, + FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { + logger.info("[GET] Receiving Get request to establish new SSE for session: {}", sessionId); + McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = + session.listeningStream(sessionTransport); + + emitter.observe(new Emitter.Observer() { + @Override + public void onEmittedData(TextEvent data) { + // No action needed + } + + @Override + public void onCompleted() { + logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + try { + listeningStream.close(); + } catch (Exception e) { + logger.warn("[SSE] Error closing listeningStream on complete: {}", e.getMessage()); + } + } + + @Override + public void onFailed(Exception cause) { + logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, cause.getMessage()); + try { + listeningStream.close(); + } catch (Exception e) { + logger.warn("[SSE] Error closing listeningStream on failure: {}", e.getMessage()); + } + } + }); + } + + /** + * Handles MCP session initialization requests. + * Creates a new {@link McpStreamableServerSession} and returns the initialization result + * with the assigned session ID in the response headers. + * + * @param request The incoming {@link HttpClassicServerRequest} + * @param response The {@link HttpClassicServerResponse} to set session ID and initialization result + * @param jsonrpcRequest The {@link McpSchema.JSONRPCRequest} containing {@link McpSchema.InitializeRequest} + * parameters + * @return An {@link Entity} containing the {@link McpSchema.JSONRPCResponse} with + * {@link McpSchema.InitializeResult} + * on success, or an error {@link Entity} with {@link McpError} on failure + */ + private Object handleInitializeRequest(HttpClassicServerRequest request, HttpClassicServerResponse response, + McpSchema.JSONRPCRequest jsonrpcRequest) { + McpSchema.InitializeRequest initializeRequest = + jsonMapper.convertValue(jsonrpcRequest.params(), new TypeRef() {}); + McpStreamableServerSession.McpStreamableServerSessionInit init = + this.sessionFactory.startSession(initializeRequest); + this.sessions.put(init.session().getId(), init.session()); + + try { + McpSchema.InitializeResult initResult = init.initResult().block(); + response.statusCode(HttpResponseStatus.OK.statusCode()); + response.headers().set("Content-Type", MimeType.APPLICATION_JSON.value()); + response.headers().set(HttpHeaders.MCP_SESSION_ID, init.session().getId()); + logger.info("[POST] Sending initialize message via HTTP response to session {}", init.session().getId()); + return Entity.createObject(response, + new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initResult, null)); + } catch (Exception e) { + logger.error("Failed to initialize session: {}", e.getMessage(), e); + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); + } + } + + /** + * Handles different types of JSON-RPC messages (Response, Notification, Request). + * Routes the message to the appropriate handler method based on its type. + * + * @param message The {@link McpSchema.JSONRPCMessage} to handle + * @param request The incoming {@link HttpClassicServerRequest} + * @param requestBody The {@link String} of request body. + * @param transportContext The {@link McpTransportContext} for request context propagation + * @param response The {@link HttpClassicServerResponse} to set status code and return data + * @return An {@link Entity} or {@link Choir} containing the response data, or {@code null} for accepted messages + */ + private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassicServerRequest request, + String requestBody, McpTransportContext transportContext, HttpClassicServerResponse response) { + // Get session ID and session + Object sessionError = validateRequestSessionId(request, response); + if (sessionError != null) { + return sessionError; + } + String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); + McpStreamableServerSession session = this.sessions.get(sessionId); + logger.info("[POST] Receiving message from session {}: {}", sessionId, requestBody); + + if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) { + handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response); + return null; + } else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) { + handleJsonRpcNotification(jsonrpcNotification, session, transportContext, response); + return null; + } else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { + return handleJsonRpcRequest(jsonrpcRequest, session, sessionId, transportContext, response); + } else { + response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); + return Entity.createObject(response, + McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message("Unknown message type").build()); + } + } + + /** + * Handles incoming JSON-RPC response messages from clients. + * Accepts the response and delivers it to the corresponding pending request within the session. + * Sets the HTTP response status to {@code 202 Accepted} to acknowledge receipt. + * + * @param jsonrpcResponse The {@link McpSchema.JSONRPCResponse} from the client + * @param session The {@link McpStreamableServerSession} to accept the response + * @param transportContext The {@link McpTransportContext} for request context propagation + * @param response The {@link HttpClassicServerResponse} to set the status code + */ + private void handleJsonRpcResponse(McpSchema.JSONRPCResponse jsonrpcResponse, McpStreamableServerSession session, + McpTransportContext transportContext, HttpClassicServerResponse response) { + session.accept(jsonrpcResponse).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)).block(); + response.statusCode(HttpResponseStatus.ACCEPTED.statusCode()); + } + + /** + * Handles incoming JSON-RPC notification messages from clients. + * Notifications are one-way messages that do not require a response. + * Sets the HTTP response status to {@code 202 Accepted} to acknowledge receipt. + * + * @param jsonrpcNotification The {@link McpSchema.JSONRPCNotification} from the client + * @param session The {@link McpStreamableServerSession} to accept the notification + * @param transportContext The {@link McpTransportContext} for request context propagation + * @param response The {@link HttpClassicServerResponse} to set the status code + */ + private void handleJsonRpcNotification(McpSchema.JSONRPCNotification jsonrpcNotification, + McpStreamableServerSession session, McpTransportContext transportContext, + HttpClassicServerResponse response) { + session.accept(jsonrpcNotification) + .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) + .block(); + response.statusCode(HttpResponseStatus.ACCEPTED.statusCode()); + } + + /** + * Handles incoming JSON-RPC request messages from clients with streaming response support. + * Creates an SSE stream to send the response and any subsequent messages back to the client. + * This allows for real-time, bidirectional communication during request processing. + * + * @param jsonrpcRequest The {@link McpSchema.JSONRPCRequest} from the client + * @param session The {@link McpStreamableServerSession} to process the request + * @param sessionId The MCP session identifier for logging and tracking + * @param transportContext The {@link McpTransportContext} for request context propagation + * @param response The {@link HttpClassicServerResponse} for the SSE stream + * @return A {@link Choir} containing {@link TextEvent} for SSE streaming of the response + */ + private Object handleJsonRpcRequest(McpSchema.JSONRPCRequest jsonrpcRequest, McpStreamableServerSession session, + String sessionId, McpTransportContext transportContext, HttpClassicServerResponse response) { + return Choir.create(emitter -> { + emitter.observe(new Emitter.Observer() { + @Override + public void onEmittedData(TextEvent data) { + // No action needed + } + + @Override + public void onCompleted() { + logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + } + + @Override + public void onFailed(Exception e) { + logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, e.getMessage()); + } + }); + + FitStreamableMcpSessionTransport sessionTransport = + new FitStreamableMcpSessionTransport(sessionId, emitter, response); + + try { + session.responseStream(jsonrpcRequest, sessionTransport) + .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) + .block(); + } catch (Exception e) { + logger.error("Failed to handle request stream: {}", e.getMessage(), e); + emitter.fail(e); + } + }); + } + + /** + * Implementation of McpStreamableServerTransport for WebMVC SSE sessions. This class + * handles the transport-level communication for a specific client session. + * + *

+ * This class is thread-safe and uses a ReentrantLock to synchronize access to the + * underlying SSE builder to prevent race conditions when multiple threads attempt to + * send messages concurrently. + */ + private class FitStreamableMcpSessionTransport implements McpStreamableServerTransport { + private final String sessionId; + private final Emitter emitter; + private final HttpClassicServerResponse response; + + private final ReentrantLock lock = new ReentrantLock(); + + private volatile boolean closed = false; + + /** + * Creates a new session transport with the specified ID and SSE builder. + * + * @param sessionId The unique identifier for this session + * @param emitter The emitter for sending events + * @param response The HTTP response for checking connection status + */ + FitStreamableMcpSessionTransport(String sessionId, Emitter emitter, + HttpClassicServerResponse response) { + this.sessionId = sessionId; + this.emitter = emitter; + this.response = response; + logger.info("[SSE] Building SSE for session: {} ", sessionId); + } + + /** + * Sends a JSON-RPC message to the client through the SSE connection. + * + * @param message The JSON-RPC message to send + * @return A Mono that completes when the message has been sent + */ + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + return sendMessage(message, null); + } + + /** + * Sends a JSON-RPC message to the client through the SSE connection with a + * specific message ID. + * + * @param message The JSON-RPC message to send + * @param messageId The message ID for SSE event identification + * @return A Mono that completes when the message has been sent + */ + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) { + return Mono.fromRunnable(() -> { + if (this.closed) { + logger.info("Attempted to send message to closed session: {}", this.sessionId); + return; + } + + this.lock.lock(); + try { + if (this.closed) { + logger.info("Session {} was closed during message send attempt", this.sessionId); + return; + } + + // Check if connection is still active before sending + if (!this.response.isActive()) { + logger.warn("[SSE] Connection inactive detected while sending message for session: {}", + this.sessionId); + this.close(); + return; + } + + String jsonText = jsonMapper.writeValueAsString(message); + TextEvent textEvent = + TextEvent.custom().id(this.sessionId).event(Event.MESSAGE.code()).data(jsonText).build(); + this.emitter.emit(textEvent); + + logger.info("[SSE] Sending message to session {}: {}", this.sessionId, jsonText); + } catch (Exception e) { + logger.error("Failed to send message to session {}: {}", this.sessionId, e.getMessage(), e); + try { + this.emitter.fail(e); + } catch (Exception errorException) { + logger.error("Failed to send error to SSE builder for session {}: {}", + this.sessionId, + errorException.getMessage(), + errorException); + } + } finally { + this.lock.unlock(); + } + }); + } + + /** + * Converts data from one type to another using the configured jsonMapper. + * + * @param data The source data object to convert + * @param typeRef The target type reference + * @param The target type + * @return The converted object of type T + */ + @Override + public T unmarshalFrom(Object data, TypeRef typeRef) { + return jsonMapper.convertValue(data, typeRef); + } + + /** + * Initiates a graceful shutdown of the transport. + * + * @return A Mono that completes when the shutdown is complete + */ + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(FitStreamableMcpSessionTransport.this::close); + } + + /** + * Closes the transport immediately. + */ + @Override + public void close() { + this.lock.lock(); + try { + if (this.closed) { + logger.info("Session transport {} already closed", this.sessionId); + return; + } + + this.closed = true; + + this.emitter.complete(); + logger.info("[SSE] Closed SSE builder successfully for session {}", sessionId); + } catch (Exception e) { + logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage()); + } finally { + this.lock.unlock(); + } + } + + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for creating instances of {@link FitMcpStreamableServerTransportProvider}. + */ + public static class Builder { + private McpJsonMapper jsonMapper; + private boolean disallowDelete = false; + private McpTransportContextExtractor contextExtractor = + (HttpClassicServerRequest) -> McpTransportContext.EMPTY; + private Duration keepAliveInterval; + + /** + * Sets the jsonMapper to use for JSON serialization/deserialization of MCP messages. + * + * @param jsonMapper The jsonMapper instance. Must not be null. + * @return this builder instance + * @throws IllegalArgumentException if jsonMapper is null + */ + public Builder jsonMapper(McpJsonMapper jsonMapper) { + Validation.notNull(jsonMapper, "jsonMapper must not be null"); + this.jsonMapper = jsonMapper; + return this; + } + + /** + * Sets whether to disallow DELETE requests on the endpoint. + * + * @param disallowDelete true to disallow DELETE requests, false otherwise + * @return this builder instance + */ + public Builder disallowDelete(boolean disallowDelete) { + this.disallowDelete = disallowDelete; + return this; + } + + /** + * Sets the context extractor that allows providing the MCP feature + * implementations to inspect HTTP transport level metadata that was present at + * HTTP request processing time. This allows to extract custom headers and other + * useful data for use during execution later on in the process. + * + * @param contextExtractor The contextExtractor to fill in a + * {@link McpTransportContext}. + * @return this builder instance + * @throws IllegalArgumentException if contextExtractor is null + */ + public Builder contextExtractor(McpTransportContextExtractor contextExtractor) { + Validation.notNull(contextExtractor, "contextExtractor must not be null"); + this.contextExtractor = contextExtractor; + return this; + } + + /** + * Sets the keep-alive interval for the transport. If set, a keep-alive scheduler + * will be created to periodically check and send keep-alive messages to clients. + * + * @param keepAliveInterval The interval duration for keep-alive messages, or null + * to disable keep-alive + * @return this builder instance + */ + public Builder keepAliveInterval(Duration keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + + /** + * Builds a new instance of {@link FitMcpStreamableServerTransportProvider} with + * the configured settings. + * + * @return A new FitMcpStreamableServerTransportProvider instance + * @throws IllegalStateException if required parameters are not set + */ + public FitMcpStreamableServerTransportProvider build() { + Validation.notNull(this.jsonMapper, "jsonMapper must be set"); + + return new FitMcpStreamableServerTransportProvider(this.jsonMapper, + this.disallowDelete, + this.contextExtractor, + this.keepAliveInterval); + } + } +} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/resources/application.yml b/framework/fel/java/plugins/tool-mcp-server/src/main/resources/application.yml index 64ea10351..d3ac184e8 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/resources/application.yml +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/resources/application.yml @@ -1,4 +1,9 @@ fit: beans: packages: - - 'modelengine.fel.tool.mcp.server' \ No newline at end of file + - 'modelengine.fel.tool.mcp.server' + +mcp: + server: + request: + timeout-seconds: 60 \ No newline at end of file diff --git a/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/McpServerControllerTest.java b/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/McpServerControllerTest.java deleted file mode 100644 index 068e46718..000000000 --- a/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/McpServerControllerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.server; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.catchThrowableOfType; -import static org.mockito.Mockito.mock; - -import modelengine.fitframework.serialization.ObjectSerializer; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -/** - * Unit test for {@link McpServerController}. - * - * @author 季聿阶 - * @since 2025-05-20 - */ -@DisplayName("Unit tests for McpController") -public class McpServerControllerTest { - private ObjectSerializer objectSerializer; - private McpServer mcpServer; - - @BeforeEach - void setup() { - this.objectSerializer = mock(ObjectSerializer.class); - this.mcpServer = mock(McpServer.class); - } - - @Nested - @DisplayName("Constructor Tests") - class GivenConstructor { - @Test - @DisplayName("Should throw exception when serializer is null") - void shouldThrowExceptionWhenSerializerIsNull() { - var exception = catchThrowableOfType(IllegalArgumentException.class, - () -> new McpServerController(null, mcpServer)); - assertThat(exception).hasMessage("The json serializer cannot be null."); - } - - @Test - @DisplayName("Should throw exception when mcpServer is null") - void shouldThrowExceptionWhenMcpServerIsNull() { - var exception = catchThrowableOfType(IllegalArgumentException.class, - () -> new McpServerController(objectSerializer, null)); - assertThat(exception).hasMessage("The MCP server cannot be null."); - } - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServerTest.java b/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServerTest.java similarity index 56% rename from framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServerTest.java rename to framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServerTest.java index 555592161..0e411778f 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpServerTest.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/test/java/modelengine/fel/tool/mcp/server/support/DefaultMcpStreamableServerTest.java @@ -8,17 +8,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowableOfType; -import static org.mockito.Mockito.anyMap; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import modelengine.fel.tool.mcp.entity.ServerSchema; +import io.modelcontextprotocol.server.McpSyncServer; import modelengine.fel.tool.mcp.entity.Tool; import modelengine.fel.tool.mcp.server.McpServer; +import modelengine.fel.tool.mcp.server.McpServerConfig; import modelengine.fel.tool.service.ToolExecuteService; import modelengine.fitframework.util.MapBuilder; @@ -27,22 +24,26 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.Map; /** - * Unit test for {@link DefaultMcpServer}. + * Unit test for {@link DefaultMcpStreamableServer}. * * @author 季聿阶 * @since 2025-05-20 */ -@DisplayName("Unit tests for DefaultMcpServer") -public class DefaultMcpServerTest { +@DisplayName("Unit tests for DefaultMcpStreamableServer") +public class DefaultMcpStreamableServerTest { private ToolExecuteService toolExecuteService; + private McpSyncServer mcpSyncServer; @BeforeEach void setup() { this.toolExecuteService = mock(ToolExecuteService.class); + McpServerConfig config = new McpServerConfig(); + this.mcpSyncServer = config.mcpSyncServer(config.fitMcpStreamableServerTransportProvider(), 10); } @Nested @@ -51,48 +52,28 @@ class GivenConstructor { @Test @DisplayName("Should throw IllegalArgumentException when toolExecuteService is null") void throwIllegalArgumentExceptionWhenToolExecuteServiceIsNull() { - IllegalArgumentException exception = - catchThrowableOfType(IllegalArgumentException.class, () -> new DefaultMcpServer(null)); + IllegalArgumentException exception = catchThrowableOfType(IllegalArgumentException.class, + () -> new DefaultMcpStreamableServer(null, mcpSyncServer)); assertThat(exception).isNotNull().hasMessage("The tool execute service cannot be null."); } } - @Nested - @DisplayName("getInfo Method Tests") - class GivenGetInfo { - @Test - @DisplayName("Should return expected server information") - void returnExpectedServerInfo() { - McpServer server = new DefaultMcpServer(toolExecuteService); - ServerSchema info = server.getSchema(); - - assertThat(info).returns("2024-11-05", ServerSchema::protocolVersion); - - ServerSchema.Capabilities capabilities = info.capabilities(); - assertThat(capabilities).isNotNull(); - - ServerSchema.Capabilities.Tools toolsCapability = capabilities.tools(); - assertThat(toolsCapability).returns(true, ServerSchema.Capabilities.Tools::listChanged); - - ServerSchema.Info serverInfo = info.serverInfo(); - assertThat(serverInfo).returns("FIT Store MCP Server", ServerSchema.Info::name) - .returns("3.6.0-SNAPSHOT", ServerSchema.Info::version); - } - } - @Nested @DisplayName("registerToolsChangedObserver and Notification Tests") class GivenRegisterAndNotify { @Test @DisplayName("Should notify observers when tools are added or removed") void notifyObserversOnToolAddOrRemove() { - DefaultMcpServer server = new DefaultMcpServer(toolExecuteService); + DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); McpServer.ToolsChangedObserver observer = mock(McpServer.ToolsChangedObserver.class); server.registerToolsChangedObserver(observer); - server.onToolAdded("tool1", - "description1", - MapBuilder.get().put("schema", "value1").build()); + Map schema = MapBuilder.get() + .put("type", "object") + .put("properties", Collections.emptyMap()) + .put("required", Collections.emptyList()) + .build(); + server.onToolAdded("tool1", "description1", schema); verify(observer, times(1)).onToolsChanged(); server.onToolRemoved("tool1"); @@ -106,10 +87,14 @@ class GivenOnToolAdded { @Test @DisplayName("Should add tool successfully with valid parameters") void addToolSuccessfully() { - DefaultMcpServer server = new DefaultMcpServer(toolExecuteService); + DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); String name = "tool1"; String description = "description1"; - Map schema = MapBuilder.get().put("input", "value").build(); + Map schema = MapBuilder.get() + .put("type", "object") + .put("properties", Collections.emptyMap()) + .put("required", Collections.emptyList()) + .build(); server.onToolAdded(name, description, schema); @@ -125,12 +110,17 @@ void addToolSuccessfully() { @Test @DisplayName("Should ignore invalid parameters and not add any tool") void ignoreInvalidParameters() { - DefaultMcpServer server = new DefaultMcpServer(toolExecuteService); - - server.onToolAdded("", "description", MapBuilder.get().put("input", "value").build()); + DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + Map schema = MapBuilder.get() + .put("type", "object") + .put("properties", Collections.emptyMap()) + .put("required", Collections.emptyList()) + .build(); + + server.onToolAdded("", "description", schema); assertThat(server.getTools()).isEmpty(); - server.onToolAdded("tool1", "", MapBuilder.get().put("input", "value").build()); + server.onToolAdded("tool1", "", schema); assertThat(server.getTools()).isEmpty(); server.onToolAdded("tool1", "description", null); @@ -144,8 +134,13 @@ class GivenOnToolRemoved { @Test @DisplayName("Should remove an added tool correctly") void removeToolSuccessfully() { - DefaultMcpServer server = new DefaultMcpServer(toolExecuteService); - server.onToolAdded("tool1", "desc", MapBuilder.get().put("input", "value").build()); + DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + Map schema = MapBuilder.get() + .put("type", "object") + .put("properties", Collections.emptyMap()) + .put("required", Collections.emptyList()) + .build(); + server.onToolAdded("tool1", "desc", schema); server.onToolRemoved("tool1"); @@ -155,28 +150,17 @@ void removeToolSuccessfully() { @Test @DisplayName("Should ignore removal if name is blank") void ignoreBlankName() { - DefaultMcpServer server = new DefaultMcpServer(toolExecuteService); - server.onToolAdded("tool1", "desc", MapBuilder.get().put("input", "value").build()); + DefaultMcpStreamableServer server = new DefaultMcpStreamableServer(toolExecuteService, mcpSyncServer); + Map schema = MapBuilder.get() + .put("type", "object") + .put("properties", Collections.emptyMap()) + .put("required", Collections.emptyList()) + .build(); + server.onToolAdded("tool1", "desc", schema); server.onToolRemoved(""); assertThat(server.getTools()).hasSize(1); } } - - @Nested - @DisplayName("callTool Method Tests") - class GivenCallTool { - @Test - @DisplayName("Should call the tool and return correct result") - void callToolSuccessfully() { - when(toolExecuteService.execute(anyString(), anyMap())).thenReturn("result"); - McpServer server = new DefaultMcpServer(toolExecuteService); - - Object result = server.callTool("tool1", Map.of("arg1", "value1")); - - assertThat(result).isEqualTo("result"); - verify(toolExecuteService, times(1)).execute(eq("tool1"), anyMap()); - } - } } \ No newline at end of file