Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a8de38b
Enable MCP client for Agent
lvliang-intel May 8, 2025
0b5199b
Merge branch 'main' of https://github.com/lvliang-intel/GenAIComps in…
lvliang-intel May 8, 2025
24c43e6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 8, 2025
fc0c442
update code path
lvliang-intel May 8, 2025
b233b09
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel May 8, 2025
f6872f5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 8, 2025
62c88b5
update test code
lvliang-intel May 8, 2025
b83fd14
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel May 8, 2025
2aec889
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 8, 2025
8089984
add mcp dependency
lvliang-intel May 8, 2025
18c3f98
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel May 8, 2025
d6b2be8
fix code path
lvliang-intel May 8, 2025
5492a23
improve code test coverage
lvliang-intel May 11, 2025
e89b5fb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 11, 2025
1372bea
fix precommit issue
lvliang-intel May 11, 2025
547af2f
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel May 11, 2025
1788f42
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 11, 2025
89fe0ec
Merge branch 'main' into agent_mcp_client
Spycsh May 12, 2025
d7d1ee1
Merge branch 'main' of https://github.com/lvliang-intel/GenAIComps in…
lvliang-intel May 13, 2025
6a71f43
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel May 13, 2025
fc2777e
Merge branch 'main' into agent_mcp_client
lvliang-intel May 22, 2025
7046df8
Merge branch 'main' of https://github.com/lvliang-intel/GenAIComps in…
lvliang-intel Jun 10, 2025
e32bbb2
Merge branch 'agent_mcp_client' of https://github.com/lvliang-intel/G…
lvliang-intel Jun 10, 2025
d698e1d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 10, 2025
191e699
Merge branch 'main' into agent_mcp_client
lvliang-intel Jun 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions comps/cores/mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# OPEA MCP Tool

The **OPEA MCP Tool** is a client tool designed to facilitate seamless integration between agents and MCP (Model Context Protocol) servers. It provides a unified interface for managing MCP clients, enabling agents to access and interact with various tools and data sources exposed by MCP servers.

---

## **OPEA MCP Tool Overview**

The **OPEA MCP Tool** provides a unified interface for managing MCP clients and interacting with tools exposed by MCP servers. It supports both **SSE (Server-Sent Events)** and **Stdio** server configurations, making it flexible for various use cases.

### **Features**

- **Dynamic Tool Registration**: Automatically registers tools exposed by MCP servers for natural invocation.
- **Asynchronous Operations**: Fully asynchronous API for efficient integration with modern Python applications.
- **Context Management**: Supports Python's `async with` syntax for automatic resource management.
- **Error Handling**: Robust error handling for client initialization, tool execution, and disconnection.

---

## **API Usage**

### Initialization

To initialize the OpeaMCPToolsManager, provide an OpeaMCPConfig object containing the server configurations:

```python
from comps.cores.mcp.config import OpeaMCPConfig, OpeaMCPSSEServerConfig, OpeaMCPStdioServerConfig
from comps.cores.mcp.manager import OpeaMCPToolsManager

config = OpeaMCPConfig(
sse_servers=[
OpeaMCPSSEServerConfig(url="http://sse-server-1.com", api_key="your_api_key"),
],
stdio_servers=[
OpeaMCPStdioServerConfig(name="stdio-server-1", command="python", args=["tool.py"]),
],
)

manager = await OpeaMCPToolsManager.create(config)
```

### Tool Execution

Once initialized, you can execute tools exposed by MCP servers using the execute_tool method:

```python
result = await manager.execute_tool("tool_name", {"param1": "value1", "param2": "value2"})
print(result)
```

### Context Management

The OpeaMCPToolsManager supports Python's async with syntax for automatic resource management:

```python
async with await OpeaMCPToolsManager.create(config) as manager:
result = await manager.execute_tool("tool_name", {"param1": "value1"})
print(result)
```

### Dynamic Tool Invocation

Tools are dynamically registered as methods of the manager, allowing for natural invocation:

```python
async with OpeaMCPToolsManager.create(config) as manager:
result = await manager.tool_name(param1="value1", param2="value2")
print(result)
```

## **Examples**

### **Launch a SSE MCP Server**

To launch an SSE MCP server using Playwright, run the following command:

```bash
npx @playwright/mcp@latest --port 8931
```

### **Launch a Stdio MCP Server**

To launch a simple Stdio MCP server, follow these steps:

```bash
git clone https://github.com/modelcontextprotocol/python-sdk.git
cd python-sdk/examples/servers/simple-tool/mcp_simple_tool
uv run mcp-simple-tool
```

### **Run the MCP Client**

The following example demonstrates how to connect to both SSE and Stdio MCP servers and execute tools:

```python
import asyncio
from comps.cores.mcp.config import OpeaMCPConfig, OpeaMCPSSEServerConfig, OpeaMCPStdioServerConfig
from comps.cores.mcp.manager import OpeaMCPToolsManager


async def main():
config = OpeaMCPConfig(
sse_servers=[
OpeaMCPSSEServerConfig(url="http://localhost:8931/sse"),
],
stdio_servers=[
OpeaMCPStdioServerConfig(name="mcp-simple-tool", command="uv", args=["run", "mcp-simple-tool"]),
],
)

async with await OpeaMCPToolsManager.create(config) as manager:
# Execute tools exposed by the servers
result = await manager.execute_tool("browser_snapshot", {})
print(result)

result = await manager.execute_tool("fetch", {"url": "https://opea.dev/"})
print(result)


# Run the async function
asyncio.run(main())
```
2 changes: 2 additions & 0 deletions comps/cores/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
165 changes: 165 additions & 0 deletions comps/cores/mcp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import asyncio
import os
from contextlib import AsyncExitStack
from typing import List, Optional

from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from pydantic import BaseModel, Field

from comps import CustomLogger
from comps.cores.mcp.tool import OpeaMCPClientTool

logger = CustomLogger("comps-mcp-client")
log_flag = os.getenv("LOGFLAG", False)


class OpeaMCPClient(BaseModel):
"""A client for interacting with MCP servers, managing tools, and handling server communication."""

description: str = "MCP client for server interaction and tool management"
session: Optional[ClientSession] = None
exit_stack: AsyncExitStack = AsyncExitStack()

tools: List[OpeaMCPClientTool] = Field(default_factory=list)
tool_registry: dict[str, OpeaMCPClientTool] = Field(default_factory=dict)

class Config:
arbitrary_types_allowed = True

async def connect_via_sse(self, server_url: str, api_key: Optional[str] = None, timeout: float = 30.0) -> None:
"""Establish a connection to an MCP server using SSE (Server-Sent Events) transport.

Args:
server_url: The URL of the SSE server to connect to.
api_key: Optional API key for authentication.
timeout: Connection timeout in seconds. Default is 30 seconds.

Raises:
ValueError: If the server URL is not provided.
asyncio.TimeoutError: If the connection times out.
Exception: For other connection errors.
"""
if not server_url:
raise ValueError("Server URL is required.")

Check warning on line 48 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L48

Added line #L48 was not covered by tests
if self.session:
await self.disconnect()

Check warning on line 50 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L50

Added line #L50 was not covered by tests

try:

async def connect_with_timeout():
streams_context = sse_client(
url=server_url,
headers={"Authorization": f"Bearer {api_key}"} if api_key else None,
timeout=timeout,
)
streams = await self.exit_stack.enter_async_context(streams_context)
self.session = await self.exit_stack.enter_async_context(ClientSession(*streams))
await self._initialize_tools()

await asyncio.wait_for(connect_with_timeout(), timeout=timeout)
except asyncio.TimeoutError:
logger.error(f"Connection to {server_url} timed out after {timeout} seconds")
await self.disconnect()
raise
except Exception as e:
logger.error(f"Error connecting to {server_url}: {str(e)}")
await self.disconnect()
raise

Check warning on line 72 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L65-L72

Added lines #L65 - L72 were not covered by tests

async def connect_via_stdio(self, command: str, args: List[str]) -> None:
"""Establish a connection to an MCP server using stdio (standard input/output) transport.

Args:
command: The command to start the server.
args: A list of arguments for the command.

Raises:
ValueError: If the command is not provided.
Exception: For other connection errors.
"""
if not command:
raise ValueError("Server command is required.")

Check warning on line 86 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L86

Added line #L86 was not covered by tests
if self.session:
await self.disconnect()

Check warning on line 88 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L88

Added line #L88 was not covered by tests

try:
server_params = StdioServerParameters(command=command, args=args)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
read, write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))

await self._initialize_tools()
except Exception as e:
logger.error(f"Error connecting to {command}: {str(e)}")
await self.disconnect()
raise

Check warning on line 100 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L97-L100

Added lines #L97 - L100 were not covered by tests

async def _initialize_tools(self) -> None:
"""Initialize the client session and populate the tool registry with available tools.

Raises:
RuntimeError: If the session is not initialized.
"""
if not self.session:
raise RuntimeError("Session not initialized.")

Check warning on line 109 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L109

Added line #L109 was not covered by tests

await self.session.initialize()
response = await self.session.list_tools()

# Clear existing tools
self.tools = []
self.tool_registry = {}

# Populate tools and registry
for tool in response.tools:
client_tool = OpeaMCPClientTool(
name=tool.name,
description=tool.description,
inputSchema=tool.inputSchema,
session=self.session,
)
self.tool_registry[tool.name] = client_tool
self.tools.append(client_tool)

logger.info(f"Connected to server with tools: {[tool.name for tool in response.tools]}")

async def invoke_tool(self, tool_name: str, parameters: dict):
"""Invoke a tool on the MCP server.

Args:
tool_name: The name of the tool to invoke.
parameters: The parameters to pass to the tool.

Returns:
The result of the tool invocation.

Raises:
ValueError: If the tool is not found in the registry.
RuntimeError: If the client session is not available.
"""
if tool_name not in self.tool_registry:
raise ValueError(f"Tool '{tool_name}' not found in the registry.")
if not self.session:
raise RuntimeError("Client session is not available.")

Check warning on line 148 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L145-L148

Added lines #L145 - L148 were not covered by tests

return await self.session.call_tool(name=tool_name, arguments=parameters)

Check warning on line 150 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L150

Added line #L150 was not covered by tests

async def disconnect(self) -> None:
"""Disconnect from the MCP server and clean up resources."""
if self.session:
try:
if hasattr(self.session, "close"):
await self.session.close()
await self.exit_stack.aclose()
except Exception as e:
logger.error(f"Error during disconnect: {str(e)}")

Check warning on line 160 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L154-L160

Added lines #L154 - L160 were not covered by tests
finally:
self.session = None
self.tools = []
self.tool_registry = {}
logger.info("Disconnected from MCP server")

Check warning on line 165 in comps/cores/mcp/client.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/client.py#L162-L165

Added lines #L162 - L165 were not covered by tests
63 changes: 63 additions & 0 deletions comps/cores/mcp/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from urllib.parse import urlparse

from pydantic import BaseModel, Field


class OpeaMCPSSEServerConfig(BaseModel):
"""Configuration for a single MCP server.

Attributes:
url: The server URL
api_key: Optional API key for authentication
"""

url: str
api_key: str | None = None


class OpeaMCPStdioServerConfig(BaseModel):
"""Configuration for a MCP (Model Context Protocol) server that uses stdio.

Attributes:
name: The name of the server
command: The command to run the server
args: The arguments to pass to the server
env: The environment variables to set for the server
"""

name: str
command: str
args: list[str] = Field(default_factory=list)
env: dict[str, str] = Field(default_factory=dict)


class OpeaMCPConfig(BaseModel):
"""Configuration for MCP (Model Context Protocol) settings.

Attributes:
sse_servers: List of MCP SSE server configs
stdio_servers: List of MCP stdio server configs. These servers will be added to the MCP Router running inside runtime container.
"""

sse_servers: list[OpeaMCPSSEServerConfig] = Field(default_factory=list)
stdio_servers: list[OpeaMCPStdioServerConfig] = Field(default_factory=list)

def validate_servers(self) -> None:
"""Validate that server URLs are valid and unique."""
urls = [server.url for server in self.sse_servers]

Check warning on line 50 in comps/cores/mcp/config.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/config.py#L50

Added line #L50 was not covered by tests

# Check for duplicate server URLs
if len(set(urls)) != len(urls):
raise ValueError("Duplicate MCP server URLs are not allowed")

Check warning on line 54 in comps/cores/mcp/config.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/config.py#L53-L54

Added lines #L53 - L54 were not covered by tests

# Validate URLs
for url in urls:
try:
result = urlparse(url)
if not all([result.scheme, result.netloc]):
raise ValueError(f"Invalid URL format: {url}")
except Exception as e:
raise ValueError(f"Invalid URL {url}: {str(e)}")

Check warning on line 63 in comps/cores/mcp/config.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/mcp/config.py#L57-L63

Added lines #L57 - L63 were not covered by tests
Loading