[Core] A nixl backend dedicated for object storage #1939
Conversation
Signed-off-by: Zirui Liu <[email protected]>
Summary of Changes

Hello @ziruiliu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a new `NixlObjectBackend` for LMCache, designed to provide a robust and flexible object storage solution. It addresses limitations of the existing S3 connector by supporting self-signed certificates for private S3-compatible endpoints. A key innovation is the use of deterministic keys for cache lines, which allows the KV cache to be shared across nodes and persist through process restarts. The backend also includes optimizations for existence checks with a new presence cache and supports asynchronous write operations, enhancing overall performance and resilience.
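As a hypothetical illustration of the deterministic naming this summary describes (the PR states objects are named from the key's hash; the function and bucket names below are assumptions, not the PR's actual code):

```python
# Hypothetical sketch: the PR derives object names from the key's hash,
# so the same KV chunk maps to the same object on every node and across
# restarts. Function and bucket names here are illustrative only.
def object_name_for(chunk_hash: int, bucket: str = "lmcache-kv") -> str:
    # Mask to a fixed 64-bit width so negative hashes still format cleanly.
    return f"{bucket}/{chunk_hash & 0xFFFFFFFFFFFFFFFF:016x}"
```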
Code Review
This pull request introduces a new NixlObjectBackend for object storage, which is a significant and valuable addition. The implementation correctly leverages nixl for batched transfers and introduces features like a presence cache and asynchronous writes. The code is well-structured.
My review focuses on several key areas:
- Thread Safety: I've identified potential race conditions in the presence cache and statistics counters that could lead to incorrect behavior under concurrent access.
- Efficiency: There's an opportunity to improve the efficiency of blocking operations by avoiding busy-waiting.
- Correctness: A few methods, such as `remove` and `batched_get_non_blocking`, have implementations that don't fully match their intent, which could lead to bugs or unexpected behavior.
- Maintainability: Some method names could be clearer to better reflect their functionality.
I've provided specific suggestions to address these points. Overall, this is a great contribution, and with these refinements, it will be even more robust and efficient.
```python
def remove(self, key: CacheEngineKey, force: bool = True) -> bool:
    """
    Remove the key from the storage backend.

    :param key: The key to remove.
    :param force: Whether to force removal (not used in this implementation)
    """
    self._cache_discard(key.chunk_hash)
    return True
```
The remove method only discards the key from the local presence cache (self._cache_discard). It does not actually remove the object from the remote object storage. This will lead to orphaned data in the storage backend, which can cause unexpected behavior and unnecessary storage costs. The implementation should be updated to issue a delete request to the object storage via the nixl agent. If object deletion is not yet supported, this method should raise NotImplementedError.
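If deletion stays unsupported for now, a minimal sketch of that fallback (hypothetical: it assumes the nixl agent currently exposes no delete primitive, so the method fails loudly instead of silently orphaning objects):

```python
def remove(self, key: CacheEngineKey, force: bool = True) -> bool:
    """
    Remove the key from the storage backend.

    :param key: The key to remove.
    :param force: Whether to force removal.
    """
    # Drop the key from the local presence cache so lookups stop
    # reporting it as available.
    self._cache_discard(key.chunk_hash)
    # Remote deletion is not wired up yet: fail loudly rather than
    # return True while the object silently lingers in storage.
    raise NotImplementedError(
        "NixlObjectBackend cannot delete remote objects yet"
    )
```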
```python
class SetPresenceCache:
    """Default presence cache using a thread-safe Python set."""

    def __init__(self) -> None:
        self._keys: set[int] = set()

    def add(self, key: int) -> None:
        self._keys.add(key)

    def discard(self, key: int) -> None:
        self._keys.discard(key)

    def contains(self, key: int) -> bool:
        return key in self._keys
```
The SetPresenceCache class is not thread-safe, despite its docstring. The add and discard methods modify self._keys without any synchronization, which can lead to race conditions when called from multiple threads (e.g., in batched_async_contains). To ensure thread safety, a threading.Lock should be used to protect access to self._keys.
Suggested change (note that `import threading` is required):

```python
import threading


class SetPresenceCache:
    """Default presence cache using a thread-safe Python set."""

    def __init__(self) -> None:
        self._keys: set[int] = set()
        self._lock = threading.Lock()

    def add(self, key: int) -> None:
        with self._lock:
            self._keys.add(key)

    def discard(self, key: int) -> None:
        with self._lock:
            self._keys.discard(key)

    def contains(self, key: int) -> bool:
        with self._lock:
            return key in self._keys
```
```python
while state != "DONE" and state != "ERR":
    try:
        state = self.nixl_agent.check_xfer_state(handle)
    except nixlBind.nixlBackendError:
        raise
```
The post_blocking method implements a busy-wait loop to check the transfer state, which can lead to high CPU usage and is inefficient. Consider adding a short sleep inside the loop to yield the CPU, similar to what's done in _wait_for_transfer.
Suggested change:

```python
while state != "DONE" and state != "ERR":
    try:
        time.sleep(0.001)  # Avoid busy-waiting
        state = self.nixl_agent.check_xfer_state(handle)
    except nixlBind.nixlBackendError:
        raise
```
```python
async def batched_get_non_blocking(
    self,
    lookup_id: str,
    keys: list[CacheEngineKey],
    transfer_spec: Any = None,
) -> list[MemoryObj]:
    """
    Non blocking interface to get the kv cache from the storage backend.

    :param List[CacheEngineKey] keys: The keys of the MemoryObjs.

    :return: a list of memory objects.
    """
    obj_list = self.storage_to_mem(keys, False)
    assert None not in obj_list
    return cast(list[MemoryObj], obj_list)
```
The async method batched_get_non_blocking directly calls self.storage_to_mem(keys, False), which is a blocking function. This will block the entire asyncio event loop, defeating the purpose of it being non-blocking. To make it truly asynchronous, you should run the blocking call in a separate thread using asyncio.to_thread.
Suggested change:

```python
async def batched_get_non_blocking(
    self,
    lookup_id: str,
    keys: list[CacheEngineKey],
    transfer_spec: Any = None,
) -> list[MemoryObj]:
    """
    Non blocking interface to get the kv cache from the storage backend.

    :param List[CacheEngineKey] keys: The keys of the MemoryObjs.

    :return: a list of memory objects.
    """
    obj_list = await asyncio.to_thread(self.storage_to_mem, keys, False)
    assert None not in obj_list
    return cast(list[MemoryObj], obj_list)
```

```python
def post_blocking_async(self, handle: NixlXferHandle):
    """Non-blocking async post for WRITE operations."""
    state = self.nixl_agent.transfer(handle)
    return state
```
The method name post_blocking_async is confusing as it suggests both blocking and asynchronous behavior. Based on its implementation and docstring ("Non-blocking async post"), a more descriptive name like post_non_blocking or post_async would improve clarity. Remember to update the call site in mem_to_storage as well.
Suggested change:

```python
def post_async(self, handle: NixlXferHandle):
    """Non-blocking async post for WRITE operations."""
    state = self.nixl_agent.transfer(handle)
    return state
```
```python
        if resp[0] is None:
            return False
        return True
    except Exception as exc:
```
Catching a broad Exception can hide bugs or unexpected errors. It's better to catch more specific exceptions that you expect self.nixl_agent.query_memory to raise, such as nixlBind.nixlBackendError which is caught elsewhere in this file. This makes error handling more robust and predictable.
Suggested change:

```python
    except nixlBind.nixlBackendError as exc:
```
```python
def _cache_contains(self, chunk_hash: int) -> bool:
    if not self.enable_presence_cache or self.key_presence_cache is None:
        return False
    found = self.key_presence_cache.contains(chunk_hash)
    self.hit_counter += 1 if found else 0
    self.total_counter += 1
    if self.total_counter % 100 == 0:
        logger.debug(f"Cache hit: {self.hit_counter} vs {self.total_counter}")
    return found
```
The counters self.hit_counter and self.total_counter are updated in _cache_contains without any locking. Since _cache_contains can be called from multiple threads via batched_async_contains, this can lead to race conditions and inaccurate statistics. These counters should be protected by a lock. You could reuse self.progress_lock for this purpose.
Suggested change:

```python
def _cache_contains(self, chunk_hash: int) -> bool:
    if not self.enable_presence_cache or self.key_presence_cache is None:
        return False
    found = self.key_presence_cache.contains(chunk_hash)
    with self.progress_lock:
        self.hit_counter += 1 if found else 0
        self.total_counter += 1
        if self.total_counter % 100 == 0:
            logger.debug(f"Cache hit: {self.hit_counter} vs {self.total_counter}")
    return found
```
```python
self._wait_for_transfer(
    handle, initial_state, keys,
    storage_reg_descs, storage_xfer_handler,
    mem_objs
)
```
The arguments to self._wait_for_transfer are formatted in a way that is hard to read. For better readability and to adhere to common Python style guides, consider formatting the arguments on separate lines, aligned with the opening parenthesis.
Suggested change:

```python
self._wait_for_transfer(
    handle,
    initial_state,
    keys,
    storage_reg_descs,
    storage_xfer_handler,
    mem_objs,
)
```
I love that you're doing this. We will give it a spin with our all-flash Ceph cluster. The CRT client does have upsides, namely distributing connections in the client connection pool across S3 endpoints returned via DNS. We played with a variation of the nixl/obj connector that was modified to use S3CrtClient (to get multipathing), but there were enough other issues that we abandoned it in favor of the native S3 connector in lmcache. Making the nixl object content-addressable is a huge step forward, and I also appreciate the lookup caching!
Thank you Kyle for testing with this branch. I am now working with the Nvidia nixl team to get a unified version of the nixl storage backend. Please stay tuned.

If it helps in any way, we also tested this branch with kserve/llminferenceservice, observing a good perf difference and a more streamlined config compared to lmcache+s3 (shm keeps filling up) and lmcache+nixl[storage]+s3 (intermittently inconsistent numbers). This is a small test I've added to exercise the basic flow: https://github.com/leelavg/LMCache/blob/nixl-test/tests/test_nixl_object.py Thanks.
This commit incorporates PR LMCache#1939 into NixlStorageBackend, under the name NixlDynamicStorageBackend, in order to avoid having two nixl storage backends and to lessen code duplication. To use this new dynamic storage mode, which creates object keys on demand instead of a limited pre-allocated amount, extra_config.nixl_pool_size should be set to 0. This mode is currently only supported for the nixl OBJ backend. Signed-off-by: Tomer Shmilovich <[email protected]>
Closing this one. This scalable nixl object backend has been integrated into a unified nixl storage backend; see PR #2024.
Why
- Object keys are deterministic, derived from the `CacheEngineKey`, letting us reuse objects after a reboot without rebuilding metadata. The new nixl_object_backend is therefore able to share KV cache data between multiple nodes and survives process restarts.
- See #1557: the original Nixl storage backend needs `nixl_pool_size` to pre-allocate a certain number of files or object names with random names at initialization. With the new backend, there is no `nixl_pool_size` value because objects are named with the key's hash value. Extra config looks like the sketch below.
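The concrete config block appears to have been lost from the description during extraction; below is a hypothetical sketch of what such an extra_config might look like. Every key name here is an illustrative assumption, not the actual schema from this PR:

```python
# Hypothetical sketch only -- key names are illustrative assumptions,
# not the exact extra_config schema introduced by this PR.
extra_config = {
    "nixl_backend": "OBJ",  # route through nixl's object-storage plugin
    # Private S3-compatible endpoint (self-signed certificates supported):
    "nixl_obj_endpoint": "https://s3.internal.example.com",
    "nixl_obj_bucket": "lmcache-kv",
    # Note the absence of nixl_pool_size: objects are named
    # deterministically from each key's hash, so nothing is pre-allocated.
}
```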
What
- `NixlObjectBackend` is built on top of `NixlAgent` helpers for memory registration and transfer, so all reads/writes go through Nixl-provided batched descriptors.
- `batched_async_contains` calls `contains()` in parallel over multiple small batches and fails fast if any small batch fails, instead of issuing all `contains()` calls at once (see the sketch after this list).
- A presence cache avoids repeated `query_memory` calls during lookups. The cache can be false-positive, which would trigger a failure in the subsequent retrieval; however, a retrieval failure can be recovered by async KV loading in LMCache or vLLM's #19330.
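As a rough illustration of the fail-fast batching described above, here is a minimal sketch. It assumes a synchronous per-key `contains()` check and invents the helper name and batch size; it is not the PR's actual implementation:

```python
import asyncio

BATCH_SIZE = 32  # illustrative batch size, not taken from the PR


async def batched_contains_sketch(backend, keys) -> int:
    """Return how many leading keys exist in object storage.

    Hypothetical sketch: keys are checked in small parallel batches,
    stopping at the first miss instead of issuing every existence
    check at once. `backend.contains(key)` stands in for the real
    per-key check.
    """
    found = 0
    for start in range(0, len(keys), BATCH_SIZE):
        batch = keys[start:start + BATCH_SIZE]
        # Run one batch of blocking checks concurrently in worker threads.
        results = await asyncio.gather(
            *(asyncio.to_thread(backend.contains, key) for key in batch)
        )
        for hit in results:
            if not hit:
                return found  # fail fast: first missing chunk ends the scan
            found += 1
    return found
```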