Conversation

@fabianvf
Contributor

@fabianvf fabianvf commented Sep 25, 2025

Summary by CodeRabbit

  • New Features

    • Shared resource initialization and centralized recovery for server DB/model usage.
    • Deployment compose file added for multi-service local deployment (Postgres + app).
  • Bug Fixes

    • Robust DB recovery with retries/backoff, idle-connection cleanup, and longer DB pool timeout.
    • Improved stability for hint generation and file accept/reject under load.
  • Tests

    • Multi-client stress test rewritten to asyncio with coordinated clients; default concurrent clients now 200.
  • Chores

    • Fixed deployment startup working directory.

…o limit concurrent operations

Signed-off-by: Fabian von Feilitzsch <[email protected]>
@coderabbitai
Contributor

coderabbitai bot commented Sep 25, 2025

Walkthrough

Centralizes shared DB/model resources with semaphore‑controlled DB access, adds retry/backoff and idle‑connection cleanup, adjusts SQLAlchemy pool settings, updates deployment/Makefile and podman-compose, raises CI stress-test concurrency default, and converts a thread‑based integration test to asyncio tasks and events.
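The retry/backoff-plus-semaphore pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: MAX_RETRIES, BASE_DELAY, the semaphore size, and the exception type caught are stand-ins, and the real decorator in server.py also calls _recover_from_db_error() between attempts.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")

# Stand-in values; the PR's real constants and recovery hook live in server.py.
MAX_RETRIES = 3
BASE_DELAY = 0.05  # seconds; a real backoff base would presumably be longer
db_semaphore = asyncio.Semaphore(80)  # caps concurrent DB operations


def with_db_recovery(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Retry a DB-facing coroutine with exponential backoff, holding the DB semaphore."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        async with db_semaphore:  # limits how many handlers touch the DB at once
            for attempt in range(MAX_RETRIES):
                try:
                    return await func(*args, **kwargs)
                except ConnectionError:
                    if attempt == MAX_RETRIES - 1:
                        raise
                    # the real decorator would invoke _recover_from_db_error() here
                    await asyncio.sleep(BASE_DELAY * 2**attempt)
        raise AssertionError("unreachable")

    return wrapper
```

Applied to a handler, a transient failure is retried transparently while the semaphore bounds total DB concurrency.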

Changes

Cohort / File(s) Summary
CI workflow
.github/workflows/stress-test-mcp-server.yml
Default NUM_CONCURRENT_CLIENTS fallback increased from '100' to '200'.
Deployment / Makefile
kai_mcp_solution_server/Makefile, kai_mcp_solution_server/tools/deploy/podman-compose.yml
Makefile podman-postgres target now runs from tools/deploy (cd tools/deploy && ...); added podman-compose.yml describing postgres and kai-mcp-server services, healthchecks, volume, and env passthrough.
DB layer
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py
Added kill_idle_connections(engine: AsyncEngine) to terminate idle backends for application kai-solution-server; increased non‑SQLite pool_timeout 30→60s and added pool_reset_on_return='rollback'; moved Postgres-specific session / timeout setup behind a dialect guard.
Server core / resource management
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py
Added _SharedResources, KaiSolutionServerContext, _initialize_shared_resources(), with_db_recovery decorator, _recover_from_db_error(), semaphore-controlled DB concurrency, and applied retry/backoff/recovery to many DB‑facing handlers; removed per-lifespan engine disposal.
Tests
kai_mcp_solution_server/tests/test_multiple_integration.py
Replaced thread‑pool concurrency with asyncio tasks and asyncio.Event synchronization; clients signal readiness, wait on a global release event, and use asyncio.gather(return_exceptions=True) for centralized exception handling.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Server
  participant Shared as _SharedResources
  participant DB as Database
  participant Model as LLM/Model

  Client->>Server: request (e.g., create_solution)
  Server->>Shared: ensure initialized (_initialize_shared_resources)
  Shared-->>Server: engine, session_maker, model, db_semaphore

  rect rgba(220,235,255,0.6)
    Server->>Shared: acquire db_semaphore
    Server->>DB: execute DB operation
    alt DB success
      DB-->>Server: result
    else DB connection / error
      Server->>Server: _recover_from_db_error()
      note right of Server: may call kill_idle_connections\nor recreate engine, then retry with backoff
      Server->>DB: retry operation
    end
    Server->>Shared: release db_semaphore
  end

  opt uses model
    Server->>Model: generate/process hint
    Model-->>Server: output
  end

  Server-->>Client: response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • JonahSussman
  • shawn-hurley

Poem

In burrows warm I nibble code and cheer,
Semaphores line up two hundred near.
I chase idle tails and wake each thread,
Retry with hop and carrot in my head.
Soft thump — the server wakes, all good, my dear. 🐇🥕

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

  • Title Check (⚠️ Warning): The title accurately reflects the main changes around preventing extra database connections and introducing a semaphore for concurrency control, but it includes an emoji and is longer than necessary, which conflicts with the guideline to keep titles concise and free of noise. Resolution: remove the emoji and shorten the title to a clear, concise sentence such as “Avoid unneeded DB connections and add a semaphore to limit concurrent operations.”
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (1 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit’s high-level summary is enabled.


Signed-off-by: Fabian von Feilitzsch <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (4)

1-1: Fix forward-reference NameError in annotations

KaiSolutionServerContext is referenced in annotations before it's defined. Without postponed evaluation, this raises at import time. Add future annotations at the top.

+from __future__ import annotations
 import asyncio

560-565: Mypy: kai_ctx.model is Optional; narrow before use

Add an assertion or a local cast before calling .ainvoke(...) to satisfy types.

-            response = await kai_ctx.model.ainvoke(prompt)
+            assert kai_ctx.model is not None
+            response = await kai_ctx.model.ainvoke(prompt)

Apply similarly in generate_hint_v2 and generate_hint_v3.

Also applies to: 641-647, 734-738


972-979: Duplicate session.add(new_file) call

new_file is added twice. Drop the second add.

             for solution, old_file in files_to_update:
                 # Remove the old file from the solution
                 solution.after.remove(old_file)
                 solution.after.add(new_file)
 
-                session.add(new_file)
                 session.add(solution)

297-337: Ensure session_maker, engine, and model are non-None for mypy
Add assert kai_ctx.session_maker, assert kai_ctx.engine, and assert kai_ctx.model at the top of each handler (create_incident, create_solution, etc.), or change their annotations to non-Optional and initialize them in create() so mypy knows they’re always set.

🧹 Nitpick comments (14)
kai_mcp_solution_server/Makefile (1)

106-109: Use $(BUILD_DIR) for robust pathing instead of a relative cd

Using a relative path can break when invoking the Makefile from outside the directory. Prefer the already-defined $(BUILD_DIR) for consistency.

-	@cd tools/deploy && \
+	@cd $(BUILD_DIR) && \
 	IMAGE=$(IMAGE) KAI_LLM_PARAMS='$(KAI_LLM_PARAMS)' MOUNT_PATH='$(MOUNT_PATH)' \
 		podman-compose up --force-recreate
kai_mcp_solution_server/tools/deploy/podman-compose.yml (2)

31-31: Avoid embedding credentials in DSN; parameterize to satisfy secret scanners

Inline creds trigger CKV_SECRET_4 and are brittle. Compose supports env substitution; wire DSN from env to avoid hardcoding.

-      KAI_DB_DSN: "postgresql+asyncpg://kai_user:kai_password@postgres:5432/kai_db"
+      # Compose-style defaults allow local use without exposing literals in the file
+      KAI_DB_DSN: ${KAI_DB_DSN:-postgresql+asyncpg://${POSTGRES_USER:-kai_user}:${POSTGRES_PASSWORD:-kai_password}@postgres:5432/${POSTGRES_DB:-kai_db}}

12-12: Tidy YAML per linters (quotes and empty value)

  • Quotes around simple strings are unnecessary and flagged by yamllint.
  • Define empty volume as {} to silence empty-values.
-    ports:
-      - "5432:5432"
+    ports:
+      - 5432:5432
@@
-      test: ["CMD-SHELL", "pg_isready -U kai_user"]
+      test: [CMD-SHELL, "pg_isready -U kai_user"]
@@
-    ports:
-      - "8000:8000"
+    ports:
+      - 8000:8000
@@
-      KAI_DB_DSN: "postgresql+asyncpg://kai_user:kai_password@postgres:5432/kai_db"
+      KAI_DB_DSN: ${KAI_DB_DSN:-postgresql+asyncpg://${POSTGRES_USER:-kai_user}:${POSTGRES_PASSWORD:-kai_password}@postgres:5432/${POSTGRES_DB:-kai_db}}
@@
-volumes:
-  kai-postgres-data:
+volumes:
+  kai-postgres-data: {}

Also applies to: 14-14, 29-29, 31-31, 49-49

kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (1)

150-158: Pool tuning looks good; also gate Postgres-specific connect SETs

The new pool_timeout and pool_reset_on_return are sensible. Ensure the session SETs are only applied to Postgres to avoid failures on other backends.

-        @event.listens_for(engine.sync_engine, "connect")
-        def _set_pg_timeouts(dbapi_conn: Any, conn_record: Any) -> None:
-            cur = dbapi_conn.cursor()
-            cur.execute("SET idle_session_timeout = '1min'")
-            cur.execute("SET idle_in_transaction_session_timeout = '1min'")
-            cur.execute("SET application_name = 'kai-solution-server'")
-            cur.close()
+        if engine.dialect.name == "postgresql":
+            @event.listens_for(engine.sync_engine, "connect")
+            def _set_pg_timeouts(dbapi_conn: Any, conn_record: Any) -> None:
+                cur = dbapi_conn.cursor()
+                cur.execute("SET idle_session_timeout = '1min'")
+                cur.execute("SET idle_in_transaction_session_timeout = '1min'")
+                cur.execute("SET application_name = 'kai-solution-server'")
+                cur.close()
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (10)

224-237: Kill idle connections only on Postgres; otherwise skip to dispose/reinit

kill_idle_connections issues a Postgres-specific query. On SQLite/MySQL it will always fail, causing unnecessary dispose/reinit attempts.

 async def _recover_from_db_error() -> None:
     """Recover from database errors by killing idle connections or recreating engine."""
     if _SharedResources.engine is not None:
-        log("Recovering from database error - killing idle connections...")
-        try:
-            await kill_idle_connections(_SharedResources.engine)
-            log("Successfully killed idle connections")
-        except Exception as e:
-            log(f"Failed to kill idle connections: {e}")
-            log("Disposing and recreating engine...")
-            await _SharedResources.engine.dispose()
-            _SharedResources.initialized = False
-            await _initialize_shared_resources()
+        try:
+            if _SharedResources.engine.dialect.name == "postgresql":
+                log("Recovering from database error - killing idle connections...")
+                await kill_idle_connections(_SharedResources.engine)
+                log("Successfully killed idle connections")
+            else:
+                # Non-Postgres: immediately dispose and reinitialize
+                log("Non-Postgres engine; disposing and recreating engine...")
+                await _SharedResources.engine.dispose()
+                _SharedResources.initialized = False
+                await _initialize_shared_resources()
+        except Exception as ex:
+            log(f"DB recovery via kill/dispose failed: {ex}")
+            log("Force-disposing and recreating engine...")
+            await _SharedResources.engine.dispose()
+            _SharedResources.initialized = False
+            await _initialize_shared_resources()

282-284: Avoid catching blind Exception; remove unused variable

Use a broad catch only to log and re-raise; no need to bind e. Satisfies ruff BLE001/F841 and keeps behavior.

-    except Exception as e:
+    except Exception:
         log(f"Error in lifespan: {traceback.format_exc()}")
-        raise e
+        raise

421-446: Potential duplicate before-file rows; consider content hashing

The current logic inserts a new "before" DBFile when content differs. Consider deduplication via content hash or (client_id, uri, content) uniqueness to avoid unbounded growth.
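A minimal sketch of the content-hash deduplication idea, with an in-memory dict standing in for the database (in practice this would be a content_hash column plus a uniqueness constraint on client_id, uri, and hash; all names here are hypothetical):

```python
import hashlib


def content_hash(content: str) -> str:
    """Stable digest of file content, suitable for dedup lookups."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


# Stand-in for a DB table keyed by (client_id, uri, content_hash).
_seen: dict[tuple[str, str, str], int] = {}


def upsert_file(client_id: str, uri: str, content: str) -> int:
    """Insert a file row only if this exact content hasn't been seen before."""
    key = (client_id, uri, content_hash(content))
    if key not in _seen:
        _seen[key] = len(_seen)  # stand-in for inserting a new DBFile row
    return _seen[key]
```

Re-submitting identical content then maps to the existing row instead of growing the table without bound.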


447-470: After-files are always re-inserted (FIXME noted)

You always create and add a new after-file even if identical exists. This inflates rows and can confuse status tracking.

-        for file in after:
-            # FIXME: Something is fishy here...
-            next_after = DBFile(
-                client_id=client_id,
-                uri=file.uri,
-                content=file.content,
-                status=SolutionStatus.PENDING,
-                solution_before=set(),
-                solution_after=set(),
-            )
-
-            stmt = (
+        for file in after:
+            stmt = (
                 select(DBFile)
                 .where(
                     DBFile.client_id == client_id,
                     DBFile.uri == file.uri,
                 )
                 .order_by(DBFile.created_at.desc())
             )
-
-            db_after_files.add(next_after)
-            session.add(next_after)
+            prev_after = (await session.execute(stmt)).scalars().first()
+            if prev_after is None or prev_after.content != file.content:
+                next_after = DBFile(
+                    client_id=client_id,
+                    uri=file.uri,
+                    content=file.content,
+                    status=SolutionStatus.PENDING,
+                    solution_before=set(),
+                    solution_after=set(),
+                )
+                session.add(next_after)
+                db_after_files.add(next_after)
+            else:
+                db_after_files.add(prev_after)

796-819: Iteration over ORM relationship without ordering may be nondeterministic

If “best” means newest, relying on created_at sort is good, but ensure timezone-aware timestamps and DB index on (ruleset_name, violation_name, created_at DESC) to make this fast.
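One way such an index could be declared in SQLAlchemy; the model and table names are hypothetical, only the column names come from the comment above:

```python
from sqlalchemy import Column, DateTime, Index, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Hint(Base):  # hypothetical model; the PR's actual table may differ
    __tablename__ = "hints"

    id = Column(Integer, primary_key=True)
    ruleset_name = Column(String, nullable=False)
    violation_name = Column(String, nullable=False)
    created_at = Column(DateTime(timezone=True), nullable=False)

    __table_args__ = (
        Index(
            "ix_hints_ruleset_violation_created",
            ruleset_name,
            violation_name,
            created_at.desc(),  # DESC so "newest first" lookups scan forward
        ),
    )


# The index also compiles on SQLite, so it is cheap to verify locally.
Base.metadata.create_all(create_engine("sqlite://"))
```
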


882-893: Side-effecty update_solution_status() implies missing DB-level derivation

If status is derived from file statuses, consider storing it as a computed property or database trigger to avoid inconsistent states and manual refreshes.
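A sketch of deriving the solution status purely from its files' statuses, so it can never drift from them; the enum values and precedence rules (pending > rejected > modified > accepted) are assumptions, not the PR's actual semantics:

```python
from enum import Enum


class SolutionStatus(str, Enum):  # mirrors the PR's enum in spirit; values assumed
    PENDING = "pending"
    ACCEPTED = "accepted"
    MODIFIED = "modified"
    REJECTED = "rejected"


def derive_status(file_statuses: list[SolutionStatus]) -> SolutionStatus:
    """Compute a solution's status from its files instead of storing it separately."""
    if not file_statuses or SolutionStatus.PENDING in file_statuses:
        return SolutionStatus.PENDING
    if SolutionStatus.REJECTED in file_statuses:
        return SolutionStatus.REJECTED
    if SolutionStatus.MODIFIED in file_statuses:
        return SolutionStatus.MODIFIED
    return SolutionStatus.ACCEPTED
```

The same function could back a hybrid property or be enforced by a trigger, removing the need for manual update_solution_status() calls.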


1007-1009: Log background task failures to avoid silent errors

Add a done-callback to surface exceptions from the background task.

-    if all_solutions_accepted_or_modified:
-        asyncio.create_task(generate_hint_v3(kai_ctx, client_id))  # type: ignore[unused-awaitable, unused-ignore]
+    if all_solutions_accepted_or_modified:
+        task = asyncio.create_task(generate_hint_v3(kai_ctx, client_id))
+        task.add_done_callback(lambda t: (ex := t.exception()) and log(f"generate_hint_v3 failed: {ex}"))

1026-1041: Consider triggering hint generation on rejection as well (business logic)

If rejection should prevent hint creation, fine. Otherwise, you may want to recompute success metrics or hints post-rejection similarly to acceptance.


191-197: Semaphore size: consider making it configurable and aligned with pool

Current limit (80) can still oversubscribe Postgres pool (pool_size=20, max_overflow=80). Suggest exposing max_concurrent_ops via env/config and setting default <= pool_size + max_overflow, or slightly below to leave headroom.

Also applies to: 129-167
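One possible shape for making the limit configurable and pool-aligned; the env var names and the headroom of 5 are assumptions, not an API the PR defines:

```python
import asyncio
import os

# Defaults mirror the numbers cited in the review (pool_size=20, max_overflow=80).
POOL_SIZE = int(os.environ.get("KAI_DB_POOL_SIZE", "20"))
MAX_OVERFLOW = int(os.environ.get("KAI_DB_MAX_OVERFLOW", "80"))


def max_concurrent_ops() -> int:
    """Cap concurrency slightly below pool capacity to leave headroom."""
    configured = os.environ.get("KAI_MAX_CONCURRENT_OPS")
    if configured is not None:
        return int(configured)
    return max(1, POOL_SIZE + MAX_OVERFLOW - 5)


db_semaphore = asyncio.Semaphore(max_concurrent_ops())
```

With the defaults above this yields a limit of 95, so the semaphore can never request more connections than the pool can hand out.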


51-85: Address ruff warnings (TRY003, BLE001, F841) holistically

  • F841: remove unused e as suggested above.
  • BLE001: narrow broad catches or re-raise without binding as suggested above.
  • TRY003: consider shorter messages or custom exception types where messages are long (non-blocking).

Also applies to: 210-210, 231-231, 255-259

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 39b3ca8 and 7e484e5.

📒 Files selected for processing (6)
  • .github/workflows/stress-test-mcp-server.yml (1 hunks)
  • kai_mcp_solution_server/Makefile (1 hunks)
  • kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (3 hunks)
  • kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (14 hunks)
  • kai_mcp_solution_server/tests/test_multiple_integration.py (5 hunks)
  • kai_mcp_solution_server/tools/deploy/podman-compose.yml (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (3)
  • kill_idle_connections (114-127)
  • Base (92-98)
  • get_async_engine (130-168)
kai_mcp_solution_server/src/kai_mcp_solution_server/constants.py (1)
  • log (9-10)
🪛 GitHub Check: Trunk Check
kai_mcp_solution_server/tools/deploy/podman-compose.yml

[failure] 12-12: yamllint(quoted-strings)
[new] string value is redundantly quoted with any quotes


[failure] 14-14: yamllint(quoted-strings)
[new] string value is redundantly quoted with any quotes


[failure] 14-14: yamllint(quoted-strings)
[new] string value is redundantly quoted with any quotes


[failure] 29-29: yamllint(quoted-strings)
[new] string value is redundantly quoted with any quotes


[failure] 31-31: checkov(CKV_SECRET_4)
[new] Basic Auth Credentials


[failure] 31-31: yamllint(quoted-strings)
[new] string value is redundantly quoted with any quotes


[failure] 49-49: yamllint(empty-values)
[new] empty value in block mapping

kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

[failure] 63-63: ruff(F841)
[new] Local variable e is assigned to but never used

🪛 Checkov (3.2.334)
kai_mcp_solution_server/tools/deploy/podman-compose.yml

[medium] 31-32: Basic Auth Credentials

(CKV_SECRET_4)

🪛 Ruff (0.13.1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

51-51: Avoid specifying long messages outside the exception class

(TRY003)


63-63: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


210-210: Avoid specifying long messages outside the exception class

(TRY003)


231-231: Do not catch blind exception: Exception

(BLE001)


255-255: Avoid specifying long messages outside the exception class

(TRY003)


257-257: Avoid specifying long messages outside the exception class

(TRY003)


259-259: Avoid specifying long messages outside the exception class

(TRY003)

🪛 GitHub Actions: Check Types on PR
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

[error] 1-1: Mypy failed. 35 errors reported in src/kai_mcp_solution_server/server.py while running './run_mypy.sh'. See pipeline log for full details.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Run e2e test (windows-latest, cmd, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-latest, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (ubuntu-24.04, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-13, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (ubuntu-22.04-arm, bash, ChatOpenAI, kai-test-generation)
🔇 Additional comments (4)
.github/workflows/stress-test-mcp-server.yml (1)

69-69: Bump to 200 clients looks good; verify runtime stays within 10 minutes

The higher default is aligned with the new concurrency controls. Ensure the job consistently completes under the step’s timeout at increased load.

If you want parity in the manual trigger, consider updating workflow_dispatch.inputs.num_clients.default to 200 as well.

kai_mcp_solution_server/tests/test_multiple_integration.py (3)

401-404: Async client task with event synchronization is a solid improvement

Moving off threads to pure asyncio with per-client readiness and coordinated release reduces flakiness and overhead.


610-617: Good: prevent deadlocks on failures by signaling readiness in except

Ensures the main gather doesn’t hang if a client fails before signaling.

Also applies to: 620-620


431-433: Per-client unique rules/violations avoids cross-talk

Scoping to test-ruleset/test-violation per client makes the metrics assertions deterministic under concurrency.

Also applies to: 504-506, 560-562

Comment on lines 114 to 128
async def kill_idle_connections(engine: AsyncEngine) -> None:
"""Kill all idle connections from this application to the database."""
async with engine.begin() as conn:
await conn.execute(
text(
"""
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE application_name = 'kai-solution-server'
AND state = 'idle'
AND pid != pg_backend_pid()
"""
)
)

Contributor


⚠️ Potential issue | 🟠 Major

Guard Postgres-only termination and return affected count

Kill query is Postgres-specific; on SQLite/MySQL it will fail. Also, returning how many sessions were terminated is useful for observability.

-async def kill_idle_connections(engine: AsyncEngine) -> None:
-    """Kill all idle connections from this application to the database."""
-    async with engine.begin() as conn:
-        await conn.execute(
-            text(
-                """
-                SELECT pg_terminate_backend(pid)
-                FROM pg_stat_activity
-                WHERE application_name = 'kai-solution-server'
-                AND state = 'idle'
-                AND pid != pg_backend_pid()
-                """
-            )
-        )
+async def kill_idle_connections(engine: AsyncEngine) -> int:
+    """Kill all idle connections from this application to the database (Postgres only). Returns number terminated."""
+    if getattr(engine, "dialect", None) is None or engine.dialect.name != "postgresql":
+        return 0
+    async with engine.begin() as conn:
+        res = await conn.execute(
+            text(
+                """
+                SELECT pg_terminate_backend(pid)
+                FROM pg_stat_activity
+                WHERE application_name = 'kai-solution-server'
+                  AND state = 'idle'
+                  AND pid != pg_backend_pid()
+                """
+            )
+        )
+        # rowcount reflects number of rows returned/affected
+        return getattr(res, "rowcount", 0) or 0

Comment on lines +633 to +636

# Wait for all clients to signal they're ready (operations complete, connections still open)
await asyncio.gather(*[event.wait() for event in ready_events])

Contributor


⚠️ Potential issue | 🔴 Critical

Fix off-by-one when indexing ready_events

Tasks start at 1..N but ready_events is 0-indexed; this mismatches with later checks (which also expect client 0). Start from 0.

-        tasks = [
-            asyncio.create_task(client_task(i, ready_events[i - 1], release_event))
-            for i in range(1, NUM_TASKS + 1)
-        ]
+        tasks = [
+            asyncio.create_task(client_task(i, ready_events[i], release_event))
+            for i in range(NUM_TASKS)
+        ]
🤖 Prompt for AI Agents
In kai_mcp_solution_server/tests/test_multiple_integration.py around lines 633
to 636, the task creation uses range(1, NUM_TASKS + 1) which causes an
off-by-one when indexing ready_events (0-indexed) and mismatches later checks;
change the loop to iterate range(NUM_TASKS) and pass ready_events[i] (and the
client id i) so tasks are created for client ids 0..NUM_TASKS-1 and index into
ready_events correctly.

Signed-off-by: Fabian von Feilitzsch <[email protected]>
@fabianvf fabianvf force-pushed the solution-server-db-overhaul branch from e29aa70 to 69d58d5 Compare September 25, 2025 17:51
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (1)

297-304: Fix union-attr by asserting resources before use (quick patch option)

If you prefer not to add helper accessors, assert non-None before using session_maker/model in each function.

Example diffs (apply pattern to all similar functions):

 async def create_incident(
     kai_ctx: KaiSolutionServerContext,
@@
-    async with kai_ctx.session_maker.begin() as session:
+    assert kai_ctx.session_maker is not None
+    async with kai_ctx.session_maker.begin() as session:
 async def generate_hint_v1(
     kai_ctx: KaiSolutionServerContext,
@@
-    async with kai_ctx.session_maker.begin() as session:
+    assert kai_ctx.session_maker is not None
+    async with kai_ctx.session_maker.begin() as session:
@@
-            response = await kai_ctx.model.ainvoke(prompt)
+            assert kai_ctx.model is not None
+            response = await kai_ctx.model.ainvoke(prompt)

Repeat for: create_solution, generate_hint_v2, generate_hint_v3, delete_solution, get_best_hint, get_success_rate, accept_file, reject_file. This clears all “union-attr” errors in the pipeline.

Also applies to: 385-401, 515-523, 580-588, 659-669, 750-757, 790-797, 853-865, 936-944, 1025-1033

🧹 Nitpick comments (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (1)

224-237: Narrow broad exception in recovery path

Catching bare Exception (BLE001) hides unrelated bugs. Limit to DB/network-timeout exceptions; keep the dispose-and-recreate fallback.

-        try:
+        try:
             await kill_idle_connections(_SharedResources.engine)
             log("Successfully killed idle connections")
-        except Exception as e:
+        except (DBAPIError, OperationalError, SQLAlchemyTimeoutError, OSError) as e:
             log(f"Failed to kill idle connections: {e}")
             log("Disposing and recreating engine...")
             await _SharedResources.engine.dispose()
             _SharedResources.initialized = False
             await _initialize_shared_resources()

This satisfies BLE001 without reducing resiliency.

kai_mcp_solution_server/tools/deploy/podman-compose.yml (1)

31-31: Avoid committing inline DB credentials; prefer env_file or secret store

Inline DSN with password trips CKV_SECRET_4 and risks accidental leakage. Use an env_file or compose secrets and reference KAI_DB_DSN via environment.

Example:

-      KAI_DB_DSN: postgresql+asyncpg://kai_user:kai_password@postgres:5432/kai_db # trunk-ignore(checkov/CKV_SECRET_4)
+      KAI_DB_DSN: ${KAI_DB_DSN}

And provide KAI_DB_DSN via .env or secret manager.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7e484e5 and 69d58d5.

📒 Files selected for processing (3)
  • kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (14 hunks)
  • kai_mcp_solution_server/tests/test_multiple_integration.py (5 hunks)
  • kai_mcp_solution_server/tools/deploy/podman-compose.yml (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (3)
  • kill_idle_connections (114-127)
  • Base (92-98)
  • get_async_engine (130-168)
kai_mcp_solution_server/src/kai_mcp_solution_server/constants.py (1)
  • log (9-10)
🪛 Checkov (3.2.334)
kai_mcp_solution_server/tools/deploy/podman-compose.yml

[medium] 31-32: Basic Auth Credentials

(CKV_SECRET_4)

🪛 Ruff (0.13.1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

51-51: Avoid specifying long messages outside the exception class

(TRY003)


210-210: Avoid specifying long messages outside the exception class

(TRY003)


231-231: Do not catch blind exception: Exception

(BLE001)


255-255: Avoid specifying long messages outside the exception class

(TRY003)


257-257: Avoid specifying long messages outside the exception class

(TRY003)


259-259: Avoid specifying long messages outside the exception class

(TRY003)

🪛 GitHub Actions: Check Types on PR
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

[error] 42-42: Function is missing a type annotation. [no-untyped-def]


[error] 49-49: Function is missing a return type annotation. [no-untyped-def]


[error] 49-49: Function is missing a type annotation for one or more arguments. [no-untyped-def]


[error] 166-166: Missing type parameters for generic type "async_sessionmaker". [type-arg]


[error] 247-247: Missing type parameters for generic type "async_sessionmaker". [type-arg]


[error] 297-297: Untyped decorator makes function "create_incident" untyped. [misc]


[error] 303-303: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 354-354: Returning Any from function declared to return "int". [no-any-return]


[error] 385-385: Untyped decorator makes function "create_solution" untyped. [misc]


[error] 401-401: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 504-504: Returning Any from function declared to return "int". [no-any-return]


[error] 515-515: Untyped decorator makes function "generate_hint_v1" untyped. [misc]


[error] 520-520: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 562-562: Item "None" of "BaseChatModel | None" has no attribute "ainvoke". [union-attr]


[error] 580-580: Untyped decorator makes function "generate_hint_v2" untyped. [misc]


[error] 586-586: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 641-641: Item "None" of "BaseChatModel | None" has no attribute "ainvoke". [union-attr]


[error] 659-659: Untyped decorator makes function "generate_hint_v3" untyped. [misc]


[error] 667-667: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 734-734: Item "None" of "BaseChatModel | None" has no attribute "ainvoke". [union-attr]


[error] 750-750: Untyped decorator makes function "delete_solution" untyped. [misc]


[error] 756-756: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 774-774: Returning Any from function declared to return "bool". [no-any-return]


[error] 790-790: Untyped decorator makes function "get_best_hint" untyped. [misc]


[error] 796-796: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 837-837: Returning Any from function declared to return "GetBestHintResult | None". [no-any-return]


[error] 853-853: Untyped decorator makes function "get_success_rate" untyped. [misc]


[error] 863-863: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 930-930: Returning Any from function declared to return "list[SuccessRateMetric] | None". [no-any-return]


[error] 936-936: Untyped decorator makes function "accept_file" untyped. [misc]


[error] 942-942: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 1018-1018: Returning Any from function declared to return "None". [no-any-return]


[error] 1025-1025: Untyped decorator makes function "reject_file" untyped. [misc]


[error] 1031-1031: Item "None" of "async_sessionmaker[Any] | None" has no attribute "begin". [union-attr]


[error] 1050-1050: Returning Any from function declared to return "None". [no-any-return]

🔇 Additional comments (4)
kai_mcp_solution_server/tests/test_multiple_integration.py (2)

399-405: Stabilize per-client identifiers for determinism

Using f"test-ruleset-{client_id}" and f"test-violation-{client_id}" is great. To ensure predictability across runs, keep client_id as the 0-based loop index (after the off‑by‑one fix) and keep the same UUID for client_id per task.

Please verify the three “verify hints were generated…” checks at lines 683–687 still match client IDs 0,1,2 after the loop fix.

Also applies to: 418-436, 456-477, 496-507, 552-563


626-664: Fix off-by-one in task creation and event indexing

Clients are numbered 1..N but ready_events is 0-indexed, causing mismatches. Start from 0 and align exception reporting.

-        # Launch all client tasks concurrently
-        tasks = [
-            asyncio.create_task(client_task(i, ready_events[i - 1], release_event))
-            for i in range(1, NUM_TASKS + 1)
-        ]
+        # Launch all client tasks concurrently (0..NUM_TASKS-1)
+        tasks = [
+            asyncio.create_task(client_task(i, ready_events[i], release_event))
+            for i in range(NUM_TASKS)
+        ]
@@
-        exceptions = [
-            (i + 1, r) for i, r in enumerate(results) if isinstance(r, Exception)
-        ]
+        exceptions = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]

This aligns later checks that expect clients 0,1,2.
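The ready-event / release-event coordination the fix relies on can be sketched in isolation. This is a minimal, self-contained illustration with hypothetical names (not the test's actual helpers): each task gets a stable 0-based id, signals readiness on its own event, and waits on a shared release barrier, with `range(n)` keeping task ids aligned with `ready_events` indexing.

```python
import asyncio

async def client_task(
    client_id: int, ready: asyncio.Event, release: asyncio.Event, out: list[str]
) -> None:
    # Stable, 0-based identity per task (names are illustrative only)
    ruleset = f"test-ruleset-{client_id}"
    ready.set()           # tell the coordinator this client is ready
    await release.wait()  # barrier: all clients proceed together
    out.append(ruleset)

async def main(n: int = 3) -> list[str]:
    ready_events = [asyncio.Event() for _ in range(n)]
    release = asyncio.Event()
    out: list[str] = []
    # 0..n-1 keeps client ids aligned with ready_events indexing
    tasks = [
        asyncio.create_task(client_task(i, ready_events[i], release, out))
        for i in range(n)
    ]
    await asyncio.gather(*(e.wait() for e in ready_events))
    release.set()
    await asyncio.gather(*tasks)
    return out

result = asyncio.run(main())
print(sorted(result))  # ['test-ruleset-0', 'test-ruleset-1', 'test-ruleset-2']
```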

kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (2)

19-20: Type the async session maker (and import AsyncSession) to fix type-arg errors

These lines trigger “Missing type parameters for generic type async_sessionmaker” and cause downstream union-attr errors. Import AsyncSession and parameterize async_sessionmaker.

-from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
+from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
@@
-    session_maker: async_sessionmaker | None = None
+    session_maker: async_sessionmaker[AsyncSession] | None = None
@@
-        self.session_maker: async_sessionmaker | None = None
+        self.session_maker: async_sessionmaker[AsyncSession] | None = None

Also update any other occurrences of async_sessionmaker without type args. Based on pipeline failures.

Also applies to: 166-167, 247-248


191-197: Don’t recreate the semaphore during recovery/rehydration

Reinitializing the semaphore can strand waiters and break in-flight concurrency limits. Create once if None.

-        # Initialize semaphore to limit concurrent DB operations
-        _SharedResources.db_semaphore = asyncio.Semaphore(
-            _SharedResources.max_concurrent_ops
-        )
-        log(
-            f"DB operation semaphore initialized with limit: {_SharedResources.max_concurrent_ops}"
-        )
+        # Initialize semaphore to limit concurrent DB operations (create once)
+        if _SharedResources.db_semaphore is None:
+            _SharedResources.db_semaphore = asyncio.Semaphore(
+                _SharedResources.max_concurrent_ops
+            )
+            log(
+                f"DB operation semaphore initialized with limit: {_SharedResources.max_concurrent_ops}"
+            )

This avoids leaking/conflating semaphores across engine resets. Based on past review comments.
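The stranding behavior is easy to reproduce in a toy script (illustrative only, no server code involved): a task parked on the old `asyncio.Semaphore` object never observes a replacement instance, so rebinding the name during recovery leaves it blocked forever.

```python
import asyncio

async def main() -> bool:
    sem = asyncio.Semaphore(1)
    await sem.acquire()  # hold the only permit

    async def worker(s: asyncio.Semaphore) -> None:
        async with s:
            pass

    task = asyncio.create_task(worker(sem))
    await asyncio.sleep(0.05)  # worker is now parked on the OLD semaphore

    # Simulated "recovery" that recreates the semaphore: the fresh permit
    # helps nobody still waiting on the original object.
    sem = asyncio.Semaphore(1)

    done, pending = await asyncio.wait({task}, timeout=0.2)
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return task in pending  # True -> the waiter was stranded

stranded = asyncio.run(main())
print(stranded)  # True
```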

Comment on lines 42 to 87
def with_db_recovery(func):
    """Decorator to execute database operations with automatic recovery on connection errors.
    Uses a semaphore to limit concurrent DB operations and prevent pool exhaustion.
    Implements exponential backoff with retry on connection errors.
    """

    async def wrapper(kai_ctx: KaiSolutionServerContext, *args, **kwargs):
        if _SharedResources.db_semaphore is None:
            raise RuntimeError("Database semaphore not initialized")

        # Semaphore ensures we don't overwhelm the connection pool
        async with _SharedResources.db_semaphore:
            max_retries = 3
            base_delay = 0.1  # 100ms base delay

            for attempt in range(max_retries):
                try:
                    return await func(kai_ctx, *args, **kwargs)
                except IntegrityError:
                    raise
                except SQLAlchemyTimeoutError:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)  # Exponential backoff
                        log(
                            f"Connection pool timeout (attempt {attempt + 1}), retrying in {delay}s..."
                        )
                        await asyncio.sleep(delay)
                        await _recover_from_db_error()
                    else:
                        log(f"Connection pool exhausted after {max_retries} attempts")
                        raise
                except (DBAPIError, OperationalError) as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)
                        log(
                            f"Database error (attempt {attempt + 1}): {e}, retrying in {delay}s..."
                        )
                        await asyncio.sleep(delay)
                        await _recover_from_db_error()
                        await kai_ctx.create()
                    else:
                        log(f"Database error after {max_retries} attempts, giving up")
                        raise

    return wrapper
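Stripped of the SQLAlchemy specifics, the retry loop above is a plain exponential-backoff pattern. A simplified, runnable sketch (with a generic `ConnectionError` standing in for the SQLAlchemy error types, and `retry_with_backoff`/`flaky` being hypothetical names):

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    op: Callable[[], Awaitable[T]], max_retries: int = 3, base_delay: float = 0.1
) -> T:
    for attempt in range(max_retries):
        try:
            return await op()
        except ConnectionError:
            if attempt < max_retries - 1:
                delay = base_delay * (2**attempt)  # 0.1s, 0.2s, 0.4s, ...
                await asyncio.sleep(delay)
            else:
                raise
    raise AssertionError("unreachable")

calls = 0

async def flaky() -> str:
    # Fails twice, then succeeds -- a stand-in for a transient DB error.
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky))
print(result, calls)  # ok 3
```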
🛠️ Refactor suggestion | 🟠 Major

Make the decorator fully typed and preserve function metadata; ensure ctx resources are ready

The untyped decorator makes all decorated functions “untyped” and causes multiple return-type “Any” errors. Type it using ParamSpec/TypeVar, use functools.wraps, and ensure the context has resources before first attempt.

+import functools
@@
-def with_db_recovery(func):
+from typing import Callable, Coroutine, TypeVar, ParamSpec, Concatenate
+
+P = ParamSpec("P")
+R = TypeVar("R")
+
+def with_db_recovery(
+    func: Callable[
+        Concatenate["KaiSolutionServerContext", P],
+        Coroutine[Any, Any, R],
+    ]
+) -> Callable[Concatenate["KaiSolutionServerContext", P], Coroutine[Any, Any, R]]:
@@
-    async def wrapper(kai_ctx: KaiSolutionServerContext, *args, **kwargs):
+    @functools.wraps(func)
+    async def wrapper(
+        kai_ctx: "KaiSolutionServerContext", *args: P.args, **kwargs: P.kwargs
+    ) -> R:
         if _SharedResources.db_semaphore is None:
             raise RuntimeError("Database semaphore not initialized")
 
+        # Ensure per-request context has references to initialized shared resources
+        if kai_ctx.session_maker is None or kai_ctx.model is None:
+            await kai_ctx.create()
+
         # Semaphore ensures we don't overwhelm the connection pool
         async with _SharedResources.db_semaphore:
             max_retries = 3
             base_delay = 0.1  # 100ms base delay
@@
-                    return await func(kai_ctx, *args, **kwargs)
+                    return await func(kai_ctx, *args, **kwargs)

Add missing imports near the top of the file:

-from typing import Annotated, Any, cast
+from typing import Annotated, Any, cast, Callable, Coroutine, TypeVar, ParamSpec, Concatenate
+import functools

This resolves: no-untyped-def, misc (untyped decorator), and no-any-return. Based on pipeline failures.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.13.1)

51-51: Avoid specifying long messages outside the exception class

(TRY003)

🪛 GitHub Actions: Check Types on PR

[error] 42-42: Function is missing a type annotation. [no-untyped-def]


[error] 49-49: Function is missing a return type annotation. [no-untyped-def]


[error] 49-49: Function is missing a type annotation for one or more arguments. [no-untyped-def]


@JonahSussman JonahSussman left a comment


Big fan of decorators

@fabianvf fabianvf added the cherry-pick/release-0.8 This PR should be cherry-picked to release-0.8 branch label Sep 29, 2025
Signed-off-by: Fabian von Feilitzsch <[email protected]>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (2)

784-792: Authorization gap: client_id is ignored in delete_solution.

This allows deleting any solution by ID regardless of ownership. Validate ownership before delete. This also resolves the “unused argument: client_id” warning.

     async with kai_ctx.session_maker.begin() as session:
         sln = await session.get(DBSolution, solution_id)
         if sln is None:
             return False
+        if sln.client_id != client_id:
+            # Do not allow deleting solutions owned by a different client
+            return False
         await session.delete(sln)
     return True

286-289: Redact secrets from settings logging.

llm_params often includes API keys/tokens; logging the full settings risks secret leakage. Log a redacted view instead.

-        log(f"Settings: {settings.model_dump_json(indent=2)}")
+        # Avoid logging secrets (DB DSN/LLM params)
+        safe = settings.model_dump()
+        if "llm_params" in safe and isinstance(safe["llm_params"], dict):
+            safe["llm_params"] = {
+                k: ("***" if any(s in k.lower() for s in ("key", "token", "secret", "password")) else v)
+                for k, v in safe["llm_params"].items()
+            }
+        try:
+            # SQLAlchemy URL can render with hidden password
+            safe["db_dsn"] = settings.db_dsn.render_as_string(hide_password=True)  # type: ignore[attr-defined]
+        except Exception:
+            safe["db_dsn"] = str(settings.db_dsn)
+        log(f"Settings (redacted): {json.dumps(safe, indent=2)}")
🧹 Nitpick comments (4)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (4)

896-906: Fix generator passed to or_; remove the type ignore.

sqlalchemy.or_ expects variadic clauses, not a single generator. Expand with * (or build a list).

-        violations_where = or_(  # type: ignore[arg-type]
-            and_(
-                DBViolation.ruleset_name == violation_id.ruleset_name,
-                DBViolation.violation_name == violation_id.violation_name,
-            )
-            for violation_id in violation_ids
-        )
+        violations_where = or_(
+            *[
+                and_(
+                    DBViolation.ruleset_name == violation_id.ruleset_name,
+                    DBViolation.violation_name == violation_id.violation_name,
+                )
+                for violation_id in violation_ids
+            ]
+        )

1011-1016: Remove duplicate session.add(new_file).

new_file is added before the loop; adding it again per-iteration is redundant.

             for solution, old_file in files_to_update:
                 # Remove the old file from the solution
                 solution.after.remove(old_file)
                 solution.after.add(new_file)
 
-                session.add(new_file)
                 session.add(solution)

237-250: Broad exception in recovery; consider narrowing to SQLAlchemy errors.

Catching Exception may hide programming errors. Prefer SQLAlchemyError (and maybe asyncio.TimeoutError) to control the fallback path.

-        except Exception as e:
+        except Exception as e:  # consider: from sqlalchemy import SQLAlchemyError
             log(f"Failed to kill idle connections: {e}")
             log("Disposing and recreating engine...")
             await _SharedResources.engine.dispose()
             _SharedResources.initialized = False
             await _initialize_shared_resources()

If you keep Exception, add a comment justifying the breadth.


316-352: Optional: ensure IDs are assigned before returning.

Relying on commit-time flush for incident.id is usually fine, but an explicit await session.flush() before exiting the context makes it deterministic.

         session.add(incident)
 
-    return incident.id
+        await session.flush()
+    return incident.id
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 69d58d5 and 88d02fa.

📒 Files selected for processing (1)
  • kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (20 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (4)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (3)
  • kill_idle_connections (114-127)
  • Base (92-98)
  • get_async_engine (130-168)
kai_mcp_solution_server/src/kai_mcp_solution_server/constants.py (1)
  • log (9-10)
kai_mcp_solution_server/src/kai_mcp_solution_server/analyzer_types.py (1)
  • ExtendedIncident (87-106)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/python_objects.py (1)
  • SolutionFile (7-9)
🪛 Ruff (0.13.1)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py

60-60: Avoid specifying long messages outside the exception class

(TRY003)


98-98: Avoid specifying long messages outside the exception class

(TRY003)


223-223: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Do not catch blind exception: Exception

(BLE001)


268-268: Avoid specifying long messages outside the exception class

(TRY003)


270-270: Avoid specifying long messages outside the exception class

(TRY003)


272-272: Avoid specifying long messages outside the exception class

(TRY003)


317-317: Avoid specifying long messages outside the exception class

(TRY003)


417-417: Avoid specifying long messages outside the exception class

(TRY003)


538-538: Avoid specifying long messages outside the exception class

(TRY003)


582-582: Avoid specifying long messages outside the exception class

(TRY003)


608-608: Avoid specifying long messages outside the exception class

(TRY003)


665-665: Avoid specifying long messages outside the exception class

(TRY003)


693-693: Avoid specifying long messages outside the exception class

(TRY003)


762-762: Avoid specifying long messages outside the exception class

(TRY003)


782-782: Unused function argument: client_id

(ARG001)


786-786: Avoid specifying long messages outside the exception class

(TRY003)


828-828: Avoid specifying long messages outside the exception class

(TRY003)


897-897: Avoid specifying long messages outside the exception class

(TRY003)


978-978: Avoid specifying long messages outside the exception class

(TRY003)


1069-1069: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Run e2e test (windows-latest, cmd, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (ubuntu-22.04-arm, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (ubuntu-24.04, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-latest, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-13, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Check Types on PR Runner (ubuntu-latest, 3.12, x64)
  • GitHub Check: Stress Test with PostgreSQL
🔇 Additional comments (3)
kai_mcp_solution_server/src/kai_mcp_solution_server/server.py (3)

252-278: Add non-None accessors to narrow optional resources (cleaner call sites, fewer union-attr issues).

This avoids repeated checks and improves type safety at call sites.

 class KaiSolutionServerContext:
@@
     def __init__(self, settings: SolutionServerSettings) -> None:
         self.settings = settings
-        self.lock = asyncio.Lock()
         # References to shared resources (set in create())
         self.engine: AsyncEngine | None = None
         self.session_maker: async_sessionmaker[AsyncSession] | None = None
         self.model: BaseChatModel | None = None
@@
         self.model = _SharedResources.model
+
+    def require_session_maker(self) -> async_sessionmaker[AsyncSession]:
+        if self.session_maker is None:
+            raise RuntimeError("Session maker not initialized; call create() first")
+        return self.session_maker
+
+    def require_model(self) -> BaseChatModel:
+        if self.model is None:
+            raise RuntimeError("Model not initialized; call create() first")
+        return self.model

Example usage (apply across call sites):

-    if kai_ctx.session_maker is None:
-        raise RuntimeError("Session maker not initialized")
-    async with kai_ctx.session_maker.begin() as session:
+    async with kai_ctx.require_session_maker().begin() as session:
-    if kai_ctx.model is None:
-        raise RuntimeError("Model not initialized")
-    response = await kai_ctx.model.ainvoke(prompt)
+    response = await kai_ctx.require_model().ainvoke(prompt)

9-9: Tighten decorator typing with ParamSpec + Concatenate; preserve wraps.

Improves inference for the decorated functions and removes “untyped decorator” fallout.

-from typing import Annotated, Any, ParamSpec, TypeVar, cast
+from typing import Annotated, Any, ParamSpec, TypeVar, cast, Concatenate
-def with_db_recovery(
-    func: Callable[..., Coroutine[Any, Any, T]]
-) -> Callable[..., Coroutine[Any, Any, T]]:
+def with_db_recovery(
+    func: Callable[Concatenate["KaiSolutionServerContext", P], Coroutine[Any, Any, T]],
+) -> Callable[Concatenate["KaiSolutionServerContext", P], Coroutine[Any, Any, T]]:
-    async def wrapper(
-        kai_ctx: KaiSolutionServerContext, *args: Any, **kwargs: Any
-    ) -> T:
+    async def wrapper(
+        kai_ctx: "KaiSolutionServerContext", *args: P.args, **kwargs: P.kwargs
+    ) -> T:

Also applies to: 46-48, 55-58


204-210: Do not recreate the semaphore on re-init; create it once to avoid stranding waiters.

Reinitializing _SharedResources.db_semaphore can leave tasks waiting on the old instance forever and break concurrency limits during recovery. Create it only if None.

-        # Initialize semaphore to limit concurrent DB operations
-        _SharedResources.db_semaphore = asyncio.Semaphore(
-            _SharedResources.max_concurrent_ops
-        )
-        log(
-            f"DB operation semaphore initialized with limit: {_SharedResources.max_concurrent_ops}"
-        )
+        # Initialize semaphore to limit concurrent DB operations (create once)
+        if _SharedResources.db_semaphore is None:
+            _SharedResources.db_semaphore = asyncio.Semaphore(
+                _SharedResources.max_concurrent_ops
+            )
+            log(
+                f"DB operation semaphore initialized with limit: {_SharedResources.max_concurrent_ops}"
+            )

Comment on lines +72 to +83
                except SQLAlchemyTimeoutError:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)  # Exponential backoff
                        log(
                            f"Connection pool timeout (attempt {attempt + 1}), retrying in {delay}s..."
                        )
                        await asyncio.sleep(delay)
                        await _recover_from_db_error()
                    else:
                        log(f"Connection pool exhausted after {max_retries} attempts")
                        raise
                except (DBAPIError, OperationalError) as e:

⚠️ Potential issue | 🔴 Critical

Refresh per-request context after recovering from pool timeouts.

On SQLAlchemyTimeoutError, you recover but don’t refresh kai_ctx. If _recover_from_db_error() disposes/recreates the engine, the old session_maker in kai_ctx can point to a disposed engine, causing subsequent failures. Add await kai_ctx.create() after recovery.

                 except SQLAlchemyTimeoutError:
                     if attempt < max_retries - 1:
                         delay = base_delay * (2**attempt)  # Exponential backoff
                         log(
                             f"Connection pool timeout (attempt {attempt + 1}), retrying in {delay}s..."
                         )
                         await asyncio.sleep(delay)
-                        await _recover_from_db_error()
+                        await _recover_from_db_error()
+                        await kai_ctx.create()
                     else:
                         log(f"Connection pool exhausted after {max_retries} attempts")
                         raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
                except SQLAlchemyTimeoutError:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)  # Exponential backoff
                        log(
                            f"Connection pool timeout (attempt {attempt + 1}), retrying in {delay}s..."
                        )
                        await asyncio.sleep(delay)
                        await _recover_from_db_error()
+                        await kai_ctx.create()
                    else:
                        log(f"Connection pool exhausted after {max_retries} attempts")
                        raise
                except (DBAPIError, OperationalError) as e:
🤖 Prompt for AI Agents
In kai_mcp_solution_server/src/kai_mcp_solution_server/server.py around lines 72
to 83, the SQLAlchemyTimeoutError handling calls _recover_from_db_error() but
does not refresh the per-request kai_ctx; if the engine/sessionmaker was
disposed and recreated, kai_ctx.session_maker will still point to the old
engine. After awaiting _recover_from_db_error(), call await kai_ctx.create() to
recreate the session_maker in the current request context (ensure kai_ctx is in
scope and awaitable), so subsequent retries use the fresh engine; keep this only
in the retry path before the next sleep/attempt.

Signed-off-by: Fabian von Feilitzsch <[email protected]>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (2)

161-166: Pool tuning LGTM; consider minor QoL knobs

  • pool_timeout=60 and explicit pool_reset_on_return="rollback" are reasonable (the latter matches the 2.x default).
  • Optional: add pool_use_lifo=True to favor hot connections under bursty load; and/or make pool_timeout configurable via env for ops flexibility.
         engine = create_async_engine(
             url,
             pool_size=20,
             max_overflow=80,
             pool_timeout=60,
             pool_recycle=3600,
             pool_pre_ping=True,
             echo_pool=False,
             pool_reset_on_return="rollback",
+            pool_use_lifo=True,
         )

168-179: Silence Ruff ARG001 and use a context manager for the cursor

Rename the unused arg and ensure the cursor always closes, even on error.

-        @event.listens_for(engine.sync_engine, "connect")
-        def _set_pg_timeouts(dbapi_conn: Any, conn_record: Any) -> None:
-            cur = dbapi_conn.cursor()
-            cur.execute("SET idle_session_timeout = '1min'")
-            cur.execute("SET idle_in_transaction_session_timeout = '1min'")
-            cur.execute("SET application_name = 'kai-solution-server'")
-            cur.close()
+        @event.listens_for(engine.sync_engine, "connect")
+        def _set_pg_timeouts(dbapi_conn: Any, _conn_record: Any) -> None:
+            with dbapi_conn.cursor() as cur:
+                cur.execute("SET idle_session_timeout = '1min'")
+                cur.execute("SET idle_in_transaction_session_timeout = '1min'")
+                cur.execute("SET application_name = 'kai-solution-server'")

As a separate optional tweak, consider SET statement_timeout/lock_timeout if long-running queries aren’t expected.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 88d02fa and 980f54a.

📒 Files selected for processing (1)
  • kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py

173-173: Unused function argument: conn_record

(ARG001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Run e2e test (ubuntu-22.04-arm, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-latest, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (windows-latest, cmd, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (macos-13, bash, ChatOpenAI, kai-test-generation)
  • GitHub Check: Run e2e test (ubuntu-24.04, bash, ChatOpenAI, kai-test-generation)
🔇 Additional comments (2)
kai_mcp_solution_server/src/kai_mcp_solution_server/db/dao.py (2)

19-20: Import of text looks good

Required for the new admin query. No concerns.


114-136: Return terminated count, cover “idle in transaction”, and parameterize application_name

Nice Postgres guard. Two improvements:

  • Return how many backends were terminated for observability.
  • Include “idle in transaction” variants (state LIKE 'idle%').
  • Bind application_name as a param and scope to current DB/user for safety.

Apply:

-async def kill_idle_connections(engine: AsyncEngine) -> None:
-    """Kill all idle connections from this application to the database.
+async def kill_idle_connections(engine: AsyncEngine) -> int:
+    """Kill all idle connections from this application to the database (PostgreSQL only). Returns number terminated.
@@
-    if engine.dialect.name != "postgresql":
-        # Silently skip for non-PostgreSQL databases
-        return
+    if engine.dialect.name != "postgresql":
+        return 0
@@
-    async with engine.begin() as conn:
-        await conn.execute(
-            text(
-                """
-                SELECT pg_terminate_backend(pid)
-                FROM pg_stat_activity
-                WHERE application_name = 'kai-solution-server'
-                AND state = 'idle'
-                AND pid != pg_backend_pid()
-                """
-            )
-        )
+    async with engine.begin() as conn:
+        res = await conn.scalars(
+            text(
+                """
+                SELECT pg_terminate_backend(pid) AS terminated
+                FROM pg_stat_activity
+                WHERE application_name = :app_name
+                  AND datname = current_database()
+                  AND usename = current_user
+                  AND state ILIKE 'idle%%'
+                  AND pid <> pg_backend_pid()
+                """
+            ),
+            {"app_name": "kai-solution-server"},
+        )
+        flags = res.all()  # ScalarResult.all() is synchronous; only the scalars() call is awaited
+        return sum(1 for ok in flags if ok)

Based on learnings (SQLAlchemy 2.0.x async: prefer scalars(...).all() for SELECT results).

@fabianvf fabianvf merged commit 0e3f474 into konveyor:main Sep 29, 2025
15 checks passed
fabianvf added a commit to fabianvf/kai that referenced this pull request Sep 29, 2025
…o limit concurrent operations (konveyor#878)

* 🐛 Avoid creating unneeded extra DB connections, and add a semaphore to limit concurrent operations

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Bump stress test to 200

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Fix trunk issues

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Fix type errors

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Guard pg-specific operations

Signed-off-by: Fabian von Feilitzsch <[email protected]>

---------

Signed-off-by: Fabian von Feilitzsch <[email protected]>
fabianvf added a commit that referenced this pull request Sep 30, 2025
…o limit concurrent operations (#878)

* 🐛 Avoid creating unneeded extra DB connections, and add a semaphore to limit concurrent operations

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Bump stress test to 200

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Fix trunk issues

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Fix type errors

Signed-off-by: Fabian von Feilitzsch <[email protected]>

* Guard pg-specific operations

Signed-off-by: Fabian von Feilitzsch <[email protected]>

---------

Signed-off-by: Fabian von Feilitzsch <[email protected]>
Labels

cherry-pick/release-0.8 This PR should be cherry-picked to release-0.8 branch

3 participants