Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions benchmarks/benchmark_cache_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def main(args):
num_put_requests = 0
request_id = 0
for req in reqs:
fake_slot_mapping = torch.arange(req.token_mask[req.token_mask].sum(), dtype=torch.int64)
fake_slot_mapping = torch.arange(req.token_mask[req.token_mask].sum(), dtype=torch.int64).numpy()
local_vars = {
'cache_engine': cache_engine,
'req': req,
Expand All @@ -41,23 +41,25 @@ def main(args):
if req.request_type == "get":
num_get_requests += 1
if not args.only_put:
profiler.runctx('graph, return_mask, transfer_call_back, finished_ops_ids = '
profiler.runctx('graph, return_mask, transfer_call_back, op_callback_dict, finished_ops_ids = '
'cache_engine.get(request_id, req.token_ids, req.token_mask, '
'fake_slot_mapping, -1, -1)',
globals(), local_vars)
else:
graph, return_mask, transfer_call_back, finished_ops_ids = \
graph, return_mask, transfer_call_back, op_callback_dict, finished_ops_ids = \
cache_engine.get(request_id, req.token_ids, req.token_mask,
fake_slot_mapping, -1, -1)
local_vars.update({
'graph': graph,
'return_mask': return_mask,
'transfer_call_back': transfer_call_back,
'op_callback_dict': op_callback_dict,
'finished_ops_ids': finished_ops_ids
})
profiler.runctx('transfer_call_back()', globals(), local_vars)

return_mask = local_vars['return_mask']
op_callback_dict = local_vars['op_callback_dict']
cache_hit_ratio = return_mask.sum() / req.token_mask.sum()
cache_hit_ratio_list.append(cache_hit_ratio)
flexkv_logger.info(f"need get {req.token_mask.sum()} tokens, "
Expand All @@ -66,16 +68,17 @@ def main(args):
elif req.request_type == "put":
num_put_requests += 1
if not args.only_get:
profiler.runctx('graph, return_mask, transfer_call_back, finished_ops_ids = '
profiler.runctx('graph, return_mask, transfer_call_back, op_callback_dict, finished_ops_ids = '
'cache_engine.put(request_id, req.token_ids, req.token_mask, fake_slot_mapping)',
globals(), local_vars)
else:
graph, return_mask, transfer_call_back, finished_ops_ids = \
graph, return_mask, transfer_call_back, op_callback_dict, finished_ops_ids = \
cache_engine.put(request_id, req.token_ids, req.token_mask, fake_slot_mapping)
local_vars.update({
'graph': graph,
'return_mask': return_mask,
'transfer_call_back': transfer_call_back,
'op_callback_dict': op_callback_dict,
'finished_ops_ids': finished_ops_ids
})

Expand Down Expand Up @@ -105,7 +108,7 @@ def parse_args():
parser = ArgumentParser()
parser.add_argument("--config",
type=str,
default="./benchmarks/example_config.json")
default="./benchmarks/example_config.yml")
parser.add_argument("--only-get", action="store_true")
parser.add_argument("--only-put", action="store_true")
parser.add_argument("--num-users", type=int, default=20)
Expand Down
18 changes: 7 additions & 11 deletions benchmarks/benchmark_single_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import torch

from flexkv.server.client import KVTPClient
from flexkv.common.storage import KVCacheLayout
from flexkv.common.storage import KVCacheLayout, KVCacheLayoutType
from flexkv.common.debug import flexkv_logger
from flexkv.common.config import ModelConfig, CacheConfig
from utils import load_config
Expand All @@ -33,7 +33,7 @@ def run_tp_client(dp_client_id, tp_rank, server_recv_port, model_config, cache_c
num_gpu_blocks = cache_config.num_gpu_blocks

gpu_kv_layout = KVCacheLayout(
type=cache_config.gpu_kv_layout_type,
type=KVCacheLayoutType.LAYERFIRST,
num_layer=model_config.num_layers,
num_block=num_gpu_blocks,
tokens_per_block=cache_config.tokens_per_block,
Expand Down Expand Up @@ -66,13 +66,12 @@ def shutdown_tp_client(tp_client_processes):
def benchmark_flexkv(model_config: ModelConfig,
cache_config: CacheConfig,
benchmark_config: BenchmarkConfig,
gpu_register_port: str,
server_recv_port: str):
):
if model_config.tp_size * model_config.dp_size > torch.cuda.device_count():
raise ValueError(f"tp_size {model_config.tp_size} * dp_size {model_config.dp_size} is greater than "
f"the number of available GPUs {torch.cuda.device_count()}")
print(f"{benchmark_config = }")
kvmanager = KVManager(model_config, cache_config, gpu_register_port, server_recv_port)
kvmanager = KVManager(model_config, cache_config)
kvmanager.start()

tp_client_processes = []
Expand All @@ -85,7 +84,7 @@ def benchmark_flexkv(model_config: ModelConfig,
for tp_rank in range(model_config.tp_size):
tp_client_process = Process(
target=run_tp_client,
args=(0, tp_rank, gpu_register_port,
args=(0, tp_rank, kvmanager.gpu_register_port,
model_config, cache_config),
daemon=True
)
Expand Down Expand Up @@ -161,7 +160,7 @@ def benchmark_flexkv(model_config: ModelConfig,

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=str, default="benchmarks/example_config.json")
parser.add_argument("--config", type=str, default="benchmarks/example_config.yml")
# benchmark config
parser.add_argument("--num-layers", type=int, default=-1)
parser.add_argument("--batch-size", type=int, default=1)
Expand All @@ -184,8 +183,5 @@ def parse_args():
# pad sequence length to divisible by tokens_per_block
benchmark_config.sequence_length = \
((benchmark_config.sequence_length - 1) // cache_config.tokens_per_block + 1) * cache_config.tokens_per_block
import uuid
gpu_register_port = f"ipc:///tmp/flexkv_gpu_{uuid.uuid4().hex[:8]}"
server_recv_port = f"ipc:///tmp/flexkv_srv_{uuid.uuid4().hex[:8]}"

benchmark_flexkv(model_config, cache_config, benchmark_config, gpu_register_port, server_recv_port)
benchmark_flexkv(model_config, cache_config, benchmark_config)
53 changes: 28 additions & 25 deletions benchmarks/benchmark_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from flexkv.transfer.worker import GPUCPUTransferWorker, CPUSSDDiskTransferWorker, WorkerHandle, tpGPUCPUTransferWorker
from flexkv.storage.allocator import CPUAllocator, GPUAllocator, SSDAllocator
from flexkv.common.storage import KVCacheLayoutType, KVCacheLayout
from flexkv.common.config import ModelConfig, CacheConfig
from flexkv.common.config import ModelConfig, CacheConfig, GLOBAL_CONFIG_FROM_ENV
from flexkv.common.debug import flexkv_logger

from utils import load_config

# flexkv_logger.set_level("OFF")

Expand All @@ -32,40 +32,42 @@ class BenchmarkConfig:
def make_configs(args: dict) -> Tuple[ModelConfig, CacheConfig, BenchmarkConfig]:
config_file = args.config
try:
with open(config_file) as f:
config = json.load(f)
model_config = ModelConfig(**config["ModelConfig"])
model_config.dtype = eval(f"torch.{model_config.dtype}")
cache_config = CacheConfig(**config["CacheConfig"])
cache_config.num_gpu_blocks = args.num_blocks
bench_config = BenchmarkConfig()
bench_config.transfer_type = TransferType(args.transfer_type)
bench_config.num_layers_to_transfer = args.num_layers
bench_config.num_blocks_to_transfer = args.num_blocks
bench_config.shuffle_ids = args.shuffle_ids
bench_config.warmup_round = args.warmup_round
bench_config.benchmark_round = args.benchmark_round
bench_config.bidirectional = args.bi
return model_config, cache_config, bench_config
model_config, cache_config = load_config(config_file)
if args.transfer_type == "H2D" or args.transfer_type == "D2H":
cache_config.enable_ssd = False
elif args.transfer_type == "H2DISK" or args.transfer_type == "DISK2H":
assert cache_config.enable_ssd, "SSD cache must be enabled for DISK2H or H2DISK benchmark"
bench_config = BenchmarkConfig(
transfer_type=TransferType(args.transfer_type),
num_layers_to_transfer=args.num_layers,
num_blocks_to_transfer=args.num_blocks,
shuffle_ids=args.shuffle_ids,
warmup_round=args.warmup_round,
benchmark_round=args.benchmark_round,
bidirectional=args.bi
)
cache_config.num_ssd_blocks = max(cache_config.num_ssd_blocks, bench_config.num_blocks_to_transfer)
return model_config, cache_config, bench_config
except Exception as e:
raise ValueError(f"Failed to load config file {config_file}: {e}") from None

def create_cpu_gpu_worker(
model_config: ModelConfig,
cache_config: CacheConfig) -> Tuple[WorkerHandle, mp.Queue]:
cache_config: CacheConfig,
num_gpu_blocks: int) -> Tuple[WorkerHandle, mp.Queue]:
mp.set_start_method('spawn', force=True)
cpu_layout = KVCacheLayout(
type=KVCacheLayoutType(cache_config.cpu_kv_layout_type),
type=GLOBAL_CONFIG_FROM_ENV.cpu_layout_type,
num_layer=model_config.num_layers,
num_block=cache_config.num_cpu_blocks,
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
)
gpu_layout = KVCacheLayout(
type=KVCacheLayoutType.LAYERWISE,
type=KVCacheLayoutType.LAYERFIRST,
num_layer=model_config.num_layers,
num_block=cache_config.num_gpu_blocks,
num_block=num_gpu_blocks,
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
Expand Down Expand Up @@ -132,15 +134,15 @@ def create_cpu_ssd_worker(
cache_config: CacheConfig) -> Tuple[WorkerHandle, mp.Queue]:
mp.set_start_method('spawn', force=True)
cpu_layout = KVCacheLayout(
type=KVCacheLayoutType(cache_config.cpu_kv_layout_type),
type=GLOBAL_CONFIG_FROM_ENV.cpu_layout_type,
num_layer=model_config.num_layers,
num_block=cache_config.num_cpu_blocks,
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
)
ssd_layout = KVCacheLayout(
type=KVCacheLayoutType(cache_config.ssd_kv_layout_type),
type=GLOBAL_CONFIG_FROM_ENV.ssd_layout_type,
num_layer=model_config.num_layers,
num_block=cache_config.num_ssd_blocks,
tokens_per_block=cache_config.tokens_per_block,
Expand All @@ -157,6 +159,7 @@ def create_cpu_ssd_worker(
dtype=model_config.dtype,
num_chunks=model_config.num_layers,
cache_dir=cache_config.ssd_cache_dir,
max_file_size_gb=GLOBAL_CONFIG_FROM_ENV.max_file_size_gb,
)
finished_ops_queue = mp.Queue()
# Create a shared memory buffer for transfer operations
Expand Down Expand Up @@ -216,7 +219,7 @@ def bench_worker(args):
bidirectional = bench_config.bidirectional

if transfer_type == TransferType.H2D or transfer_type == TransferType.D2H:
worker_handle, finished_ops_queue = create_cpu_gpu_worker(model_config, cache_config)
worker_handle, finished_ops_queue = create_cpu_gpu_worker(model_config, cache_config, num_blocks_to_transfer)
elif transfer_type == TransferType.H2DISK or transfer_type == TransferType.DISK2H:
worker_handle, finished_ops_queue = create_cpu_ssd_worker(model_config, cache_config)
else:
Expand Down Expand Up @@ -325,7 +328,7 @@ def parse_args():
default=16)
parser.add_argument("--config",
type=str,
default="./benchmarks/example_config.json")
default="./benchmarks/example_config.yml")
parser.add_argument("--shuffle-ids",
action="store_true")
parser.add_argument("--warmup-round",
Expand Down
46 changes: 0 additions & 46 deletions benchmarks/example_config.json

This file was deleted.

13 changes: 13 additions & 0 deletions benchmarks/example_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
num_layers: 64
num_kv_heads: 8
head_size: 128
dtype: bfloat16
use_mla: false
tp_size: 1
dp_size: 1
tokens_per_block: 16

cpu_cache_gb: 8
ssd_cache_gb: 16
ssd_cache_dir: ./ssd_cache1/;./ssd_cache2/
enable_gds: false
Loading