-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathserver.py
More file actions
105 lines (91 loc) · 3.16 KB
/
server.py
File metadata and controls
105 lines (91 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import argparse
import asyncio
import os
from typing import Any
import pymssql
import uvicorn
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Mount, Route
# Load variables from a .env file (if one is present) before the lookups
# below read the environment.
load_dotenv()

# SQL Server connection settings. Each one falls back to the default shown
# when the corresponding environment variable is not set.
MSSQL_HOST = os.environ.get("MSSQL_HOST", "localhost")
MSSQL_USER = os.environ.get("MSSQL_USER", "sa")
MSSQL_PASSWORD = os.environ.get("MSSQL_PASSWORD", "1234")
# Converted to int once here; a non-numeric value raises ValueError at import.
MSSQL_PORT = int(os.environ.get("MSSQL_PORT", "1433"))

# FastMCP instance on which this module's tools are registered.
app = FastMCP("MSSQL MCP Server")
def run_query_sync(query: str, database: str) -> Any:
    """
    Execute a SQL statement synchronously with pymssql.

    A new connection is opened for every call using the module-level MSSQL_*
    settings. The cursor and the connection are always closed again, including
    when the statement fails part-way through.

    Args:
        query: SQL text, handed to the cursor unchanged.
        database: Name of the database to connect to.

    Returns:
        A list of row dictionaries when the statement produces a result set,
        an empty list when it does not, or ``{"error": "<message>"}`` when
        connecting, executing, fetching or committing fails.
    """
    try:
        conn = pymssql.connect(
            server=MSSQL_HOST,
            user=MSSQL_USER,
            password=MSSQL_PASSWORD,
            database=database,
            port=MSSQL_PORT,
        )
        try:
            cursor = conn.cursor(as_dict=True)
            try:
                cursor.execute(query)
                # Per DB-API, description is None when the statement returned
                # no result set (e.g. INSERT/UPDATE/DELETE). Checking it keeps
                # the "no rows -> []" behaviour without swallowing genuine
                # fetch errors, which now reach the error handler below.
                if cursor.description is None:
                    result = []
                else:
                    result = cursor.fetchall()
                conn.commit()
            finally:
                cursor.close()
            return result
        finally:
            # Runs on success and on failure so the connection never leaks.
            conn.close()
    except Exception as e:
        # Callers expect errors to be reported as data, not raised.
        return {"error": str(e)}
@app.tool()
async def execute_query(query: str, database: str) -> str:
    """
    Executes SQL queries on the specified MSSQL database using pymssql.

    Args:
        query: SQL query to execute (e.g., SELECT, INSERT, UPDATE, DELETE, etc.)
        database: Name of the database to use

    Returns:
        Query results (list of rows for SELECT statements, status message for others) as a string.
    """
    # NOTE(review): the docstring text is left as-is because the tool
    # decorator may publish it as the tool description - confirm before
    # rewording it.
    try:
        # pymssql is blocking, so the query runs in a worker thread to keep
        # the event loop free while it executes.
        outcome = await asyncio.to_thread(run_query_sync, query, database)
        rendered = str(outcome)
    except Exception as exc:
        # Backstop for failures in the thread hand-off or in str().
        return "Error: " + str(exc)
    return rendered
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
    """
    Create a Starlette application that exposes the MCP server through SSE.

    Two routes are registered: ``/sse`` opens the event stream and runs the
    MCP session on it, and ``/messages/`` is mounted on the transport's
    handler for messages posted by the client.

    Args:
        mcp_server: MCP server whose ``run`` coroutine drives each session.
        debug: Forwarded to Starlette's ``debug`` option.

    Returns:
        The configured Starlette application.
    """
    # Local import: the file's import block does not bring in Response.
    from starlette.responses import Response

    sse = SseServerTransport("/messages/")

    async def handle_sse(request: Request) -> Response:
        # connect_sse needs the raw ASGI callables; Request exposes send only
        # through the private _send attribute.
        async with sse.connect_sse(
            request.scope,
            request.receive,
            request._send,
        ) as (read_stream, write_stream):
            # NOTE(review): fixed one-second pause before the session starts;
            # its purpose is not evident from this file - presumably a
            # start-up workaround, confirm before removing.
            await asyncio.sleep(1)
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options(),
            )
        # Starlette calls the endpoint's return value as an ASGI response.
        # Returning None made it fail with "'NoneType' object is not callable"
        # once the SSE session ended, so hand back an empty response instead.
        return Response()

    return Starlette(
        debug=debug,
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )
if __name__ == "__main__":
    # Script entry point: serve the MCP tools over SSE with uvicorn.
    # The Starlette app is built around the server object held by FastMCP.
    low_level_server = app._mcp_server

    cli = argparse.ArgumentParser(
        description='Run MCP SSE-based server with MSSQL query tool',
    )
    cli.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    cli.add_argument('--port', type=int, default=8080, help='Port to listen on')
    options = cli.parse_args()

    # Starlette debug mode is always on when launched this way.
    uvicorn.run(
        create_starlette_app(low_level_server, debug=True),
        host=options.host,
        port=options.port,
    )