Skip to content

Commit e9dc304

Browse files
authored
fix: threading approach in run_async_in_sync didn't wait for all tasks to complete (#487)
## Description Fix the error when running tasks in threads ## PR Type <!-- Delete the types that don't apply --!> 🐛 Bug Fix ## Relevant issues <!-- e.g. "Fixes #123" --> ## Checklist - [x] I have added unit tests that prove my fix/feature works - [x] New and existing tests pass locally - [x] Documentation was updated where necessary - [x] I have read and followed the [contribution guidelines](https://github.com/mozilla-ai/any-llm/blob/main/CONTRIBUTING.md)```
1 parent 2b3cd17 commit e9dc304

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

src/any_llm/utils/aio.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,32 @@ def run_async_in_sync(coro: Coroutine[Any, Any, T], allow_running_loop: bool = T
4242
# If we get here, there's a loop running, so we can't use run_until_complete()
4343
# or asyncio.run() - must use threading approach
4444
def run_in_thread() -> T:
45-
return asyncio.run(coro)
45+
async def run_with_cleanup() -> T:
46+
try:
47+
result = await coro
48+
49+
# Wait for any pending background tasks to complete to prevent "Event loop is closed" errors
50+
pending_tasks = [
51+
task for task in asyncio.all_tasks() if not task.done() and task is not asyncio.current_task()
52+
]
53+
54+
if pending_tasks:
55+
await asyncio.gather(*pending_tasks, return_exceptions=True)
56+
except Exception:
57+
pending_tasks = [
58+
task for task in asyncio.all_tasks() if not task.done() and task is not asyncio.current_task()
59+
]
60+
61+
for task in pending_tasks:
62+
task.cancel()
63+
64+
if pending_tasks:
65+
await asyncio.gather(*pending_tasks, return_exceptions=True)
66+
67+
raise
68+
return result
69+
70+
return asyncio.run(run_with_cleanup())
4671

4772
with concurrent.futures.ThreadPoolExecutor() as executor:
4873
return executor.submit(run_in_thread).result()

tests/unit/test_utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import asyncio
2+
3+
from any_llm.utils.aio import run_async_in_sync
4+
5+
6+
def test_run_async_in_sync_fails_with_background_task_state() -> None:
7+
task_completed = {"value": False}
8+
9+
async def operation_with_critical_background_task() -> str:
10+
"""Simulates an operation where a background task MUST complete for success."""
11+
12+
async def critical_background_work() -> None:
13+
await asyncio.sleep(0.02)
14+
task_completed["value"] = True
15+
16+
task = asyncio.create_task(critical_background_work())
17+
assert task is not None
18+
return "operation_started"
19+
20+
async def test_in_streamlit_context() -> None:
21+
task_completed["value"] = False
22+
# This triggers the threading in run_async_in_sync
23+
result = run_async_in_sync(operation_with_critical_background_task())
24+
assert result == "operation_started"
25+
await asyncio.sleep(0.05)
26+
assert task_completed["value"] is True
27+
28+
asyncio.run(test_in_streamlit_context())

0 commit comments

Comments
 (0)