Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmarks/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"enable_remote": false,
"tokens_per_block": 16,
"use_gds": false,
"use_pinned_memory": true,
"gpu_kv_layout_type": "LAYERWISE",
"cpu_kv_layout_type": "BLOCKWISE",
"ssd_kv_layout_type": "BLOCKWISE",
Expand Down
1 change: 0 additions & 1 deletion docs/vllm_adapter/README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ cat <<EOF > ./flexkv_config.json
"cache_config": {
"enable_cpu": true,
"num_cpu_blocks": 10240,
"use_pinned_memory": true
},
"num_log_interval_requests": 200
}
Expand Down
3 changes: 1 addition & 2 deletions docs/vllm_adapter/README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ cat <<EOF > ./flexkv_config.json
"cache_config": {
"enable_cpu": true,
"num_cpu_blocks": 10240,
"use_pinned_memory": true
},
"num_log_interval_requests": 200
}
Expand Down Expand Up @@ -81,4 +80,4 @@ bash benchmarks/flexkv_benchmark/serving_vllm.sh
# 启动性能测试
bash benchmarks/flexkv_benchmark/multiturn_benchmark.sh
```
在 vLLM 0.10.0 版本中应用patch `examples/vllm_adaption_legacy/flexkv_vllm_0_10_0.patch`,测试方法同上。
在 vLLM 0.10.0 版本中应用patch `examples/vllm_adaption_legacy/flexkv_vllm_0_10_0.patch`,测试方法同上。
25 changes: 12 additions & 13 deletions examples/run_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()

# NAME
parser.add_argument("--enable-cpu",
action=argparse.BooleanOptionalAction,
parser.add_argument("--enable-cpu",
action=argparse.BooleanOptionalAction,
default=True)
parser.add_argument("--enable-ssd",
action=argparse.BooleanOptionalAction,
parser.add_argument("--enable-ssd",
action=argparse.BooleanOptionalAction,
default=False,)
parser.add_argument("--enable-remote",
action=argparse.BooleanOptionalAction,
parser.add_argument("--enable-remote",
action=argparse.BooleanOptionalAction,
default=False,)
parser.add_argument("--model-path", type=str, help="model path", default="")
parser.add_argument("--tp-size", type=int, default=1)
Expand Down Expand Up @@ -54,7 +54,7 @@ def parse_args() -> argparse.Namespace:
if __name__ == "__main__":
args = parse_args()
hf_config = AutoConfig.from_pretrained(args.model_path)

num_layers=hf_config.num_hidden_layers
if hasattr(hf_config, 'num_key_value_heads'):
num_kv_heads=hf_config.num_key_value_heads
Expand All @@ -65,7 +65,7 @@ def parse_args() -> argparse.Namespace:
head_size=(hf_config.head_dim if hasattr(hf_config, 'head_dim')
else hf_config.hidden_size//hf_config.num_attention_heads)
use_mla=hf_config.architectures[0].startswith("Deepseek")

# TODO: different model config may have different attribute name
model_config = ModelConfig(
num_layers=num_layers,
Expand All @@ -76,14 +76,13 @@ def parse_args() -> argparse.Namespace:
dp_size=args.dp_size,
dtype=hf_config.torch_dtype
)

cache_config = CacheConfig(
enable_cpu=args.enable_cpu,
enable_ssd=args.enable_ssd,
enable_remote=args.enable_remote,
use_gds=False,
enable_trace=False,
use_pinned_memory=False,
ssd_cache_iouring_entries=512,
tokens_per_block=args.block_size,
num_cpu_blocks=args.num_cpu_blocks,
Expand All @@ -93,6 +92,6 @@ def parse_args() -> argparse.Namespace:
remote_cache_size_mode=args.remote_cache_size_mode,
remote_cache_path=args.remote_cache_path,
)

kvserver = KVServer(model_config, cache_config, args.server_recv_port)
kvserver.run()
kvserver.run()
55 changes: 27 additions & 28 deletions examples/scheduler_server_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
def run_tp_client_process(dp_client_id, tp_rank, device_id, server_recv_port, model_config, gpu_kv_layout):
"""Run TP client process"""
from flexkv.server.client import KVTPClient

print(f"Starting TP client: dp_client_id={dp_client_id}, tp_rank={tp_rank}, device_id={device_id}")

try:
# Set CUDA device for this process
if torch.cuda.is_available():
Expand All @@ -27,7 +27,7 @@ def run_tp_client_process(dp_client_id, tp_rank, device_id, server_recv_port, mo
torch.cuda.init()
# Clear cache
torch.cuda.empty_cache()

tp_client = KVTPClient(server_recv_port, dp_client_id, device_id)

# Create GPU blocks for this TP client
Expand All @@ -51,7 +51,7 @@ def run_tp_client_process(dp_client_id, tp_rank, device_id, server_recv_port, mo
# Keep TP client running
while True:
time.sleep(1)

except Exception as e:
print(f"TP client {tp_rank} error: {e}")
import traceback
Expand Down Expand Up @@ -84,7 +84,6 @@ def main():
enable_ssd=False,
enable_remote=False,
use_gds=False,
use_pinned_memory=True,
tokens_per_block=tokens_per_block,
num_cpu_blocks=num_cpu_blocks,
)
Expand All @@ -106,14 +105,14 @@ def main():
cache_config=cache_config,
server_recv_port="ipc:///tmp/scheduler_server_example" # TPClient connects to this port
)

# Start background server thread to handle TPClient registration
scheduler_server.start_server_thread()
print(f"SchedulerServer started!")

print("SchedulerServer started!")
print(f"TPClient can connect to: {scheduler_server.get_server_port()}")
print("Starting TP client processes...")

# Start TP client processes
tp_client_processes = []
for tp_rank in range(tp_size):
Expand All @@ -123,7 +122,7 @@ def main():
if device_id >= available_gpus:
device_id = device_id % available_gpus
print(f"Warning: Using GPU {device_id} for TP rank {tp_rank} (not enough GPUs)")

tp_client_process = Process(
target=run_tp_client_process,
args=(0, tp_rank, device_id, scheduler_server.get_server_port(), model_config, gpu_kv_layout),
Expand All @@ -134,32 +133,32 @@ def main():
print(f"Started TP client process for rank {tp_rank} on device {device_id}")

print("Waiting for all TP clients to register...")

time.sleep(5)

# Now we can directly use scheduler_server without network communication
# Example: Create some test data (following benchmark_kvmanager.py pattern)
batch_size = 4
seq_len = 128

print("\n=== Generating test data ===")
# Generate separate sequences for each request (correct approach)
batch_token_ids = []
batch_slot_mappings = []
batch_token_masks = []

for i in range(batch_size):
# Each sequence is independent (seq_len,) shape
token_ids = torch.randint(0, 1000, (seq_len,))
slot_mapping = torch.arange(i * seq_len, (i + 1) * seq_len)
token_mask = torch.ones(seq_len, dtype=torch.bool)

batch_token_ids.append(token_ids)
batch_slot_mappings.append(slot_mapping)
batch_token_masks.append(token_mask)

print(f"Generated {batch_size} sequences, each with {seq_len} tokens")

print("\n=== Executing PUT Operations ===")
# PUT operations - each sequence processed separately
start_time = time.time()
Expand All @@ -173,7 +172,7 @@ def main():
if task_id:
put_task_ids.append(task_id)
print(f"PUT task {task_id} created for sequence {i}")

put_time = (time.time() - start_time) * 1000
print(f"Created {len(put_task_ids)} PUT tasks, time: {put_time:.2f}ms")
time.sleep(2)
Expand All @@ -190,10 +189,10 @@ def main():
if task_id:
get_task_ids.append(task_id)
print(f"GET task {task_id} created for sequence {i}")

get_time = (time.time() - start_time) * 1000
print(f"Created {len(get_task_ids)} GET tasks, time: {get_time:.2f}ms")

print("\n=== Waiting for All Tasks to Complete ===")
# Wait for all tasks to complete - can wait for multiple tasks at once
all_task_ids = put_task_ids + get_task_ids
Expand All @@ -202,7 +201,7 @@ def main():
masks = scheduler_server.wait(all_task_ids)
wait_time = (time.time() - start_time) * 1000
print(f"All {len(all_task_ids)} tasks completed, time: {wait_time:.2f}ms")

# Analyze results
if masks:
total_tokens = 0
Expand All @@ -211,7 +210,7 @@ def main():
tokens = mask.sum().item() if hasattr(mask, 'sum') else len(mask)
total_tokens += tokens
print(f"Task {task_id}: {tokens} tokens processed")

print("\n=== Trying Non-blocking Wait ===")
# Create a few more tasks and try non-blocking wait
extra_task_ids = []
Expand All @@ -223,7 +222,7 @@ def main():
)
if task_id:
extra_task_ids.append(task_id)

if extra_task_ids:
# Immediately try to wait (might not be completed yet)
masks = scheduler_server.try_wait(extra_task_ids)
Expand All @@ -233,15 +232,15 @@ def main():
print(f"Tasks {extra_task_ids} not ready yet, will wait...")
masks = scheduler_server.wait(extra_task_ids)
print(f"Tasks {extra_task_ids} completed after wait")

print("\n✅ All operations completed successfully!")


# Clean up resources
print("\n=== Shutting down SchedulerServer ===")
scheduler_server.shutdown()
print("SchedulerServer has been shut down")

# Terminate TP client processes
print("Terminating TP client processes...")
for i, process in enumerate(tp_client_processes):
Expand All @@ -253,4 +252,4 @@ def main():


if __name__ == "__main__":
main()
main()
Loading