Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tests/v1/executor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import os
from collections.abc import Callable
from concurrent.futures import Future
from typing import Any

import pytest
Expand All @@ -27,7 +28,7 @@ def collective_rpc(
kwargs: dict | None = None,
non_block: bool = False,
unique_reply_rank: int | None = None,
) -> list[Any]:
) -> Any | list[Any] | Future[Any | list[Any]]:
# Drop marker to show that this was run
with open(".marker", "w"):
...
Expand Down
32 changes: 12 additions & 20 deletions tests/v1/kv_connector/unit/test_output_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,12 @@ def test_aggregate_workers_output():
def test_async_aggregate_workers_output():
aggregator = KVOutputAggregator(expected_finished_count=2)

future1: Future[DummyModelRunnerOutput] = Future()
future2: Future[DummyModelRunnerOutput] = Future()
result_future = aggregator.async_aggregate([future1, future2])
future: Future[list[DummyModelRunnerOutput]] = Future()
result_future = aggregator.async_aggregate(future)

output1 = DummyModelRunnerOutput()
output2 = DummyModelRunnerOutput()
future1.set_result(output1)
future2.set_result(output2)
future.set_result([output1, output2])

assert result_future.done()
aggregated = result_future.result()
Expand All @@ -106,16 +104,14 @@ def test_async_aggregate_workers_output():
assert aggregated.finished_recving is None
assert not aggregated.invalid_block_ids

future1 = Future()
future2 = Future()
result_future = aggregator.async_aggregate([future1, future2])
future = Future()
result_future = aggregator.async_aggregate(future)

output1 = DummyModelRunnerOutput(
finished_sending={"req1"}, finished_recving={"req2"}
)
output2 = DummyModelRunnerOutput(invalid_block_ids={1})
future1.set_result(output1)
future2.set_result(output2)
future.set_result([output1, output2])

assert result_future.done()
aggregated = result_future.result()
Expand All @@ -125,14 +121,12 @@ def test_async_aggregate_workers_output():
assert aggregated.finished_recving is None
assert aggregated.invalid_block_ids == {1}

future1 = Future()
future2 = Future()
result_future = aggregator.async_aggregate([future1, future2])
future = Future()
result_future = aggregator.async_aggregate(future)

output1 = DummyModelRunnerOutput(invalid_block_ids={2})
output2 = DummyModelRunnerOutput(finished_sending={"req1"})
future1.set_result(output1)
future2.set_result(output2)
future.set_result([output1, output2])

assert result_future.done()
aggregated = result_future.result()
Expand All @@ -142,16 +136,14 @@ def test_async_aggregate_workers_output():
assert aggregated.finished_recving is None
assert aggregated.invalid_block_ids == {2}

future1 = Future()
future2 = Future()
result_future = aggregator.async_aggregate([future1, future2])
future = Future()
result_future = aggregator.async_aggregate(future)

output1 = DummyModelRunnerOutput(invalid_block_ids={3, 4})
output2 = DummyModelRunnerOutput(
finished_recving={"req2"}, invalid_block_ids={4, 5}
)
future1.set_result(output1)
future2.set_result(output2)
future.set_result([output1, output2])

assert result_future.done()
aggregated = result_future.result()
Expand Down
43 changes: 14 additions & 29 deletions vllm/distributed/kv_transfer/kv_connector/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,39 +221,24 @@ def update_finished_set(

def async_aggregate(
self,
output_futures: Sequence[Future[ModelRunnerOutput | None]],
output_future: Future[Sequence[ModelRunnerOutput | None]],
output_rank: int = 0,
) -> Future[ModelRunnerOutput | None]:
"""Takes a list of futures and returns a single future which resolves
to the respective list of outputs."""
"""Takes a future that resolves to a list of outputs and returns a future
which resolves to a single aggregated output."""
result_future: Future[ModelRunnerOutput | None] = Future()

outputs: list[ModelRunnerOutput | None] = [None] * len(output_futures)
remaining = len(output_futures)

def make_callback(idx):
def callback(fut):
if result_future.done():
return

try:
outputs[idx] = fut.result()
except CancelledError:
result_future.cancel()
except Exception as e:
result_future.set_exception(e)

# this check assumes io_thread_pool uses a single thread
nonlocal remaining
remaining -= 1
if not remaining:
result_future.set_result(self.aggregate(outputs, output_rank))

return callback

for i, output_future in enumerate(output_futures):
output_future.add_done_callback(make_callback(i))

def callback(fut):
if result_future.done():
return
try:
result_future.set_result(self.aggregate(fut.result(), output_rank))
except CancelledError:
result_future.cancel()
except Exception as e:
result_future.set_exception(e)

output_future.add_done_callback(callback)
return result_future


Expand Down
4 changes: 2 additions & 2 deletions vllm/v1/executor/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def collective_rpc(
args: tuple = (),
kwargs: dict | None = None,
non_block: Literal[True] = True,
) -> list[Future[_R]]:
) -> Future[list[_R]]:
pass

@abstractmethod
Expand Down Expand Up @@ -219,7 +219,7 @@ def sample_tokens(

def sample_tokens(
self, grammar_output: GrammarOutput | None, non_block: bool = False
) -> ModelRunnerOutput | Future[ModelRunnerOutput]:
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
output = self.collective_rpc( # type: ignore[call-overload]
"sample_tokens", args=(grammar_output,), non_block=non_block
)
Expand Down
128 changes: 67 additions & 61 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import time
import traceback
import weakref
from collections import deque
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import Future, InvalidStateError
from contextlib import suppress
from dataclasses import dataclass
from enum import Enum, auto
from functools import cached_property, partial
Expand Down Expand Up @@ -54,6 +56,30 @@
logger = init_logger(__name__)


class FutureWrapper(Future):
    """A ``Future`` that is resolved lazily by the thread that waits on it.

    Instead of being completed by a background thread, each wrapper is
    paired with a ``get_response`` callable in a shared queue. Producers
    ``appendleft`` ``(future, get_response)`` pairs and consumers ``pop``
    from the right, so pending responses are always retrieved in the order
    in which they were enqueued.
    """

    def __init__(self, futures_queue: deque[tuple["FutureWrapper", Callable]]):
        """Create a pending future bound to ``futures_queue``.

        Args:
            futures_queue: Shared queue of ``(future, get_response)`` pairs.
                This constructor does not enqueue the new future; the caller
                is responsible for adding it together with its callable.
        """
        self.futures_queue = futures_queue
        super().__init__()

    def result(self, timeout=None):
        """Return the result, resolving queued futures until this one is done.

        Args:
            timeout: Must be ``None``; timeouts are not supported.

        Returns:
            The value produced by this future's ``get_response`` callable.

        Raises:
            RuntimeError: If ``timeout`` is given, or if this future is still
                pending while the queue is empty (it can then never be
                resolved by draining).
            Exception: Whatever ``get_response`` raised for this future, or
                ``CancelledError`` if the future was cancelled.
        """
        if timeout is not None:
            raise RuntimeError("timeout not implemented")
        # Drain any futures ahead of us in the queue.
        while not self.done():
            try:
                future, get_response = self.futures_queue.pop()
            except IndexError:
                # Without this guard the caller would only see an opaque
                # "pop from an empty deque" error.
                raise RuntimeError(
                    "FutureWrapper is pending but no longer present in the "
                    "futures queue; its result cannot be retrieved"
                ) from None
            future.wait_for_response(get_response)
        return super().result()

    def wait_for_response(self, get_response: Callable):
        """Call ``get_response`` and store its outcome on this future.

        A return value becomes the future's result; an ``Exception`` becomes
        the future's exception. If the future is already done (for example
        it was cancelled), the outcome is discarded silently.

        Args:
            get_response: Zero-argument callable producing the response.
        """
        try:
            response = get_response()
        except Exception as e:
            with suppress(InvalidStateError):
                self.set_exception(e)
        else:
            # InvalidStateError means the future was already completed or
            # cancelled; the response has still been consumed.
            with suppress(InvalidStateError):
                self.set_result(response)


class MultiprocExecutor(Executor):
supports_pp: bool = True

Expand All @@ -64,7 +90,6 @@ def _init_executor(self) -> None:
self.is_failed = False
self.shutdown_event = threading.Event()
self.failure_callback: FailureCallback | None = None
self.io_thread_pool: ThreadPoolExecutor | None = None

self.world_size = self.parallel_config.world_size
tensor_parallel_size = self.parallel_config.tensor_parallel_size
Expand Down Expand Up @@ -132,12 +157,7 @@ def _init_executor(self) -> None:
uw.death_writer.close()
self._ensure_worker_termination([uw.proc for uw in unready_workers])

# Note: must use only 1 IO thread to keep dequeue sequence
# from the response queue.
# _async_aggregate_workers_output also assumes a single IO thread.
self.io_thread_pool = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="mp_exec_io"
)
self.futures_queue = deque[tuple[FutureWrapper, Callable]]()

self.output_rank = self._get_output_rank()
self.has_connector = self.vllm_config.kv_transfer_config is not None
Expand Down Expand Up @@ -195,14 +215,13 @@ def _execute_with_aggregation(
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
if not self.has_connector:
# get output only from a single worker (output_rank)
(output,) = self.collective_rpc(
return self.collective_rpc(
method,
args=args,
unique_reply_rank=self.output_rank,
non_block=non_block,
timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
)
return output

# get output from all workers
outputs = self.collective_rpc(
Expand All @@ -223,20 +242,21 @@ def execute_dummy_batch(self) -> None:

def take_draft_token_ids(self) -> DraftTokenIds | None:
# OPTIMIZATION: Get output only from a single worker (output_rank)
outputs = self.collective_rpc(
return self.collective_rpc(
"take_draft_token_ids", unique_reply_rank=self.output_rank
)
return outputs[0]

def collective_rpc(
def collective_rpc( # type: ignore[override]
self,
method: str | Callable,
timeout: float | None = None,
args: tuple = (),
kwargs: dict | None = None,
non_block: bool = False,
unique_reply_rank: int | None = None,
) -> list[Any]:
) -> Any | list[Any] | Future[Any | list[Any]]:
"""Returns single result if unique_reply_rank is provided, otherwise list."""

if self.is_failed:
raise RuntimeError("Executor failed.")

Expand All @@ -246,63 +266,52 @@ def collective_rpc(
# NOTE: If the args are heterogeneous, then we pack them into a list,
# and unpack them in the method of every worker, because every worker
# knows their own rank.
try:
if isinstance(method, str):
send_method = method
else:
send_method = cloudpickle.dumps(
method, protocol=pickle.HIGHEST_PROTOCOL
)
self.rpc_broadcast_mq.enqueue(
(send_method, args, kwargs, unique_reply_rank)
)

workers = (
(self.workers[unique_reply_rank],)
if unique_reply_rank is not None
else self.workers
)
responses = []
if isinstance(method, str):
send_method = method
else:
send_method = cloudpickle.dumps(method, protocol=pickle.HIGHEST_PROTOCOL)
self.rpc_broadcast_mq.enqueue((send_method, args, kwargs, unique_reply_rank))

def get_response(
w: WorkerProcHandle,
dequeue_timeout: float | None = None,
cancel_event: threading.Event | None = None,
):
status, result = w.worker_response_mq.dequeue(
timeout=dequeue_timeout, cancel=cancel_event
)
workers = (
(self.workers[unique_reply_rank],)
if unique_reply_rank is not None
else self.workers
)

if status != WorkerProc.ResponseStatus.SUCCESS:
raise RuntimeError(
f"Worker failed with error '{result}', please check the"
" stack trace above for the root cause"
)
return result
shutdown_event = self.shutdown_event

def get_response():
responses = []
for w in workers:
dequeue_timeout = (
None if deadline is None else (deadline - time.monotonic())
)

if self.io_thread_pool is not None:
# We must consume worker_response_mq from a single thread.
result = self.io_thread_pool.submit( # type: ignore
get_response, w, dequeue_timeout, self.shutdown_event
try:
status, result = w.worker_response_mq.dequeue(
timeout=dequeue_timeout, cancel=shutdown_event
)
if not non_block:
result = result.result()
elif not non_block:
result = get_response(w, dequeue_timeout, self.shutdown_event)
else:
except TimeoutError as e:
raise TimeoutError(f"RPC call to {method} timed out.") from e
if status != WorkerProc.ResponseStatus.SUCCESS:
raise RuntimeError(
"non_block can only be used when max_concurrent_batches > 1"
f"Worker failed with error '{result}', please check the"
" stack trace above for the root cause"
)
responses.append(result)
return responses[0] if unique_reply_rank is not None else responses

if non_block:
future = FutureWrapper(self.futures_queue)
self.futures_queue.appendleft((future, get_response))
return future

# First drain any pending futures in the queue.
while self.futures_queue:
future, get_fut_response = self.futures_queue.pop()
future.wait_for_response(get_fut_response)

return responses
except TimeoutError as e:
raise TimeoutError(f"RPC call to {method} timed out.") from e
return get_response()

@staticmethod
def _ensure_worker_termination(worker_procs: list[BaseProcess]):
Expand Down Expand Up @@ -348,9 +357,6 @@ def shutdown(self):
self._ensure_worker_termination([w.proc for w in workers])

self.shutdown_event.set()
if self.io_thread_pool is not None:
self.io_thread_pool.shutdown(wait=False, cancel_futures=True)
del self.io_thread_pool

self.rpc_broadcast_mq = None

Expand Down
Loading