-
Notifications
You must be signed in to change notification settings - Fork 53
🐛 Fix DB connection pool getting exhausted #862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c85c725
02641db
17e6fd6
4ad87a5
1a0942c
20bd53a
f20601b
76df1f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,23 @@ | ||
| import asyncio | ||
| import concurrent | ||
| import concurrent.futures | ||
| import datetime | ||
| import json | ||
| import os | ||
| import subprocess | ||
| import threading | ||
| import unittest | ||
| from uuid import uuid4 | ||
|
|
||
| from fastmcp import Client | ||
| from mcp import ClientSession | ||
| from mcp.types import CallToolResult | ||
|
|
||
| from kai_mcp_solution_server.analyzer_types import ExtendedIncident | ||
| from kai_mcp_solution_server.server import GetBestHintResult, SuccessRateMetric | ||
| from tests.mcp_client import MCPClientArgs | ||
| from tests.mcp_loader_script import create_client | ||
| from tests.ssl_utils import apply_ssl_bypass | ||
|
|
||
| # TODO: The tracebacks from these tests contain horrible, impossible-to-parse output. ||
|
|
||
|
|
@@ -353,3 +359,183 @@ | |
| best_hint = GetBestHintResult(**json.loads(get_best_hint.content[0].text)) | ||
| print(f"Best hint for {RULESET_NAME_A}/{VIOLATION_NAME_A}: {best_hint}") | ||
| self.assertEqual(best_hint.hint, llm_params["responses"][0]) | ||
|
|
||
    @unittest.skip("Skipping test_solution_server_2 for now")
    async def test_solution_server_2(self) -> None:
        """End-to-end check of the incident -> solution flow against the MCP server.

        Configures the fake LLM via KAI_LLM_PARAMS, opens a client session,
        creates one incident, then creates a solution for it and parses the
        returned IDs. Currently skipped (see decorator).
        """
        # A uuid4() in the canned response makes each run's hint unique, so a
        # stale hint from a previous run cannot satisfy an assertion by accident.
        llm_params = {
            "model": "fake",
            "responses": [
                f"{uuid4()} You should add a smiley face to the file.",
            ],
        }
        # The server reads its LLM configuration from this environment variable.
        os.environ["KAI_LLM_PARAMS"] = json.dumps(llm_params)

        async with create_client(self.mcp_args) as session:
            await session.initialize()

            # Unique names per run keep this test isolated from data created
            # by earlier runs against the same server/database.
            RULESET_NAME_A = f"ruleset-{uuid4()}"
            VIOLATION_NAME_A = f"violation-{uuid4()}"
            CLIENT_ID_A = str(uuid4())

            print()
            print("--- Testing modify ---")

            # Register a single incident for the synthetic violation.
            create_incident_a = await self.call_tool(
                session,
                "create_incident",
                {
                    "client_id": CLIENT_ID_A,
                    "extended_incident": ExtendedIncident(
                        uri="file://src/file_to_smile.txt",
                        message="this file needs to have a smiley face",
                        ruleset_name=RULESET_NAME_A,
                        violation_name=VIOLATION_NAME_A,
                    ).model_dump(),
                },
            )
            # The tool result's first content item carries the new incident id
            # as text; parse it back to an int.
            INCIDENT_ID_A = int(create_incident_a.model_dump()["content"][0]["text"])

            # Attach a before/after file-content pair as the solution for the
            # incident created above.
            create_solution_for_incident_a = await self.call_tool(
                session,
                "create_solution",
                {
                    "client_id": CLIENT_ID_A,
                    "incident_ids": [INCIDENT_ID_A],
                    "before": [
                        {
                            "uri": "file://src/file_to_smile.txt",
                            "content": "I am very frowny :(",
                        }
                    ],
                    "after": [
                        {
                            "uri": "file://src/file_to_smile.txt",
                            "content": "I am very smiley :)",
                        }
                    ],
                    "reasoning": None,
                    "used_hint_ids": None,
                },
            )
            # Solution id comes back the same way the incident id did.
            SOLUTION_FOR_INCIDENT_A_ID = int(
                create_solution_for_incident_a.model_dump()["content"][0]["text"]
            )
|
|
||
| async def test_multiple_users(self) -> None: | ||
| multiple_user_mcp_args = MCPClientArgs( | ||
| transport="http", | ||
| host="localhost", | ||
| port=8087, | ||
| insecure=True, | ||
| server_path=self.mcp_args.server_path, | ||
| ) | ||
|
|
||
| os.environ["KAI_LLM_PARAMS"] = json.dumps( | ||
| { | ||
| "model": "fake", | ||
| "responses": [ | ||
| f"You should add a smiley face to the file.", | ||
| ], | ||
| } | ||
| ) | ||
|
|
||
| def stream_output(process: subprocess.Popen) -> None: | ||
| try: | ||
| assert process.stdout is not None | ||
| for line in iter(process.stdout.readline, b""): | ||
| print(f"[Server] {line.decode().rstrip()}") | ||
| except Exception as e: | ||
| print(f"Error while streaming output: {e}") | ||
| finally: | ||
| process.stdout.close() | ||
|
|
||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| def poll_process(process: subprocess.Popen) -> None: | ||
| # Check if the process has exited early | ||
| if process.poll() is not None: | ||
| output = process.stdout.read() if process.stdout else b"" | ||
| raise RuntimeError( | ||
| f"HTTP server process exited prematurely. Output: {output.decode(errors='replace')}" | ||
| ) | ||
|
|
||
| def run_async_in_thread(fn, *args, **kwargs): | ||
| try: | ||
| loop = asyncio.get_event_loop() | ||
| except RuntimeError: | ||
| loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(loop) | ||
|
|
||
| try: | ||
| result = loop.run_until_complete(fn(*args, **kwargs)) | ||
| return result | ||
| finally: | ||
| loop.close() | ||
|
|
||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| async def client_task(client_id: str) -> None: | ||
| print(f"[Client {client_id}] starting") | ||
| ssl_patch = apply_ssl_bypass() | ||
|
|
||
| client = Client( | ||
| transport="http://localhost:8087", | ||
| ) | ||
|
|
||
| async with client: | ||
| await client.session.initialize() | ||
| print(f"[Client {client_id}] initialized") | ||
|
|
||
| await client.session.list_tools() | ||
| print(f"[Client {client_id}] listed tools") | ||
|
|
||
| print(f"[Client {client_id}] finished") | ||
|
|
||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try: | ||
| self.http_server_process = subprocess.Popen( | ||
|
Check notice on line 491 in kai_mcp_solution_server/tests/test_multiple_integration.py
|
||
| [ | ||
| "python", | ||
| "-m", | ||
| "kai_mcp_solution_server", | ||
| "--transport", | ||
| "streamable-http", | ||
| "--port", | ||
| "8087", | ||
| ], | ||
|
||
| stdout=subprocess.PIPE, | ||
| stderr=subprocess.STDOUT, | ||
| ) | ||
|
|
||
| stream_thread = threading.Thread( | ||
| target=stream_output, args=(self.http_server_process,) | ||
| ) | ||
| stream_thread.daemon = True | ||
| stream_thread.start() | ||
|
|
||
| await asyncio.sleep(1) # give the server a second to start | ||
|
|
||
| NUM_TASKS = 1 | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: | ||
| # Submit each task to the thread pool and store the Future objects. | ||
| # The executor will call run_async_in_thread for each task ID. | ||
| futures = { | ||
| executor.submit(run_async_in_thread, client_task, i): i | ||
| for i in range(1, NUM_TASKS + 1) | ||
| } | ||
|
|
||
| # Use as_completed() to process results as they become available. | ||
| # This is non-blocking to the main thread while tasks are running. | ||
| for future in concurrent.futures.as_completed(futures): | ||
| task_id = futures[future] | ||
| try: | ||
| result = future.result() | ||
| print( | ||
| f"[Main] received result for Task {task_id}: {result}", | ||
| flush=True, | ||
| ) | ||
| except Exception as exc: | ||
| print(f"[Main] Task {task_id} generated an exception: {exc}") | ||
|
|
||
| await asyncio.sleep(10) # wait a moment for all output to be printed | ||
|
|
||
| finally: | ||
| self.http_server_process.terminate() | ||
| self.http_server_process.wait() | ||
| print("Server process terminated.") | ||
| stream_thread.join() | ||
Uh oh!
There was an error while loading. Please reload this page.