[Core][Hybrid allocator + connector] Support hybrid allocator + kv cache connector #30166
Conversation
Code Review
This pull request introduces support for using the hybrid KV cache allocator with a KV cache connector, which is a significant enhancement for models with sliding window attention. The goal is to reduce memory pressure and prevent data contention by allocating KV cache blocks more precisely. The changes are extensive, modifying the core allocation logic in SingleTypeKVCacheManager and propagating these changes up to the KVCacheCoordinator and Scheduler. While the overall approach is sound, the implementation contains several temporary workarounds and comments marked as "REMOVE BEFORE MERGE", which are critical to address. I've identified issues in the KV connector factory, the LMCache connector implementation, and potential bugs or data correctness concerns in single_type_kv_cache_manager.py and block_pool.py. These must be resolved to ensure the stability and correctness of the new functionality.
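To make the summary above concrete, here is a back-of-the-envelope comparison of per-layer block counts under the two strategies; the numbers and the cdiv helper are illustrative, not taken from the PR.

def cdiv(a: int, b: int) -> int:
    """Ceiling division."""
    return -(-a // b)

block_size = 16
sliding_window = 1024
prompt_len = 32_000

# Allocating a sliding-window layer as if it were full attention:
blocks_like_full_attention = cdiv(prompt_len, block_size)  # 2000 blocks
# Allocating only the tokens inside the window, with older positions
# covered by shared null blocks, as this PR proposes:
blocks_for_window_only = cdiv(sliding_window, block_size)  # 64 blocks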
## REMOVE BEFORE MERGE (YIFAN): Revert this warning back to raising
# an ValueError.
logger.warning(
    "Connector %s does not support HMA but HMA is enabled. Please set "
    "--disable-hybrid-kv-cache-manager to disable HMA.",
    connector_cls.__name__,
)
This change from raising a ValueError to a logger.warning is marked with a "REMOVE BEFORE MERGE" comment. Using a connector that does not support Hybrid Memory Allocation (HMA) when HMA is enabled can lead to incorrect behavior or hard-to-debug runtime errors. It is much safer to fail fast with an exception. This change should be reverted to raise ValueError before merging to prevent potential issues in production.
raise ValueError(
    f"Connector {connector_cls.__name__} does not support HMA but "
    f"HMA is enabled. Please set `--disable-hybrid-kv-cache-manager`."
)

## REMOVE BEFORE MERGE (YIFAN): this is temporary workaround to work with
# LMCache. Remove this once having LMCache-side support for new interfaces.
vllm_config.kv_cache_config = kv_cache_config
This block contains a "REMOVE BEFORE MERGE" comment, indicating a temporary workaround. Directly modifying vllm_config by assigning to kv_cache_config is a side effect that can lead to unexpected behavior elsewhere in the system. This workaround should be removed, and a proper solution that avoids mutating the config object should be implemented as noted in the comment.
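One possible direction, sketched under the assumptions that VllmConfig behaves like a dataclass and that the connector constructor takes a config plus a role (neither assumption is verified here), is to hand the connector a connector-local copy instead of writing into the shared object:

from dataclasses import replace

# Sketch only: derive a connector-local view of the config rather than
# mutating vllm_config in place. `replace` assumes VllmConfig is a
# dataclass; `connector_cls(config, role)` is an assumed constructor
# shape, not a verified signature.
connector_config = replace(vllm_config, kv_cache_config=kv_cache_config)
connector = connector_cls(connector_config, role)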
## REMOVE BEFORE MERGE (YIFAN): this is temporary workaround to work with
# LMCache. Remove this once having LMCache-side support for new interfaces.
def request_finished_all_groups(
    self,
    request: "Request",
    block_ids: tuple[list[int], ...],
) -> tuple[bool, dict[str, Any] | None]:
    # NOTE: LMCache overloads request_finished so `block_ids` here can be
    # either list[int] or tuple[list[int], ...]. This could be changed in
    # the future to separate these two methods.
    return self._lmcache_engine.request_finished(request, block_ids)
The request_finished_all_groups method is marked as a temporary workaround with a "REMOVE BEFORE MERGE" comment. It appears to be a shim for a new interface required by the hybrid allocator. This temporary implementation should be replaced with a proper solution, and the dependency on this fix in LMCache should be resolved before this pull request is merged.
# REMOVE BEFORE MERGE (YIFAN): why len(new_computed_blocks)
# rather than len(req_blocks)?
self.num_cached_block[request_id] = len(new_computed_blocks)
This line is flagged with a "REMOVE BEFORE MERGE" comment that questions the logic. Setting self.num_cached_block[request_id] to len(new_computed_blocks) seems incorrect, as it doesn't account for previously existing blocks for the request. This could lead to an incorrect count of cached blocks, potentially causing issues in subsequent caching logic. It should likely be set to len(req_blocks) to reflect the total number of blocks for the request. Please clarify or fix this.
Suggested change:
-# REMOVE BEFORE MERGE (YIFAN): why len(new_computed_blocks)
-# rather than len(req_blocks)?
-self.num_cached_block[request_id] = len(new_computed_blocks)
+self.num_cached_block[request_id] = len(req_blocks)
## TODO(Yifan): here token_ids may be over-estimated for
## sliding window layers
The TODO comment indicates that token_ids might be over-estimated for sliding window layers. This could lead to incorrect data in BlockStored events, which could be problematic for external systems consuming these events. If external systems rely on exact token IDs for correctness, this over-estimation could be a significant issue. This should be addressed to ensure data integrity for event consumers.
Hi @ivanium, the pre-commit checks have failed. Please run:

uv pip install pre-commit
pre-commit install
pre-commit run --all-files

Then, commit the changes and push to your branch. For future commits, the installed pre-commit hook will run these checks automatically.
Signed-off-by: Yifan Qiao <[email protected]> Co-authored-by: KuntaiDu <[email protected]>
…indow, and leading padding with null blocks Signed-off-by: Yifan Qiao <[email protected]> fixes Signed-off-by: Yifan Qiao <[email protected]> fix get_num_blocks_to_allocate Signed-off-by: Yifan Qiao <[email protected]>
…ocks Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
…ll blocks inside the single_type_block_manager Signed-off-by: Yifan Qiao <[email protected]>
…ng that in a follow-up PR Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Signed-off-by: Yifan Qiao <[email protected]>
Force-pushed from 223fb4d to fa53140 (Compare)
Good work! In terms of landing this PR, @heheda12345 previously suggested to me to separate into small PRs, and I would prefer the same for this PR. Example:
# Some blocks may be null blocks when enabling sparse attention or sliding
# window attention. For now, we only have sliding window attention, and
# null blocks must be at the beginning.
first_non_null_blk_idx = 0
for i, blk in enumerate(new_full_blocks):
    if not blk.is_null:
        first_non_null_blk_idx = i
        break

for i, blk in enumerate(new_full_blocks[first_non_null_blk_idx:]):
    assert not blk.is_null
Suggested change:
 for i, blk in enumerate(new_full_blocks[first_non_null_blk_idx:]):
-    assert not blk.is_null
+    if blk.is_null:
+        continue
what about this?
BlockStored(
    block_hashes=new_hashes,
    parent_block_hash=parent_block_hash,
    ## TODO(Yifan): here token_ids may be over-estimated for
KV cache events + hybrid allocator are not supported yet.
num_evictable_blocks_to_allocate.append(
    num_evictable_blocks_to_allocate_single_group
)
return num_new_blocks_to_allocate, num_evictable_blocks_to_allocate
why do we need num_evictable_blocks_to_allocate here?
Same question here.
I guess num_evictable_blocks_to_allocate refers to the blocks that have ref_cnt==0 and num_new_blocks_to_allocate refers to unallocated blocks, but I'm not sure if the logic of processing them should be different.
We need num_new_blocks_to_allocate for the actual allocation, but we also need num_new_blocks_to_allocate + num_evictable_blocks_to_allocate to check whether the remaining available memory is sufficient for allocating this request. So num_evictable_blocks_to_allocate is returned to compute the total additional memory needed to allocate this request.
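A minimal sketch of how the two counts could be combined for the admission check described above (the helper name and signature are illustrative, not vLLM's actual API):

def can_allocate(
    num_new_blocks_to_allocate: int,
    num_evictable_blocks_to_allocate: int,
    num_free_blocks: int,
) -> bool:
    # New blocks must come out of the free pool. Evictable cached hits
    # (ref_cnt == 0) are already allocated, but reusing them removes them
    # from the evictable pool, so they also count against the memory
    # available for this request.
    return (
        num_new_blocks_to_allocate + num_evictable_blocks_to_allocate
        <= num_free_blocks
    )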
# it as needed to be allocated.
num_evictable_computed_blocks = sum(
    blk.ref_cnt == 0 and not blk.is_null for blk in new_computed_blocks
num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)
do we have to call this function in every step?
num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)

# Fast-path: nothing is skipped.
if num_skipped_tokens <= 0:
something like:
num_evictable_computed_blocks = xxxx
max(num_evictable_computed_blocks + (num_computed_tokens - len(new_computed_blocks)) / block_size, (num_computed_tokens - num_skipped_tokens) / block_size)
---------------------------------------------------------------------
| < to be allocated > |
---------------------------------------------------------------------
| < to be cached > |
new is not "to be cached"; it includes unverified spec decode tokens.
---------------------------------------------------------------------
| < to be cached > |
---------------------------------------------------------------------
| Prefix-cached tokens from both vLLM |
The ext (external) tokens are not cached yet. They will be cached after all tokens are loaded.
---------------------------------------------------------------------
| ref_cnt |
| increased|
---------------------------------------------------------------------
| ref_cnt not |
| increased yet|
---------------------------------------------------------------------
merge to one line?
---------------------------------------------------------------------
| not cached by |
| vLLM, but |
| cached by |
| connector |
---------------------------------------------------------------------
| < cached by vLLM > |
---------------------------------------------------------------------
merge to one line?
num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
    (
        num_new_blocks_to_allocate,
        num_evictable_blocks_to_allocate,
I think we shouldn't expand the concept of num_evictable_blocks_to_allocate to more places
num_blocks_to_allocate_per_group: list[int],
num_tokens: int,
I think it will be difficult for people to understand why these two are different.
if num_null_blocks_to_pad > 0:
    req_blocks.extend([self._null_block] * num_null_blocks_to_pad)
# Add the remaining computed blocks.
req_blocks.extend(new_computed_blocks[num_null_blocks_to_pad:])
note that blocks in new_computed_blocks are already touched.
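For context, a rough sketch of what "touched" means here, loosely modeled on vLLM's BlockPool.touch (simplified, not the exact implementation):

def touch(blocks, free_block_queue):
    """Mark cached blocks as in use so they cannot be evicted."""
    for blk in blocks:
        # A cached block with ref_cnt == 0 still sits in the free queue
        # and is evictable; pulling it out revives it for this request.
        if blk.ref_cnt == 0 and not blk.is_null:
            free_block_queue.remove(blk)
        blk.ref_cnt += 1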
# Only allocate real new blocks; cached hits should already be present
# in req_blocks via save_new_computed_blocks.
num_blocks_to_padding = num_new_blocks - num_blocks_to_allocate
assert num_blocks_to_padding >= 0, (
    f"Invalid padding: need {num_new_blocks}, allocate {num_blocks_to_allocate}"
)
I don't think we need padding in this function. You can add padding to either save_new_computed_blocks (if allocating for local computed and external computed in one shot) or the allocate_for_connector (if in two shots).
else:
    # A running request. Should not have new computed blocks.
    assert len(new_computed_blocks) == 0
num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)
what about changing this function to handle both local and external computed:
total_computed_tokens = num_local_computed_tokens + num_external_computed_tokens
if request_id in self.num_cached_block:
    # A running request. Should not have new computed blocks.
    assert len(new_computed_blocks) == 0
    return
req_blocks = self.req_to_blocks[request_id]
assert len(req_blocks) == 0
num_skipped_blocks = self.get_num_skipped_tokens(total_computed_tokens) // block_size
if num_skipped_blocks > 0:
    # sparse like sliding window
    req_blocks.extend([null_block] * num_skipped_blocks)
    new_computed_blocks = new_computed_blocks[num_skipped_blocks:]
self.block_pool.touch(new_computed_blocks)
req_blocks.extend(new_computed_blocks)
self.num_cached_block[request_id] = len(req_blocks)
if num_external_computed_tokens > 0:
    # happens when external connector
    req_blocks.extend(
        block_pool.get_new_blocks(cdiv(total_computed_tokens, block_size) - len(req_blocks))
    )
| Prefix-cached tokens from both vLLM |
| and connector. Can be safely removed if |
Suggested change:
-| Prefix-cached tokens from both vLLM |
-| and connector. Can be safely removed if |
+| Prefix-cached tokens from either vLLM |
+| or connector. Can be safely removed if |
and num_lookahead_tokens == 0
and num_external_computed_tokens == 0
still assert num_new_tokens == 0
    blk.ref_cnt == 0 and not blk.is_null for blk in new_computed_blocks
num_skipped_tokens = self.get_num_skipped_tokens(total_computed_tokens)

# Fast-path: nothing is skipped.
fast path for decode
Purpose
This is a continuation of the work in PR #23624 to support the hybrid KV cache manager + KV cache connector.
Design doc with details drafted by @KuntaiDu: link
In short, the current hybrid KV cache manager tries to allocate blocks for all tokens of sliding window layers, just like full attention layers; in the next scheduling step, the manager then frees the unneeded tokens (those outside the sliding window) and turns them into prefix cache in GRAM. This PR instead aims to allocate KV cache only for the tokens inside the sliding window for sliding window layers (a small illustration is sketched after the list below). This addresses two issues:
1. Allocating blocks for all tokens of sliding window layers creates unnecessary memory pressure, since most of those tokens fall outside the window.
2. The allocate-then-free pattern can cause data contention, where the connector might copy some KV cache blocks for one request in the background while the manager frees and reuses them for another request.

This PR currently supports only the LMCache connector. Support for the other connectors will be added in follow-up PRs.
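A tiny illustration of the per-request block layout this PR aims for on a sliding window layer (values and the cdiv helper are mine, only to make the idea concrete):

def cdiv(a: int, b: int) -> int:
    return -(-a // b)

block_size = 16
sliding_window = 1024
num_computed_tokens = 4096

# Tokens that have fallen out of the window are represented by shared
# null blocks instead of real allocations (the exact skip computation in
# the PR may differ; this is only the shape of the idea).
num_skipped_tokens = max(num_computed_tokens - sliding_window, 0)  # 3072
num_null_blocks = num_skipped_tokens // block_size                 # 192
num_real_blocks = cdiv(num_computed_tokens, block_size) - num_null_blocks  # 64

req_blocks = ["<null>"] * num_null_blocks + ["<real>"] * num_real_blocks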
cc @KuntaiDu @heheda12345
Test Plan
The test script is a modification of the one in PR #25712.
The script should be run with LMCache-side support: LMCache/LMCache#1436.
Caution
Please apply the following patch to LMCache if getting import errors for cdiv:

Patch

diff --git a/lmcache/integration/vllm/vllm_v1_adapter.py b/lmcache/integration/vllm/vllm_v1_adapter.py
index a849097..4db64df 100644
--- a/lmcache/integration/vllm/vllm_v1_adapter.py
+++ b/lmcache/integration/vllm/vllm_v1_adapter.py
@@ -18,7 +18,10 @@ from vllm.distributed.parallel_state import (
     get_tp_group,
 )
 from vllm.sampling_params import SamplingParams
-from vllm.utils import cdiv
+try:
+    from vllm.utils import cdiv
+except ImportError:
+    from vllm.utils.math_utils import cdiv

To run this script on H100, please save the following code into test_connector_w_hybrid_kv_allocator.py, and run python test_connector_w_hybrid_kv_allocator.py.

test_connector_w_hybrid_kv_allocator.py
Test Result
Previously, we could not allocate KV cache for the 3rd request, which tries to allocate long prefixes and load external KV cache even for sliding window layers. With this PR, the 3rd request allocates only the KV cache needed for the sliding window layers, so it can be scheduled and finishes with correct results.
Detailed output