Skip to content

Commit d351238

Browse files
cadedanieljoerunde
authored andcommitted
[Misc] [Core] Implement RFC "Augment BaseExecutor interfaces to enable hardware-agnostic speculative decoding" (vllm-project#3837)
1 parent cee255f commit d351238

20 files changed

Lines changed: 453 additions & 277 deletions

tests/core/block/e2e/test_correctness.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
1717
# Allow only 5 sequences of ~1024 tokens in worst case.
1818
"block_size": 16,
19-
"forced_num_gpu_blocks": 5 * (64 + 1),
19+
"num_gpu_blocks_override": 5 * (64 + 1),
2020
}])
2121
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
2222
@pytest.mark.parametrize("baseline_llm_kwargs", [{
@@ -162,14 +162,14 @@ def test_v1_v2_greedy_equality_with_cow(baseline_llm_generator,
162162
163163
# Allow only 2 sequences of ~128 tokens in worst case.
164164
# Note 8 = 128/block_size
165-
"forced_num_gpu_blocks": 2 * (8 + 1),
165+
"num_gpu_blocks_override": 2 * (8 + 1),
166166
},
167167
{
168168
"block_size": 8,
169169
170170
# Allow only 2 sequences of ~128 tokens in worst case.
171171
# Note 16 = 128/block_size
172-
"forced_num_gpu_blocks": 2 * (16 + 1),
172+
"num_gpu_blocks_override": 2 * (16 + 1),
173173
}
174174
])
175175
@pytest.mark.parametrize("baseline_llm_kwargs", [{

tests/lora/test_worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import tempfile
44
from unittest.mock import patch
55

6-
from vllm.config import (DeviceConfig, LoRAConfig, ModelConfig, ParallelConfig,
7-
SchedulerConfig)
6+
from vllm.config import (CacheConfig, DeviceConfig, LoRAConfig, ModelConfig,
7+
ParallelConfig, SchedulerConfig)
88
from vllm.lora.models import LoRAMapping
99
from vllm.lora.request import LoRARequest
1010
from vllm.worker.worker import Worker
@@ -27,6 +27,10 @@ def test_worker_apply_lora(sql_lora_files):
2727
parallel_config=ParallelConfig(1, 1, False),
2828
scheduler_config=SchedulerConfig(32, 32, 32),
2929
device_config=DeviceConfig("cuda"),
30+
cache_config=CacheConfig(block_size=16,
31+
gpu_memory_utilization=1.,
32+
swap_space=0,
33+
cache_dtype="auto"),
3034
local_rank=0,
3135
rank=0,
3236
lora_config=LoRAConfig(max_lora_rank=8, max_cpu_loras=32,

tests/spec_decode/test_spec_decode_worker.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,8 @@ def test_init_device():
512512

513513

514514
@torch.inference_mode()
515-
def test_init_cache_engine():
516-
"""Verify SpecDecodeWorker invokes init_cache_engine on proposer/scorer
515+
def test_initialize_cache():
516+
"""Verify SpecDecodeWorker invokes initialize_cache on proposer/scorer
517517
workers.
518518
"""
519519
draft_worker = mock_worker(cls=MultiStepWorker)
@@ -525,23 +525,22 @@ def test_init_cache_engine():
525525
worker = SpecDecodeWorker(draft_worker, target_worker, rejection_sampler,
526526
metrics_collector)
527527

528-
cache_config = MagicMock()
528+
kwargs = {"num_gpu_blocks": 1024, "num_cpu_blocks": 1023}
529+
worker.initialize_cache(**kwargs)
529530

530-
worker.init_cache_engine(cache_config)
531-
532-
draft_worker.init_cache_engine.assert_called_once_with(cache_config)
533-
target_worker.init_cache_engine.assert_called_once_with(cache_config)
531+
draft_worker.initialize_cache.assert_called_once_with(**kwargs)
532+
target_worker.initialize_cache.assert_called_once_with(**kwargs)
534533

535534

536535
@pytest.mark.parametrize('available_gpu_blocks', [1, 1024])
537536
@pytest.mark.parametrize('available_cpu_blocks', [500])
538537
@pytest.mark.parametrize('target_cache_block_size_bytes', [2 * 2 * 4096])
539538
@pytest.mark.parametrize('draft_kv_size_bytes', [0, 2 * 2 * 768, 2 * 2 * 4096])
540539
@pytest.mark.skip_global_cleanup
541-
def test_profile_num_available_blocks(available_gpu_blocks: int,
542-
available_cpu_blocks: int,
543-
target_cache_block_size_bytes: int,
544-
draft_kv_size_bytes: int):
540+
def test_determine_num_available_blocks(available_gpu_blocks: int,
541+
available_cpu_blocks: int,
542+
target_cache_block_size_bytes: int,
543+
draft_kv_size_bytes: int):
545544
"""Verify SpecDecodeWorker correctly profiles num available GPU blocks.
546545
Specifically, it should run profiling in the scorer worker, and then evenly
547546
split the blocks between proposer and scorer worker.
@@ -552,7 +551,7 @@ def test_profile_num_available_blocks(available_gpu_blocks: int,
552551
rejection_sampler.token_id_dtype = torch.int64
553552
metrics_collector = MagicMock(spec=AsyncMetricsCollector)
554553

555-
target_worker.profile_num_available_blocks.return_value = (
554+
target_worker.determine_num_available_blocks.return_value = (
556555
available_gpu_blocks, available_cpu_blocks)
557556
target_worker.get_cache_block_size_bytes.return_value = (
558557
target_cache_block_size_bytes)
@@ -561,17 +560,9 @@ def test_profile_num_available_blocks(available_gpu_blocks: int,
561560
worker = SpecDecodeWorker(draft_worker, target_worker, rejection_sampler,
562561
metrics_collector)
563562

564-
# These values do not directly impact the adjusted block size calculation,
565-
# so they can be fixed.
566-
gpu_memory_utilization = 0.9
567-
cpu_swap_space = 100
568-
block_size = 16
569-
570-
num_gpu_blocks, num_cpu_blocks = worker.profile_num_available_blocks(
571-
block_size, gpu_memory_utilization, cpu_swap_space, cache_dtype="auto")
563+
num_gpu_blocks, num_cpu_blocks = worker.determine_num_available_blocks()
572564

573-
target_worker.profile_num_available_blocks.assert_called_once_with(
574-
block_size, gpu_memory_utilization, cpu_swap_space, "auto")
565+
target_worker.determine_num_available_blocks.assert_called_once()
575566
assert num_cpu_blocks == available_cpu_blocks
576567

577568
assert num_gpu_blocks == split_num_cache_blocks_evenly(

tests/spec_decode/utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def create_worker(cls: type,
117117
parallel_config=engine_config.parallel_config,
118118
scheduler_config=engine_config.scheduler_config,
119119
device_config=engine_config.device_config,
120+
cache_config=engine_config.cache_config,
120121
local_rank=0,
121122
rank=0,
122123
distributed_init_method=distributed_init_method,
@@ -128,8 +129,9 @@ def create_worker(cls: type,
128129

129130
engine_config.cache_config.num_gpu_blocks = num_gpu_blocks
130131
engine_config.cache_config.num_cpu_blocks = 0
131-
worker.init_cache_engine(engine_config.cache_config)
132-
worker.warm_up_model()
132+
worker.initialize_cache(
133+
num_gpu_blocks=engine_config.cache_config.num_gpu_blocks,
134+
num_cpu_blocks=engine_config.cache_config.num_cpu_blocks)
133135

134136
return worker
135137

tests/worker/test_swap.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ def test_swap() -> None:
1111
dtype="half",
1212
load_format="dummy")
1313
engine_config = engine_args.create_engine_config()
14-
engine_config.cache_config.num_gpu_blocks = 100
15-
engine_config.cache_config.num_cpu_blocks = 100
14+
engine_config.cache_config.num_gpu_blocks = 1000
15+
engine_config.cache_config.num_cpu_blocks = 1000
1616

1717
# Create the worker.
1818
distributed_init_method = get_distributed_init_method(
@@ -22,6 +22,7 @@ def test_swap() -> None:
2222
parallel_config=engine_config.parallel_config,
2323
scheduler_config=engine_config.scheduler_config,
2424
device_config=engine_config.device_config,
25+
cache_config=engine_config.cache_config,
2526
local_rank=0,
2627
rank=0,
2728
distributed_init_method=distributed_init_method,
@@ -31,8 +32,9 @@ def test_swap() -> None:
3132
# Initialize the worker.
3233
worker.init_device()
3334
worker.load_model()
34-
worker.init_cache_engine(engine_config.cache_config)
35-
worker.warm_up_model()
35+
worker.initialize_cache(
36+
num_gpu_blocks=engine_config.cache_config.num_gpu_blocks,
37+
num_cpu_blocks=engine_config.cache_config.num_cpu_blocks)
3638

3739
# Randomly initialize the cache.
3840
gpu_cache = worker.cache_engine.gpu_cache

vllm/config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ class CacheConfig:
334334
vLLM execution.
335335
swap_space: Size of the CPU swap space per GPU (in GiB).
336336
cache_dtype: Data type for kv cache storage.
337-
forced_num_gpu_blocks: Number of GPU blocks to use. This overrides the
337+
num_gpu_blocks_override: Number of GPU blocks to use. This overrides the
338338
profiled num_gpu_blocks if specified. Does nothing if None.
339339
"""
340340

@@ -344,14 +344,14 @@ def __init__(
344344
gpu_memory_utilization: float,
345345
swap_space: int,
346346
cache_dtype: str,
347-
forced_num_gpu_blocks: Optional[int] = None,
347+
num_gpu_blocks_override: Optional[int] = None,
348348
sliding_window: Optional[int] = None,
349349
enable_prefix_caching: bool = False,
350350
) -> None:
351351
self.block_size = block_size
352352
self.gpu_memory_utilization = gpu_memory_utilization
353353
self.swap_space_bytes = swap_space * _GB
354-
self.forced_num_gpu_blocks = forced_num_gpu_blocks
354+
self.num_gpu_blocks_override = num_gpu_blocks_override
355355
self.cache_dtype = cache_dtype
356356
self.sliding_window = sliding_window
357357
self.enable_prefix_caching = enable_prefix_caching

vllm/engine/arg_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class EngineArgs:
5555
max_cpu_loras: Optional[int] = None
5656
device: str = 'auto'
5757
ray_workers_use_nsight: bool = False
58-
forced_num_gpu_blocks: Optional[int] = None
58+
num_gpu_blocks_override: Optional[int] = None
5959
num_lookahead_slots: int = 0
6060

6161
# Related to Vision-language models such as llava
@@ -246,7 +246,7 @@ def add_cli_args(
246246
'the model executor, which can range from 0 to 1.'
247247
'If unspecified, will use the default value of 0.9.')
248248
parser.add_argument(
249-
'--forced-num-gpu-blocks',
249+
'--num-gpu-blocks-override',
250250
type=int,
251251
default=None,
252252
help='If specified, ignore GPU profiling result and use this number'
@@ -426,7 +426,7 @@ def create_engine_config(self, ) -> EngineConfig:
426426
cache_config = CacheConfig(self.block_size,
427427
self.gpu_memory_utilization,
428428
self.swap_space, self.kv_cache_dtype,
429-
self.forced_num_gpu_blocks,
429+
self.num_gpu_blocks_override,
430430
model_config.get_sliding_window(),
431431
self.enable_prefix_caching)
432432
parallel_config = ParallelConfig(

vllm/engine/llm_engine.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ def __init__(
127127
speculative_config=speculative_config,
128128
)
129129

130+
self._initialize_kv_caches()
131+
130132
# If usage stat is enabled, collect relevant info.
131133
if is_usage_stats_enabled():
132134
from vllm.model_executor.model_loader import (
@@ -178,6 +180,26 @@ def __init__(
178180
labels=dict(model_name=model_config.model))
179181
self.stat_logger.info("cache_config", self.cache_config)
180182

183+
def _initialize_kv_caches(self) -> None:
184+
"""Initialize the KV cache in the worker(s).
185+
186+
The workers will determine the number of blocks in both the GPU cache
187+
and the swap CPU cache.
188+
"""
189+
num_gpu_blocks, num_cpu_blocks = (
190+
self.model_executor.determine_num_available_blocks())
191+
192+
if self.cache_config.num_gpu_blocks_override is not None:
193+
num_gpu_blocks_override = self.cache_config.num_gpu_blocks_override
194+
logger.info(f"Overriding {num_gpu_blocks=} with "
195+
f"{num_gpu_blocks_override=}")
196+
num_gpu_blocks = num_gpu_blocks_override
197+
198+
self.cache_config.num_gpu_blocks = num_gpu_blocks
199+
self.cache_config.num_cpu_blocks = num_cpu_blocks
200+
201+
self.model_executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
202+
181203
@classmethod
182204
def from_engine_args(
183205
cls,

vllm/executor/cpu_executor.py

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def __init__(self, model_config: ModelConfig, cache_config: CacheConfig,
3535

3636
# Instantiate the worker and load the model to CPU.
3737
self._init_worker()
38-
self._init_cache()
3938

4039
def _init_worker(self):
4140
from vllm.worker.cpu_worker import CPUWorker
@@ -46,10 +45,11 @@ def _init_worker(self):
4645
distributed_init_method = get_distributed_init_method(
4746
get_ip(), get_open_port())
4847
self.driver_worker = CPUWorker(
49-
self.model_config,
50-
self.parallel_config,
51-
self.scheduler_config,
52-
self.device_config,
48+
model_config=self.model_config,
49+
parallel_config=self.parallel_config,
50+
scheduler_config=self.scheduler_config,
51+
device_config=self.device_config,
52+
cache_config=self.cache_config,
5353
local_rank=0,
5454
rank=0,
5555
distributed_init_method=distributed_init_method,
@@ -60,35 +60,21 @@ def _init_worker(self):
6060
self.driver_worker.init_device()
6161
self.driver_worker.load_model()
6262

63-
def _init_cache(self) -> None:
64-
num_cpu_blocks = self.driver_worker.get_cpu_cache_block_num(
65-
block_size=self.cache_config.block_size,
66-
cache_space=self.cache_config.cpu_kvcache_space_bytes,
67-
cache_dtype=self.cache_config.cache_dtype,
68-
)
69-
63+
def determine_num_available_blocks(self) -> tuple[int, int]:
64+
"""Determine the number of available KV blocks by invoking the
65+
underlying worker.
66+
"""
67+
return self.driver_worker.determine_num_available_blocks()
68+
69+
def initialize_cache(self, num_gpu_blocks: int,
70+
num_cpu_blocks: int) -> None:
71+
"""Initialize the KV cache by invoking the underlying worker.
72+
"""
73+
# NOTE: We log here to avoid multiple logs when number of workers is
74+
# greater than one. We could log in the engine, but not all executors
75+
# have GPUs.
7076
logger.info(f"# CPU blocks: {num_cpu_blocks}")
71-
if num_cpu_blocks <= 0:
72-
raise ValueError("No available memory for the cache blocks. "
73-
"Try increasing `VLLM_CPU_KVCACHE_SPACE` when "
74-
"initializing the engine.")
75-
76-
max_seq_len = self.cache_config.block_size * num_cpu_blocks
77-
if self.model_config.max_model_len > max_seq_len:
78-
raise ValueError(
79-
f"The model's max seq len ({self.model_config.max_model_len}) "
80-
"is larger than the maximum number of tokens that can be "
81-
f"stored in KV cache ({max_seq_len}). Try increasing "
82-
"`VLLM_CPU_KVCACHE_SPACE` or decreasing `max_model_len` when "
83-
"initializing the engine.")
84-
85-
# Note: To reuse the cache management procedure,
86-
# use cpu cache as 'gpu cache'.
87-
self.cache_config.num_gpu_blocks = num_cpu_blocks # type: ignore
88-
self.cache_config.num_cpu_blocks = 0 # type: ignore
89-
90-
# Initialize the cache.
91-
self.driver_worker.init_cache_engine(cache_config=self.cache_config)
77+
self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
9278

9379
def execute_model(self,
9480
seq_group_metadata_list: List[SequenceGroupMetadata],
@@ -104,13 +90,13 @@ def execute_model(self,
10490
return output
10591

10692
def add_lora(self, lora_request: LoRARequest) -> bool:
107-
raise NotImplementedError("LoRA is not implemented for cpu backend.")
93+
return self.driver_worker.add_lora(lora_request)
10894

10995
def remove_lora(self, lora_id: int) -> bool:
110-
raise NotImplementedError("LoRA is not implemented for cpu backend.")
96+
return self.driver_worker.remove_lora(lora_id)
11197

11298
def list_loras(self) -> List[int]:
113-
raise NotImplementedError("LoRA is not implemented for cpu backend.")
99+
return self.driver_worker.list_loras()
114100

115101
def check_health(self) -> None:
116102
# CPUExecutor will always be healthy as long as

vllm/executor/executor_base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,29 @@ def __init__(
3030
) -> None:
3131
raise NotImplementedError
3232

33+
@abstractmethod
34+
def determine_num_available_blocks(self) -> tuple[int, int]:
35+
"""Determine the number of available blocks for the GPU KV cache and
36+
swappable CPU KV cache.
37+
38+
Normally, this should simply delegate to the underlying Worker. Some
39+
ExecutorBase may require modification of the result, e.g. to ensure the
40+
selected cache sizes are compatible with all workers.
41+
42+
Returns a tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
43+
are blocks that are "active" on the device and can be appended to.
44+
num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
45+
appended to.
46+
"""
47+
raise NotImplementedError
48+
49+
@abstractmethod
50+
def initialize_cache(self, num_gpu_blocks: int,
51+
num_cpu_blocks: int) -> None:
52+
"""Initialize the KV cache with the given size in blocks.
53+
"""
54+
raise NotImplementedError
55+
3356
@abstractmethod
3457
def execute_model(self,
3558
seq_group_metadata_list: List[SequenceGroupMetadata],

0 commit comments

Comments
 (0)