From c22e6dd879e3d61747c9c79a737269941ac08c2c Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:39:09 -0800 Subject: [PATCH 1/7] fix benchmark --- benchmarks/benchmark_single_batch.py | 4 ++-- benchmarks/benchmark_workers.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmark_single_batch.py b/benchmarks/benchmark_single_batch.py index 91b7c3b948..c629b545ea 100644 --- a/benchmarks/benchmark_single_batch.py +++ b/benchmarks/benchmark_single_batch.py @@ -25,10 +25,10 @@ class BenchmarkConfig: cache_ratio: float clear_cpu_cache: bool -def run_tp_client(dp_client_id, tp_rank, server_recv_port, model_config, cache_config): +def run_tp_client(dp_client_id, tp_rank, gpu_register_port, model_config, cache_config): """Run tp_client process""" device_id = tp_rank + dp_client_id * model_config.tp_size - tp_client = KVTPClient(server_recv_port, dp_client_id, device_id) + tp_client = KVTPClient(gpu_register_port, dp_client_id, device_id) num_gpu_blocks = cache_config.num_gpu_blocks diff --git a/benchmarks/benchmark_workers.py b/benchmarks/benchmark_workers.py index 009d6cec69..895a4b3e28 100644 --- a/benchmarks/benchmark_workers.py +++ b/benchmarks/benchmark_workers.py @@ -63,6 +63,7 @@ def create_cpu_gpu_worker( tokens_per_block=cache_config.tokens_per_block, num_head=model_config.num_kv_heads, head_size=model_config.head_size, + is_mla=model_config.use_mla, ) gpu_layout = KVCacheLayout( type=KVCacheLayoutType.LAYERFIRST, @@ -71,6 +72,7 @@ def create_cpu_gpu_worker( tokens_per_block=cache_config.tokens_per_block, num_head=model_config.num_kv_heads, head_size=model_config.head_size, + is_mla=model_config.use_mla, ) gpu_layout = gpu_layout.div_head(model_config.tp_size) if not model_config.use_mla else gpu_layout cpu_handle = CPUAllocator.allocate( @@ -140,6 +142,7 @@ def create_cpu_ssd_worker( tokens_per_block=cache_config.tokens_per_block, num_head=model_config.num_kv_heads, head_size=model_config.head_size, + 
is_mla=model_config.use_mla ) ssd_layout = KVCacheLayout( type=GLOBAL_CONFIG_FROM_ENV.ssd_layout_type, @@ -148,6 +151,7 @@ def create_cpu_ssd_worker( tokens_per_block=cache_config.tokens_per_block, num_head=model_config.num_kv_heads, head_size=model_config.head_size, + is_mla=model_config.use_mla ) cpu_handle = CPUAllocator.allocate( layout=cpu_layout, From ee1c710dfe090effbdb9927db8f475d62c94bfd9 Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:41:02 -0800 Subject: [PATCH 2/7] fix incorrect MatchResult --- csrc/radix_tree.cpp | 82 +++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 29 deletions(-) diff --git a/csrc/radix_tree.cpp b/csrc/radix_tree.cpp index 9259678d0b..073cbe3279 100644 --- a/csrc/radix_tree.cpp +++ b/csrc/radix_tree.cpp @@ -1,9 +1,9 @@ -#include -#include +#include #include +#include #include +#include #include -#include #include "cache_utils.h" #include "radix_tree.h" @@ -49,11 +49,16 @@ CRadixNode *CRadixNode::split(int prefix_length) { auto &new_block_hashes = new_node->get_block_hashes(); auto &new_physical_blocks = new_node->get_physical_blocks(); - new_block_hashes.insert(new_block_hashes.end(), block_hashes.cbegin(), block_hashes.cbegin() + prefix_length); - new_physical_blocks.insert(new_physical_blocks.end(), physical_blocks.cbegin(), physical_blocks.cbegin() + prefix_length); + new_block_hashes.insert(new_block_hashes.end(), block_hashes.cbegin(), + block_hashes.cbegin() + prefix_length); + new_physical_blocks.insert(new_physical_blocks.end(), + physical_blocks.cbegin(), + physical_blocks.cbegin() + prefix_length); - block_hashes.erase(block_hashes.begin(), block_hashes.begin() + prefix_length); - physical_blocks.erase(physical_blocks.begin(), physical_blocks.begin() + prefix_length); + block_hashes.erase(block_hashes.begin(), + block_hashes.begin() + prefix_length); + physical_blocks.erase(physical_blocks.begin(), + physical_blocks.begin() + prefix_length); 
parent->set_child(new_node->get_head_hash(), new_node); new_node->set_parent(parent); @@ -70,9 +75,10 @@ void CRadixNode::merge_child() { assert(child->is_leaf()); block_hashes.insert(block_hashes.end(), child->get_block_hashes().cbegin(), - child->get_block_hashes().cend()); - physical_blocks.insert(physical_blocks.end(), child->get_physical_blocks().cbegin(), - child->get_physical_blocks().cend()); + child->get_block_hashes().cend()); + physical_blocks.insert(physical_blocks.end(), + child->get_physical_blocks().cbegin(), + child->get_physical_blocks().cend()); set_time(std::max(get_time(), child->get_time())); children.clear(); @@ -91,17 +97,24 @@ std::deque *CRadixNode::shrink(int length) { auto remaining_length = size() - length; auto shrink_blocks = new std::deque(); - shrink_blocks->insert(shrink_blocks->end(), physical_blocks.begin() + remaining_length, physical_blocks.end()); + shrink_blocks->insert(shrink_blocks->end(), + physical_blocks.begin() + remaining_length, + physical_blocks.end()); - block_hashes.erase(block_hashes.begin() + remaining_length, block_hashes.end()); - physical_blocks.erase(physical_blocks.begin() + remaining_length, physical_blocks.end()); + block_hashes.erase(block_hashes.begin() + remaining_length, + block_hashes.end()); + physical_blocks.erase(physical_blocks.begin() + remaining_length, + physical_blocks.end()); return shrink_blocks; } CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids, - torch::Tensor &block_hashes, int num_blocks, int num_insert_blocks, bool ready, - CRadixNode *last_node, int num_matched_blocks, int last_node_matched_length) { + torch::Tensor &block_hashes, int num_blocks, + int num_insert_blocks, bool ready, + CRadixNode *last_node, + int num_matched_blocks, + int last_node_matched_length) { if (num_insert_blocks == -1) { num_insert_blocks = num_blocks; } @@ -131,8 +144,10 @@ CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids, auto block_hashes_ptr = 
block_hashes.data_ptr(); auto physical_block_ids_ptr = physical_block_ids.data_ptr(); for (auto i = 0; i + num_matched_blocks < num_insert_blocks; i++) { - new_block_hashes.insert(new_block_hashes.end(), block_hashes_ptr[i+num_matched_blocks]); - new_physical_blocks.insert(new_physical_blocks.end(), physical_block_ids_ptr[i]); + new_block_hashes.insert(new_block_hashes.end(), + block_hashes_ptr[i + num_matched_blocks]); + new_physical_blocks.insert(new_physical_blocks.end(), + physical_block_ids_ptr[i]); } if (last_node_matched_length < last_node->size()) { @@ -156,7 +171,9 @@ CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids, int CRadixTreeIndex::evict(torch::Tensor &evicted_blocks, int num_evicted) { int64_t *evicted_blocks_ptr = evicted_blocks.data_ptr(); int has_evicted = 0; - std::priority_queue, CRadixNode::Compare> candidate; + std::priority_queue, + CRadixNode::Compare> + candidate; for (auto it = leaf_list.begin(); it != leaf_list.end(); it++) { if ((*it)->evictable()) { @@ -202,8 +219,9 @@ int CRadixTreeIndex::evict(torch::Tensor &evicted_blocks, int num_evicted) { return has_evicted; } -std::shared_ptr CRadixTreeIndex::match_prefix( - torch::Tensor &block_hashes, int num_blocks, bool update_cache_info) { +std::shared_ptr +CRadixTreeIndex::match_prefix(torch::Tensor &block_hashes, int num_blocks, + bool update_cache_info) { auto current_node = root; auto last_ready_node = root; auto prefix_blocks_num = 0; @@ -218,34 +236,39 @@ std::shared_ptr CRadixTreeIndex::match_prefix( current_node->update_time(hit_reward_seconds); } - child_hash = HashType(block_hashes_ptr[prefix_blocks_num + current_node->size()]); + child_hash = + HashType(block_hashes_ptr[prefix_blocks_num + current_node->size()]); if (current_node->lookup_child(child_hash)) { if (current_node->is_ready()) { last_ready_node = current_node; ready_prefix_blocks_num += current_node->size(); } prefix_blocks_num += current_node->size(); - 
physical_blocks->insert(physical_blocks->end(), current_node->get_physical_blocks().begin(), - current_node->get_physical_blocks().end()); + physical_blocks->insert(physical_blocks->end(), + current_node->get_physical_blocks().begin(), + current_node->get_physical_blocks().end()); current_node = current_node->get_child(child_hash); } else { auto matched_length = 0; if (is_root(current_node) == false) { - auto cmp_length = std::min(current_node->size(), num_blocks - prefix_blocks_num); + auto cmp_length = + std::min(current_node->size(), num_blocks - prefix_blocks_num); auto left = 0; auto right = cmp_length; while (left < right) { auto mid = (left + right) / 2; - if (current_node->get_hash(mid) == HashType(block_hashes_ptr[prefix_blocks_num+mid])) { + if (current_node->get_hash(mid) == + HashType(block_hashes_ptr[prefix_blocks_num + mid])) { left = mid + 1; } else { right = mid; } } matched_length = left; - physical_blocks->insert(physical_blocks->end(), current_node->get_physical_blocks().begin(), - current_node->get_physical_blocks().begin() + matched_length); + physical_blocks->insert( + physical_blocks->end(), current_node->get_physical_blocks().begin(), + current_node->get_physical_blocks().begin() + matched_length); } else { matched_length = 0; } @@ -261,8 +284,9 @@ std::shared_ptr CRadixTreeIndex::match_prefix( } } - return std::make_shared(prefix_blocks_num, ready_prefix_blocks_num, last_node_matched_length, - last_ready_node, current_node, physical_blocks); + return std::make_shared( + ready_prefix_blocks_num, prefix_blocks_num, last_node_matched_length, + last_ready_node, current_node, physical_blocks); } } // namespace flexkv From 0453bff80f41978e2e1c0089ccb08f5e44e91eb5 Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:41:33 -0800 Subject: [PATCH 3/7] use int64_t for offset --- csrc/transfer_ssd.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/csrc/transfer_ssd.cpp b/csrc/transfer_ssd.cpp index 
4cafc266dc..fdeed88240 100644 --- a/csrc/transfer_ssd.cpp +++ b/csrc/transfer_ssd.cpp @@ -55,14 +55,14 @@ static void _transfer_iouring_impl( ssd_block_id /= num_files_per_device; // block id in single file if (enable_block_first_transfer) { - int layers_chunk_size_in_bytes = + int64_t layers_chunk_size_in_bytes = cpu_layer_stride_in_bytes * (end_layer - start_layer); - int cpu_layers_chunk_offset = start_layer * cpu_layer_stride_in_bytes; - int ssd_layers_chunk_offset = start_layer * ssd_layer_stride_in_bytes; + int64_t cpu_layers_chunk_offset = start_layer * cpu_layer_stride_in_bytes; + int64_t ssd_layers_chunk_offset = start_layer * ssd_layer_stride_in_bytes; void *cpu_block_ptr = reinterpret_cast(cpu_tensor_ptr) + block_stride_in_bytes * cpu_block_id + cpu_layers_chunk_offset; - int ssd_block_offset = + int64_t ssd_block_offset = ssd_block_id * block_stride_in_bytes + ssd_layers_chunk_offset; ssize_t bytes_transfer = 0; From 5edfc64c63aa17811a95715d733566fb54211a1d Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:49:03 -0800 Subject: [PATCH 4/7] fix bugs && update docs --- docs/flexkv_config_reference/README_en.md | 4 ++-- docs/flexkv_config_reference/README_zh.md | 4 ++-- flexkv/common/config.py | 6 +++--- flexkv/common/storage.py | 2 +- flexkv/common/transfer.py | 4 +++- flexkv/kvtask.py | 2 +- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/flexkv_config_reference/README_en.md b/docs/flexkv_config_reference/README_en.md index a416656dfc..31696322b7 100644 --- a/docs/flexkv_config_reference/README_en.md +++ b/docs/flexkv_config_reference/README_en.md @@ -106,8 +106,8 @@ Some configurations can only be set through environment variables. | Environment Variable | Type | Default | Description | |---------------------|------|---------|-------------| | `FLEXKV_MAX_FILE_SIZE_GB` | float | -1 | Maximum size of a single SSD file, -1 means unlimited | -| `FLEXKV_IORING_ENTRIES` | int | 512 | io_uring queue depth. 
Recommended to set to `512` to improve concurrent I/O performance | -| `FLEXKV_IORING_FLAGS` | int | 0 | io_uring flags, default is 0 | +| `FLEXKV_IOURING_ENTRIES` | int | 512 | io_uring queue depth. Recommended to set to `512` to improve concurrent I/O performance | +| `FLEXKV_IOURING_FLAGS` | int | 0 | io_uring flags, default is 0 | diff --git a/docs/flexkv_config_reference/README_zh.md b/docs/flexkv_config_reference/README_zh.md index 34821c5dd4..3c65d80948 100644 --- a/docs/flexkv_config_reference/README_zh.md +++ b/docs/flexkv_config_reference/README_zh.md @@ -104,8 +104,8 @@ enable_gds: false | 环境变量 | 类型 | 默认值 | 说明 | |--------|------|--------|------| | `FLEXKV_MAX_FILE_SIZE_GB` | float | -1 | 单个 SSD 文件的最大大小,-1表示不限 | -| `FLEXKV_IORING_ENTRIES` | int | 512 | io_uring 队列深度,推荐设为 `512` 以提升并发 IO 性能 | -| `FLEXKV_IORING_FLAGS` | int | 0 | io_uring 标志位,默认为 0| +| `FLEXKV_IOURING_ENTRIES` | int | 512 | io_uring 队列深度,推荐设为 `512` 以提升并发 IO 性能 | +| `FLEXKV_IOURING_FLAGS` | int | 0 | io_uring 标志位,默认为 0| diff --git a/flexkv/common/config.py b/flexkv/common/config.py index 0d1d2a622f..5695a88cf9 100644 --- a/flexkv/common/config.py +++ b/flexkv/common/config.py @@ -66,7 +66,7 @@ class CacheConfig: transfer_sms_h2d=int(os.getenv('FLEXKV_TRANSFER_SMS_H2D', 8)), transfer_sms_d2h=int(os.getenv('FLEXKV_TRANSFER_SMS_D2H', 8)), - iouring_entries=int(os.getenv('FLEXKV_IORING_ENTRIES', 512)), + iouring_entries=int(os.getenv('FLEXKV_IOURING_ENTRIES', 512)), - iouring_flags=int(os.getenv('FLEXKV_IORING_FLAGS', 0)), + iouring_flags=int(os.getenv('FLEXKV_IOURING_FLAGS', 0)), max_file_size_gb=float(os.getenv('FLEXKV_MAX_FILE_SIZE_GB', -1)), # -1 means no limit @@ -110,7 +110,7 @@ def load_user_config_from_file(config_file: str) -> UserConfig: if config_file.endswith('.json'): with open(config_file) as f: config = json.load(f) - elif config_file.endswith('.yaml'): + elif config_file.endswith(('.yaml', '.yml')): with open(config_file) as f: config = yaml.safe_load(f) else: @@ -158,7 +158,7 @@ def update_default_config_from_user_config(model_config: 
ModelConfig, if cache_config.num_ssd_blocks % len(cache_config.ssd_cache_dir) != 0: cache_config.num_ssd_blocks = \ - cache_config.num_ssd_blocks // len(cache_config.ssd_cache_dir) * len(cache_config.ssd_cache_dir) + (cache_config.num_ssd_blocks // len(cache_config.ssd_cache_dir) + 1) * len(cache_config.ssd_cache_dir) flexkv_logger.warning(f"num_ssd_blocks is not a multiple of num_ssd_devices, " f"adjust num_ssd_blocks to {cache_config.num_ssd_blocks}") diff --git a/flexkv/common/storage.py b/flexkv/common/storage.py index 53ab859c2d..ffd1d2cd46 100644 --- a/flexkv/common/storage.py +++ b/flexkv/common/storage.py @@ -28,7 +28,7 @@ class KVCacheLayout: tokens_per_block: int num_head: int head_size: int - is_mla: bool = False + is_mla: bool _kv_shape: Optional[torch.Size] = None def __eq__(self, other: object) -> bool: diff --git a/flexkv/common/transfer.py b/flexkv/common/transfer.py index eea0f72c9e..4e9eccd149 100644 --- a/flexkv/common/transfer.py +++ b/flexkv/common/transfer.py @@ -62,7 +62,9 @@ class TransferOp: def __post_init__(self) -> None: if self.transfer_type != TransferType.VIRTUAL and \ self.src_block_ids.size != self.dst_block_ids.size: - raise ValueError("src_block_ids and dst_block_ids must have the same number of physical blocks") + raise ValueError(f"src_block_ids and dst_block_ids must have the same number of physical blocks, but got " + f"src_block_ids.size={self.src_block_ids.size}, " + f"dst_block_ids.size={self.dst_block_ids.size}") with TransferOp._lock: self.op_id = TransferOp._next_op_id TransferOp._next_op_id += 1 diff --git a/flexkv/kvtask.py b/flexkv/kvtask.py index 3bfd348bcf..62b1746fba 100644 --- a/flexkv/kvtask.py +++ b/flexkv/kvtask.py @@ -124,7 +124,7 @@ def __init__(self, master_host=master_host, master_ports=master_ports )) - self.transfer_handles[0]._handle.send_config_to_remotes() + self.transfer_handles[-1]._handle.send_config_to_remotes() self.tasks: ExpiringDict[int, KVTask] = ExpiringDict(max_age_seconds=1800, 
max_len=100000) # 30 minutes self.graph_to_task: Dict[int, int] = {} From e3ef2e62c482c11d5ab940c238325cb00411e8fb Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:51:30 -0800 Subject: [PATCH 5/7] update config file --- examples/vllm_adaption/flexkv_config.json | 6 ------ examples/vllm_adaption/flexkv_config.yml | 4 ++++ 2 files changed, 4 insertions(+), 6 deletions(-) delete mode 100644 examples/vllm_adaption/flexkv_config.json create mode 100644 examples/vllm_adaption/flexkv_config.yml diff --git a/examples/vllm_adaption/flexkv_config.json b/examples/vllm_adaption/flexkv_config.json deleted file mode 100644 index 10f311c4ba..0000000000 --- a/examples/vllm_adaption/flexkv_config.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "cpu_cache_gb": 32, - "ssd_cache_gb": 1024, - "ssd_cache_dir": "/data/flexkv_ssd/", - "enable_gds": false - } \ No newline at end of file diff --git a/examples/vllm_adaption/flexkv_config.yml b/examples/vllm_adaption/flexkv_config.yml new file mode 100644 index 0000000000..ebe664a873 --- /dev/null +++ b/examples/vllm_adaption/flexkv_config.yml @@ -0,0 +1,4 @@ +cpu_cache_gb: 32 +ssd_cache_gb: 64 +ssd_cache_dir: ./ssd_cache/ +enable_gds: false From ec6608028b5537092224b7c6f2f17427461bc03b Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 22:53:38 -0800 Subject: [PATCH 6/7] fix env name --- flexkv/transfer_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexkv/transfer_manager.py b/flexkv/transfer_manager.py index c110659f5b..72c1a5ec38 100644 --- a/flexkv/transfer_manager.py +++ b/flexkv/transfer_manager.py @@ -132,8 +132,8 @@ def shutdown(self) -> None: self.transfer_engine.shutdown() def get_master_host_and_ports_from_env() -> Tuple[str, Tuple[str, str, str]]: - master_host = os.getenv("MASTER_HOST", "localhost") - master_ports = os.getenv("MASTER_PORTS", "5556,5557,5558") + master_host = os.getenv("FLEXKV_MASTER_HOST", "localhost") + master_ports = os.getenv("FLEXKV_MASTER_PORTS", 
"5556,5557,5558") master_ports = tuple(master_ports.split(",")) return "tcp://" + master_host, master_ports From 87be2cdeadfad95fd9fafa6f23266ac95046b6d2 Mon Sep 17 00:00:00 2001 From: zhuofan1123 Date: Tue, 18 Nov 2025 23:11:21 -0800 Subject: [PATCH 7/7] remove useless exceptions --- flexkv/cache/cache_engine.py | 31 +++++++++++---------- flexkv/cache/mempool.py | 4 +-- flexkv/cache/radixtree.py | 13 +++------ flexkv/common/exceptions.py | 47 -------------------------------- tests/test_cache_engine.py | 9 +++--- tests/test_cache_engine_accel.py | 10 +++---- 6 files changed, 30 insertions(+), 84 deletions(-) delete mode 100644 flexkv/common/exceptions.py diff --git a/flexkv/cache/cache_engine.py b/flexkv/cache/cache_engine.py index 17e5242e0e..2fa0038f21 100644 --- a/flexkv/cache/cache_engine.py +++ b/flexkv/cache/cache_engine.py @@ -30,7 +30,6 @@ from flexkv.cache.transfer_pattern import add_virtal_op_for_mutiple_finished_ops from flexkv.common.block import SequenceMeta from flexkv.common.config import CacheConfig, ModelConfig, GLOBAL_CONFIG_FROM_ENV -from flexkv.common.exceptions import InvalidConfigError, NotEnoughSpaceError from flexkv.common.transfer import ( DeviceType, TransferOpGraph, TransferOp, TransferType ) @@ -55,11 +54,11 @@ def __init__(self, evict_ratio: float, hit_reward_seconds: int = 0): if not isinstance(device_type, DeviceType): - raise InvalidConfigError(f"Unknown device type: {device_type}") + raise ValueError(f"Unknown device type: {device_type}") if num_total_blocks <= 0: - raise InvalidConfigError(f"Invalid num_total_blocks: {num_total_blocks}") + raise ValueError(f"Invalid num_total_blocks: {num_total_blocks}") if tokens_per_block <= 0 or (tokens_per_block & (tokens_per_block - 1)) != 0: - raise InvalidConfigError(f"Invalid tokens_per_block: {tokens_per_block}, " + raise ValueError(f"Invalid tokens_per_block: {tokens_per_block}, " f"tokens_per_block must be a power of 2") self.device_type = device_type @@ -137,9 +136,9 @@ def take(self, 
if protected_node is not None: self.index.unlock(protected_node) if strict and num_required_blocks > self.mempool.num_free_blocks: - raise NotEnoughSpaceError("Not enough free blocks to take, ", - required=num_required_blocks, - available=self.mempool.num_free_blocks) + raise RuntimeError(f"Not enough free blocks to take, " + f"required: {num_required_blocks}, " + f"available: {self.mempool.num_free_blocks}") num_allocated_blocks = min(num_required_blocks, self.mempool.num_free_blocks) return self.mempool.allocate_blocks(num_allocated_blocks) @@ -154,11 +153,11 @@ def __init__(self, evict_ratio: float, hit_reward_seconds: int = 0): if not isinstance(device_type, DeviceType): - raise InvalidConfigError(f"Unknown device type: {device_type}") + raise ValueError(f"Unknown device type: {device_type}") if num_total_blocks <= 0: - raise InvalidConfigError(f"Invalid num_total_blocks: {num_total_blocks}") + raise ValueError(f"Invalid num_total_blocks: {num_total_blocks}") if tokens_per_block <= 0 or (tokens_per_block & (tokens_per_block - 1)) != 0: - raise InvalidConfigError(f"Invalid tokens_per_block: {tokens_per_block}, " + raise ValueError(f"Invalid tokens_per_block: {tokens_per_block}, " f"tokens_per_block must be a power of 2") self.device_type = device_type @@ -218,9 +217,9 @@ def take(self, if protected_node is not None: self.index.unlock(protected_node) if strict and num_required_blocks > self.mempool.num_free_blocks: - raise NotEnoughSpaceError("Not enough free blocks to take, ", - required=num_required_blocks, - available=self.mempool.num_free_blocks) + raise RuntimeError(f"Not enough free blocks to take, " + f"required: {num_required_blocks}, " + f"available: {self.mempool.num_free_blocks}") num_allocated_blocks = min(num_required_blocks, self.mempool.num_free_blocks) return self.mempool.allocate_blocks(num_allocated_blocks) @@ -626,7 +625,9 @@ def _get_impl_local(self, ) transfer_graph.add_transfer_op(op_gds_transfer) 
finished_ops_ids.append(op_gds_transfer.op_id) - op_node_to_ready[op_gds_transfer.op_id] = (DeviceType.SSD, ssd_node_to_unlock, ssd_node_to_unlock.size()) + op_node_to_ready[op_gds_transfer.op_id] = (DeviceType.SSD, + ssd_node_to_unlock, + ssd_node_to_unlock.size()) else: fragment2_cpu_blocks = self.cpu_cache_engine.take( num_required_blocks=fragment2_num_blocks, @@ -970,7 +971,7 @@ def _put_impl_local(self, protected_node = cpu_matched_result.last_node, strict=False ) - + if self.cache_config.enable_ssd: fragment2_ssd_blocks = self.ssd_cache_engine.take( num_required_blocks=fragment2_num_blocks, diff --git a/flexkv/cache/mempool.py b/flexkv/cache/mempool.py index d00077181f..7ab86730dc 100644 --- a/flexkv/cache/mempool.py +++ b/flexkv/cache/mempool.py @@ -3,8 +3,6 @@ import numpy as np -from flexkv.common.exceptions import NotEnoughSpaceError - class Mempool: def __init__( @@ -29,7 +27,7 @@ def allocate_blocks(self, num: int) -> np.ndarray: if num < 0: raise ValueError(f"num must be greater than 0, but got {num}") if num > self._num_free: - raise NotEnoughSpaceError("Not enough free blocks", required=num, available=self._num_free) + raise ValueError(f"Not enough free blocks, required: {num}, available: {self._num_free}") if num > len(self._free_ids) - self._free_ids_offset: self._update_free_ids() diff --git a/flexkv/cache/radixtree.py b/flexkv/cache/radixtree.py index ba735d1244..35d32f4b72 100644 --- a/flexkv/cache/radixtree.py +++ b/flexkv/cache/radixtree.py @@ -22,7 +22,6 @@ from flexkv.common.block import SequenceMeta from flexkv.common.hash_utils import HashType, Hasher -from flexkv.common.exceptions import LogicError @dataclass @@ -288,13 +287,11 @@ def evict(self, num_evicted: int) -> np.ndarray: return evicted_blocks def lock(self, node: RadixNode) -> None: - if node.lock_cnt < 0: - raise LogicError("before lock, lock_cnt < 0") + assert node.lock_cnt >= 0 node.lock_cnt += 1 def unlock(self, node: RadixNode) -> None: - if node.lock_cnt <= 0: - raise 
LogicError("before unlock, lock_cnt <= 0") + assert node.lock_cnt > 0 node.lock_cnt -= 1 def set_ready(self, node: RadixNode, is_ready: bool = True, ready_length: int = -1) -> None: @@ -303,10 +300,8 @@ def set_ready(self, node: RadixNode, is_ready: bool = True, ready_length: int = ready_length -= node.size() num_node = 1 while ready_length > 0: - if node.parent is None: - raise LogicError("node is None in set_ready") - else: - node = node.parent + assert node.parent is not None + node = node.parent ready_length -= node.size() node.is_ready = True num_node += 1 diff --git a/flexkv/common/exceptions.py b/flexkv/common/exceptions.py deleted file mode 100644 index 20e0cff493..0000000000 --- a/flexkv/common/exceptions.py +++ /dev/null @@ -1,47 +0,0 @@ -from typing import Optional - -class FlexKVError(Exception): - def __init__(self, message: str = "", error_type: Optional[str] = None): - self.message = message - self.error_type = error_type - super().__init__(self.__str__()) - - def __str__(self) -> str: - if self.error_type is not None: - return f"[{self.error_type}] {self.message}" - return self.message - -class InvalidConfigError(FlexKVError): - def __init__(self, message: str = ""): - super().__init__(message, "Invalid config") - -class LogicError(FlexKVError): - def __init__(self, message: str = ""): - super().__init__(message, "Logic error") - -class TransferError(FlexKVError): - def __init__(self, message: str = "", src: Optional[str] = None, dst: Optional[str] = None): - self.src = src - self.dst = dst - if src or dst: - message = f"{message} (Source: {src or '-'}, Destination: {dst or '-'})" - super().__init__(message, "Transfer failed") - -class NotEnoughSpaceError(FlexKVError): - def __init__(self, message: str = "", required: Optional[int] = None, available: Optional[int] = None): - self.required = required - self.available = available - if required is not None and available is not None: - message = f"{message} (Required: {required}, Available: 
{available})" - super().__init__(message, "Not enough space") - -class TimeOutError(FlexKVError): - def __init__(self, message: str = "", timeout: Optional[int] = None): - self.timeout = timeout - if timeout is not None: - message = f"{message} (Timeout: {timeout})" - super().__init__(message, "Time out") - -class HashCollisionError(FlexKVError): - def __init__(self, message: str = ""): - super().__init__(message, "Hash collision") diff --git a/tests/test_cache_engine.py b/tests/test_cache_engine.py index 70a12ffeb0..b1f3a25706 100644 --- a/tests/test_cache_engine.py +++ b/tests/test_cache_engine.py @@ -6,7 +6,6 @@ from flexkv.cache.mempool import Mempool from flexkv.cache.cache_engine import CacheEngine from flexkv.common.transfer import DeviceType -from flexkv.common.exceptions import InvalidConfigError, NotEnoughSpaceError from flexkv.common.block import SequenceMeta @pytest.fixture @@ -33,7 +32,7 @@ def cache_engine(request: pytest.FixtureRequest) -> CacheEngine: ) def test_config_init(config: dict, should_raise: bool): if should_raise: - with pytest.raises(InvalidConfigError) as e: + with pytest.raises(ValueError) as e: CacheEngine(**config) else: engine = CacheEngine(**config) @@ -56,7 +55,7 @@ def test_mempool(): mempool.allocate_blocks(16)]) assert mempool.num_free_blocks == 0 - with pytest.raises(NotEnoughSpaceError): + with pytest.raises(ValueError): mempool.allocate_blocks(1) mempool.recycle_blocks(block_ids) @@ -164,7 +163,7 @@ def test_take_and_recycle(cache_engine: CacheEngine): with pytest.raises(ValueError): cache_engine.take(-1) - with pytest.raises(NotEnoughSpaceError): + with pytest.raises(RuntimeError): cache_engine.take(num_total_blocks, protected_node=radixnode, strict=True) physical_blocks2 = cache_engine.take(num_total_blocks, protected_node=radixnode, strict=False) @@ -174,7 +173,7 @@ def test_take_and_recycle(cache_engine: CacheEngine): cache_engine.recycle(physical_blocks2) cache_engine.lock_node(radixnode) - with 
pytest.raises(NotEnoughSpaceError): + with pytest.raises(RuntimeError): cache_engine.take(num_total_blocks, protected_node=radixnode, strict=True) cache_engine.unlock(radixnode) cache_engine.set_ready(radixnode, True, radixnode.size()) diff --git a/tests/test_cache_engine_accel.py b/tests/test_cache_engine_accel.py index 3c0e2b3cbe..8db5cfe21d 100644 --- a/tests/test_cache_engine_accel.py +++ b/tests/test_cache_engine_accel.py @@ -1,3 +1,4 @@ +from multiprocessing import Value import random import pytest @@ -6,7 +7,6 @@ from flexkv.cache.mempool import Mempool from flexkv.cache.cache_engine import CacheEngineAccel from flexkv.common.transfer import DeviceType -from flexkv.common.exceptions import InvalidConfigError, NotEnoughSpaceError from flexkv.common.block import SequenceMeta @pytest.fixture @@ -33,7 +33,7 @@ def cache_engine(request: pytest.FixtureRequest) -> CacheEngineAccel: ) def test_config_init(config: dict, should_raise: bool): if should_raise: - with pytest.raises(InvalidConfigError) as e: + with pytest.raises(ValueError) as e: CacheEngineAccel(**config) else: engine = CacheEngineAccel(**config) @@ -56,7 +56,7 @@ def test_mempool(): mempool.allocate_blocks(16)]) assert mempool.num_free_blocks == 0 - with pytest.raises(NotEnoughSpaceError): + with pytest.raises(ValueError): mempool.allocate_blocks(1) mempool.recycle_blocks(block_ids) @@ -160,7 +160,7 @@ def test_take_and_recycle(cache_engine: CacheEngineAccel): with pytest.raises(ValueError): cache_engine.take(-1) - with pytest.raises(NotEnoughSpaceError): + with pytest.raises(RuntimeError): cache_engine.take(num_total_blocks, protected_node=radixnode, strict=True) physical_blocks2 = cache_engine.take(num_total_blocks, protected_node=radixnode, strict=False) @@ -170,7 +170,7 @@ def test_take_and_recycle(cache_engine: CacheEngineAccel): cache_engine.recycle(physical_blocks2) cache_engine.lock_node(radixnode) - with pytest.raises(NotEnoughSpaceError): + with pytest.raises(RuntimeError): 
cache_engine.take(num_total_blocks, protected_node=radixnode, strict=True) cache_engine.unlock(radixnode) cache_engine.set_ready(radixnode, True, radixnode.size())