Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions vllm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,29 +404,30 @@ async def iterate_with_cancellation(

async def merge_async_iterators(
*iterators: AsyncGenerator[T, None],
is_cancelled: Callable[[], Awaitable[bool]],
is_cancelled: Optional[Callable[[], Awaitable[bool]]] = None,
) -> AsyncGenerator[Tuple[int, T], None]:
"""Merge multiple asynchronous iterators into a single iterator.

This method handle the case where some iterators finish before others.
When it yields, it yields a tuple (i, item) where i is the index of the
iterator that yields the item.

It also polls the provided function at least once per second to check
for client cancellation.
It also optionally polls a provided function at least once per second
to check for client cancellation.
"""

# Can use anext() in python >= 3.10
awaits = {
ensure_future(pair[1].__anext__()): pair
for pair in enumerate(iterators)
}
timeout = None if is_cancelled is None else 1
try:
while awaits:
done, pending = await asyncio.wait(awaits.keys(),
return_when=FIRST_COMPLETED,
timeout=1)
if await is_cancelled():
timeout=timeout)
if is_cancelled is not None and await is_cancelled():
raise asyncio.CancelledError("client cancelled")
for d in done:
pair = awaits.pop(d)
Expand Down