chore(wren-ai-service): minor updates #1294
Walkthrough
The changes update session state handling, SQL generation reasoning, and asynchronous streaming in the application. A new session state variable (
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Router
participant AskService
participant SQLGenerationReasoning
Client->>Router: Send ask request
Router->>AskService: Invoke ask(request, query_id)
AskService->>SQLGenerationReasoning: Run query with query_id
SQLGenerationReasoning-->>AskService: Process streaming callback (chunk)
AskService->>SQLGenerationReasoning: Request streaming results (query_id)
SQLGenerationReasoning-->>AskService: Return streaming results
AskService-->>Router: Provide complete response
Router-->>Client: Deliver response
Actionable comments posted: 3
🧹 Nitpick comments (4)
wren-ai-service/src/pipelines/generation/sql_generation_reasoning.py (3)
30-33: Potential confusion in the instructions.
Line 30 says not to include triple backticks, but line 33 directs returning plain Markdown. Consider clarifying that the output should be valid Markdown text without triple backtick delimitation, to avoid mixed signals.
81-83: Consider adding a docstring or type hint for clarity.
While this function signature is straightforward, adding a brief docstring and mentioning the expected shape of the returned dictionary can improve maintainability.
104-104: Ensure thread-safe handling of _user_queues.
A plain dictionary may risk key conflicts in concurrent usage. If concurrency is expected, consider using an asyncio-safe data structure, or ensure calls are properly synchronized.
wren-ai-service/src/web/v1/services/question_recommendation.py (1)
84-89: Consider passing query_id to the SQL Generation Reasoning pipeline.
Although this runs fine, passing query_id ensures better traceability, particularly if you intend to handle or stream partial reasoning results for each question in the future.
 await self._pipelines["sql_generation_reasoning"].run(
     query=candidate["question"],
     contexts=table_ddls,
     configuration=configuration,
+    query_id=request_id,
 ).get("post_process", {})
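To make the _user_queues nitpick concrete, here is a minimal sketch of a per-query queue registry. The QueueRegistry class and its publish method are hypothetical names invented for illustration and are not part of the PR; only the _user_queues attribute name and the "<DONE>" sentinel come from the reviewed code. The sketch assumes all callers run on a single event loop.

```python
import asyncio
from typing import Dict, Optional

DONE = "<DONE>"  # end-of-stream sentinel, matching the string used in the PR


class QueueRegistry:
    """Hypothetical helper: one asyncio.Queue per query_id."""

    def __init__(self) -> None:
        self._user_queues: Dict[str, asyncio.Queue] = {}

    def get_or_create(self, query_id: str) -> asyncio.Queue:
        # There is no await between the lookup and the insert, so no other
        # coroutine on the same event loop can run in between.
        queue = self._user_queues.get(query_id)
        if queue is None:
            queue = asyncio.Queue()
            self._user_queues[query_id] = queue
        return queue

    async def publish(self, query_id: str, chunk: Optional[str]) -> None:
        # In this sketch a None chunk stands in for "generation finished".
        await self.get_or_create(query_id).put(DONE if chunk is None else chunk)


async def demo() -> list:
    registry = QueueRegistry()
    # Two queries publish concurrently; each lands in its own queue.
    await asyncio.gather(
        registry.publish("q1", "a"),
        registry.publish("q2", "b"),
    )
    await registry.publish("q1", None)
    q1 = registry.get_or_create("q1")
    return [q1.get_nowait() for _ in range(q1.qsize())]
```

If the queues were ever touched from multiple threads rather than multiple coroutines, this reasoning would no longer hold and explicit synchronization would be needed.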
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- wren-ai-service/demo/app.py (1 hunks)
- wren-ai-service/demo/utils.py (6 hunks)
- wren-ai-service/src/pipelines/generation/sql_generation_reasoning.py (6 hunks)
- wren-ai-service/src/web/v1/routers/ask.py (1 hunks)
- wren-ai-service/src/web/v1/services/ask.py (2 hunks)
- wren-ai-service/src/web/v1/services/question_recommendation.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: pytest
- GitHub Check: Analyze (go)
🔇 Additional comments (16)
wren-ai-service/demo/app.py (1)
43-44: LGTM!
The new session state variable retrieved_tables is properly initialized following the same pattern as other session state variables.
wren-ai-service/demo/utils.py (7)
184-184: LGTM!
The function signature change improves the handling of SQL generation reasoning by making it explicit through the parameter.
Also applies to: 186-186
226-228: LGTM!
The new section properly displays retrieved tables from session state.
230-230: LGTM!
The SQL generation reasoning handling is properly updated to use the changed value from the text area.
Also applies to: 238-242
247-247: LGTM!
The label visibility is properly set for better UI experience.
Also applies to: 254-254
534-534: LGTM!
The session state is properly reset to maintain consistency.
587-589: LGTM!
The retrieved tables are properly joined with commas for better readability.
676-688: LGTM!
The new function properly handles streaming responses using Server-Sent Events (SSE) and follows the same pattern as display_sql_answer.
wren-ai-service/src/pipelines/generation/sql_generation_reasoning.py (6)
1-1: Import looks good.
Using asyncio will enable the asynchronous callbacks and streaming operations. No issues found here.
95-95: Simplified model kwargs.
Declaring a clear text response_format is coherent with the rest of the pipeline usage. Good addition.
109-109: Callback assignment looks consistent.
No concerns: tying _streaming_callback into the generator maintains a straightforward streaming mechanism.
120-129: Check concurrency in _streaming_callback.
Multiple coroutines may try to create or put items in the queue concurrently. Ensure that the dictionary manipulations and queue operations do not cause race conditions. Consider applying locks or verifying single-threaded usage under event loops.
162-163: Optional query_id parameter is well introduced.
This addition nicely aligns with the streaming callback logic. No issues found.
172-172: Wiring query_id into inputs.
Good job consistently populating the pipeline with query_id. No issues found.
wren-ai-service/src/web/v1/services/ask.py (2)
355-362: LGTM! Clean integration of query_id parameter.
The addition of the query_id parameter to the SQL generation reasoning pipeline call maintains consistency with other pipeline calls and improves request tracking.
548-564: LGTM! Well-structured streaming enhancement.
The streaming enhancement for the "planning" status follows the established pattern, includes proper null checks, and cleanly integrates with the existing SSE event system.
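For readers unfamiliar with the SSE framing mentioned in the comments above, the following is a rough sketch of how streamed reasoning chunks could be turned into Server-Sent Events messages. The helper names to_sse_event and planning_events, and the {"message": ...} payload shape, are assumptions made for illustration; the PR's actual event model is not shown on this page.

```python
import json


def to_sse_event(payload: dict) -> str:
    """Serialize one payload as an SSE message (hypothetical helper)."""
    # An SSE message consists of "data:" lines terminated by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


def planning_events(chunks):
    """Yield one SSE message per non-empty reasoning chunk."""
    for chunk in chunks:
        if chunk:  # skip empty chunks, in the spirit of the null checks noted above
            yield to_sse_event({"message": chunk})
```

For example, list(planning_events(["a", "", "b"])) produces two messages and silently drops the empty chunk.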
     generate_sql_reasoning: dict,
 ) -> dict:
-    return orjson.loads(generate_sql_reasoning.get("replies")[0])
+    return generate_sql_reasoning.get("replies")[0]
Guard against empty or missing replies.
Accessing generate_sql_reasoning.get("replies")[0] raises an IndexError if replies is empty, and a TypeError if the key is missing, because .get then returns None. Add a safety check to avoid runtime failures.
-return generate_sql_reasoning.get("replies")[0]
+replies = generate_sql_reasoning.get("replies")
+if not replies:
+    return {}
+return replies[0]
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-return generate_sql_reasoning.get("replies")[0]
+replies = generate_sql_reasoning.get("replies")
+if not replies:
+    return {}
+return replies[0]
async def get_streaming_results(self, query_id):
    async def _get_streaming_results(query_id):
        return await self._user_queues[query_id].get()

    if query_id not in self._user_queues:
        self._user_queues[
            query_id
        ] = asyncio.Queue()  # Ensure the user's queue exists
    while True:
        try:
            # Wait for an item from the user's queue
            self._streaming_results = await asyncio.wait_for(
                _get_streaming_results(query_id), timeout=120
            )
            if (
                self._streaming_results == "<DONE>"
            ):  # Check for end-of-stream signal
                del self._user_queues[query_id]
                break
            if self._streaming_results:  # Check if there are results to yield
                yield self._streaming_results
                self._streaming_results = ""  # Clear after yielding
        except TimeoutError:
            break
🛠️ Refactor suggestion
Avoid storing streaming data in a shared instance property.
Using self._streaming_results can cause concurrency issues if multiple queries are handled simultaneously, as they would overwrite each other's result state. Use a local variable or wrap the logic in a dedicated queue read instead.
-            self._streaming_results = await asyncio.wait_for(
-                _get_streaming_results(query_id), timeout=120
-            )
-            if self._streaming_results == "<DONE>":
+            next_chunk = await asyncio.wait_for(
+                _get_streaming_results(query_id), timeout=120
+            )
+            if next_chunk == "<DONE>":
 ...
-            if self._streaming_results:
-                yield self._streaming_results
-                self._streaming_results = ""
+            if next_chunk:
+                yield next_chunk
📝 Committable suggestion
async def get_streaming_results(self, query_id):
    async def _get_streaming_results(query_id):
        return await self._user_queues[query_id].get()

    if query_id not in self._user_queues:
        self._user_queues[
            query_id
        ] = asyncio.Queue()  # Ensure the user's queue exists
    while True:
        try:
            # Wait for an item from the user's queue
            next_chunk = await asyncio.wait_for(
                _get_streaming_results(query_id), timeout=120
            )
            if next_chunk == "<DONE>":  # Check for end-of-stream signal
                del self._user_queues[query_id]
                break
            if next_chunk:  # Check if there are results to yield
                yield next_chunk
        except TimeoutError:
            break
     service_metadata: ServiceMetadata = Depends(get_service_metadata),
 ) -> AskResponse:
-    query_id = str(uuid.uuid4())
+    query_id = "c1b011f4-1360-43e0-9c20-22c0c6025206"  # str(uuid.uuid4())
Using a static UUID causes collisions.
With query_id hardcoded, every new query shares the same ID, which breaks traceability and concurrency. Revert to a dynamic UUID, or otherwise ensure uniqueness if you need per-request separation.
-    query_id = "c1b011f4-1360-43e0-9c20-22c0c6025206"  # str(uuid.uuid4())
+    query_id = str(uuid.uuid4())
📝 Committable suggestion
-    query_id = "c1b011f4-1360-43e0-9c20-22c0c6025206"  # str(uuid.uuid4())
+    query_id = str(uuid.uuid4())
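A quick way to see the difference between the hardcoded ID and the restored call is the sketch below. The new_query_id wrapper is a hypothetical name used only to make the behavior testable; the PR itself calls str(uuid.uuid4()) inline.

```python
import uuid


def new_query_id() -> str:
    """Return a fresh random (version 4) identifier for one request."""
    return str(uuid.uuid4())


first, second = new_query_id(), new_query_id()
# Each call yields a distinct ID, so per-query state keyed by query_id
# (such as the streaming queues) is not shared between requests.
print(first != second)
```

With the static string, both requests would instead map to the same key and read from the same streaming queue.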
paopa
left a comment
lgtm, wait for ci done.