-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_mcp.py
More file actions
executable file
·133 lines (114 loc) · 3.79 KB
/
test_mcp.py
File metadata and controls
executable file
·133 lines (114 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import json
import subprocess
import sys
def test_mcp_server():
    """Run an end-to-end smoke test against the SQL Local MCP server.

    Starts the server, sends an ``initialize`` request, sends the
    ``notifications/initialized`` notification, and then requests
    ``tools/list``, printing progress for each step. The server process is
    cleaned up in a ``finally`` clause, so it is released even when a step
    fails or raises.

    Returns:
        bool: True if both the initialize and tools/list steps succeeded,
        False as soon as either of them fails.
    """
    print("Testing SQL Local MCP Server...")

    try:
        # Start the server
        start_server()

        # Test initialize
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {}
        }

        print("\n1. Testing initialize...")
        result = send_request(init_request)
        if result and result.get("result"):
            # Read serverInfo defensively: a successful response that omits
            # it (or one of its fields) should be reported, not crash the
            # test with a KeyError.
            server_info = result["result"].get("serverInfo") or {}
            server_name = server_info.get("name", "unknown")
            server_version = server_info.get("version", "unknown")
            print("✅ Initialize successful")
            print(f" Server: {server_name} v{server_version}")
        else:
            print("❌ Initialize failed")
            return False

        # Send initialized notification
        initialized_notification = {
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
            "params": {}
        }
        send_notification(initialized_notification)

        # Test tools/list
        tools_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {}
        }

        print("\n2. Testing tools/list...")
        result = send_request(tools_request)
        if result and result.get("result") and result["result"].get("tools"):
            tools = result["result"]["tools"]
            print(f"✅ Tools list successful - {len(tools)} tools available:")
            for tool in tools:
                # A tool's description is optional in MCP, so a missing one
                # must not abort the listing.
                tool_name = tool.get("name", "<unnamed>")
                tool_description = tool.get("description", "")
                print(f" - {tool_name}: {tool_description}")
        else:
            print("❌ Tools list failed")
            return False

        print("\n✅ All tests passed! MCP server is working correctly.")
        return True

    finally:
        cleanup()
# Module-level handle to the MCP server subprocess, shared by the helper
# functions in this file so they all talk to one persistent connection.
# It is None until start_server() stores the Popen object here.
_process = None
def start_server():
    """Launch ``mcp_server.py`` as a child process and remember it globally.

    The script is run with the current Python interpreter. Its stdin, stdout
    and stderr are all piped, opened in text mode, and unbuffered.

    Returns:
        subprocess.Popen: The started process, which is also stored in the
        module-level ``_process``.
    """
    global _process

    command = [sys.executable, "mcp_server.py"]
    stream_options = {
        "stdin": subprocess.PIPE,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
        "bufsize": 0,
    }

    server = subprocess.Popen(command, **stream_options)
    _process = server
    return server
import time
def send_request(request):
    """Send a JSON-RPC request to the MCP server and return its reply.

    The request is written to the server's stdin as a single JSON line, and
    the first non-blank line read from the server's stdout is parsed as the
    response.

    Args:
        request: JSON-serializable request object.

    Returns:
        The decoded JSON response, or None when the response is not valid
        JSON, the server closed stdout without answering, or any other
        error occurred. Problems are printed rather than raised.
    """
    try:
        request_json = json.dumps(request) + "\n"
        _process.stdin.write(request_json)
        _process.stdin.flush()

        # readline() returns "" only at EOF; a blank line still carries its
        # newline. Skip blank lines instead of treating them as "no
        # response": falling through to stderr.read() while the server is
        # still running would block until the server exits.
        response_line = _process.stdout.readline()
        while response_line and not response_line.strip():
            response_line = _process.stdout.readline()

        if response_line:
            try:
                return json.loads(response_line.strip())
            except json.JSONDecodeError as e:
                print(f"Error parsing response: {e}")
                return None

        # EOF on stdout: the server stopped answering, so report whatever
        # it wrote to stderr.
        time.sleep(0.1)  # wait for stderr to be written
        error_output = _process.stderr.read()
        if error_output:
            print(f"Server Error: {error_output.strip()}")
        return None
    except Exception as e:
        print(f"Error sending request: {e}")
        return None
def send_notification(notification):
    """Write a JSON-RPC notification to the server's stdin.

    No reply is read. Any failure while serializing or writing is printed
    rather than raised.

    Args:
        notification: JSON-serializable notification object.
    """
    try:
        payload = f"{json.dumps(notification)}\n"
        server_stdin = _process.stdin
        server_stdin.write(payload)
        server_stdin.flush()
    except Exception as exc:
        print(f"Error sending notification: {exc}")
def cleanup():
    """Stop the MCP server process and release its resources.

    Asks the process to terminate and waits up to five seconds; if it has
    not exited by then, it is killed and reaped. The process's pipes are
    closed afterwards and the module-level ``_process`` is reset to None,
    so calling this function again is a no-op. Cleanup is best-effort:
    errors are printed, not raised.
    """
    global _process

    # Detach the handle first so that no later call can act on a stale process.
    process, _process = _process, None
    if process is None:
        return

    try:
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed process so it does not linger as a zombie.
            process.wait(timeout=5)
    except Exception as e:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt/SystemExit still propagate.
        print(f"Error cleaning up server: {e}")
    finally:
        for stream in (process.stdin, process.stdout, process.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                # Closing a pipe to an already-dead process may fail
                # (e.g. broken pipe); there is nothing left to release.
                pass
if __name__ == "__main__":
    # Report the outcome to the shell: exit status 0 when the smoke test
    # passed, 1 otherwise.
    passed = test_mcp_server()
    exit_status = 0 if passed else 1
    sys.exit(exit_status)