Skip to content

Commit aed0c87

Browse files
committed
Merge remote-tracking branch 'origin/develop' into develop
2 parents 688c50e + 6c924b3 commit aed0c87

16 files changed

Lines changed: 175 additions & 111 deletions

File tree

custom_ops/utils/auto_gen_w4afp8_gemm_kernel.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@
9494
[2560, 1536, 64, 0, 128],
9595
[1536, 2560, 64, 0, 128],
9696
[2560, 768, 64, 0, 128],
97+
[768, 2048, 128, 0, 128],
98+
[2048, 384, 128, 0, 128],
9799
]
98100

99101
dtype = ["BF16"]
File renamed without changes.

examples/mooncake_store/mooncake_config.json renamed to examples/cache_storage/mooncake_config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
"global_segment_size":8589934592,
55
"local_buffer_size":134217728,
66
"protocol":"rdma",
7-
"rdma_devices": "mlx5_1,mlx5_2,mlx5_3,mlx5_4",
7+
"rdma_devices": "",
88
"master_server_addr":"0.0.0.0:15001"
99
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ S1_PORT=52800
1717
ports=(
1818
$S0_PORT $((S0_PORT + 1)) $((S0_PORT + 2)) $((S0_PORT + 3))
1919
$S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3))
20-
$ROUTER_PORT
2120
)
2221
check_ports "${ports[@]}" || {
2322
echo "❌ Some ports are in use. Please release them."

fastdeploy/cache_manager/prefix_cache_manager.py

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -265,40 +265,41 @@ def launch_cache_manager(
265265
else:
266266
kvcache_storage_backend_str = "none"
267267

268-
for i in range(tensor_parallel_size):
269-
launch_cmd = (
270-
"FLAGS_allocator_strategy=auto_growth "
271-
+ visible_devices
272-
+ " NCCL_MAX_NCHANNELS=1 NCCL_BUFFSIZE=0"
273-
+ f" FD_ENABLE_SWAP_SPACE_CLEARING={envs.FD_ENABLE_SWAP_SPACE_CLEARING}"
274-
+ f" {sys.executable} {py_path}"
275-
+ f" --device_id {int(device_ids[i])}"
276-
+ f" --rank {i}"
277-
+ f" --splitwise_role {self.splitwise_role}"
278-
+ f" --num_layers {cache_config.model_cfg.num_hidden_layers}"
279-
+ f" --mp_num {tensor_parallel_size}"
280-
+ f" --cache_dtype {cache_config.cache_dtype}"
281-
+ f" --key_cache_shape {key_cache_shape}"
282-
+ val_cache_arg_str
283-
+ f" --cache_queue_port {cache_config.local_cache_queue_port}"
284-
+ f" --enable_splitwise {int(self.enable_splitwise)}"
285-
+ f" --pod_ip {pod_ip}"
286-
+ f" --engine_worker_queue_port {engine_worker_queue_port}"
287-
+ f" --num_cpu_blocks {cache_config.num_cpu_blocks}"
288-
+ f" --ipc_suffix {ipc_suffix}"
289-
+ f" --protocol {cache_config.cache_transfer_protocol}"
290-
+ f" --local_data_parallel_id {self.local_data_parallel_id}"
291-
+ f" --rdma_port {cache_config.local_rdma_comm_ports[i] if cache_config.local_rdma_comm_ports is not None else '0'}"
292-
+ f" --speculative_config '{self.speculative_config.to_json_string()}'"
293-
+ f" --default_dtype '{self.config.model_config.dtype}'"
294-
+ (" --create_cache_tensor" if create_cache_tensor else "")
295-
+ f" --kvcache_storage_backend {kvcache_storage_backend_str}"
296-
+ f" --write_policy {cache_config.write_policy}"
297-
+ f" --max_model_len {self.config.model_config.max_model_len}"
298-
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
299-
)
300-
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
301-
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
268+
if self.cache_config.swap_space or self.cache_config.kvcache_storage_backend:
269+
for i in range(tensor_parallel_size):
270+
launch_cmd = (
271+
"FLAGS_allocator_strategy=auto_growth "
272+
+ visible_devices
273+
+ " NCCL_MAX_NCHANNELS=1 NCCL_BUFFSIZE=0"
274+
+ f" FD_ENABLE_SWAP_SPACE_CLEARING={envs.FD_ENABLE_SWAP_SPACE_CLEARING}"
275+
+ f" {sys.executable} {py_path}"
276+
+ f" --device_id {int(device_ids[i])}"
277+
+ f" --rank {i}"
278+
+ f" --splitwise_role {self.splitwise_role}"
279+
+ f" --num_layers {cache_config.model_cfg.num_hidden_layers}"
280+
+ f" --mp_num {tensor_parallel_size}"
281+
+ f" --cache_dtype {cache_config.cache_dtype}"
282+
+ f" --key_cache_shape {key_cache_shape}"
283+
+ val_cache_arg_str
284+
+ f" --cache_queue_port {cache_config.local_cache_queue_port}"
285+
+ f" --enable_splitwise {int(self.enable_splitwise)}"
286+
+ f" --pod_ip {pod_ip}"
287+
+ f" --engine_worker_queue_port {engine_worker_queue_port}"
288+
+ f" --num_cpu_blocks {cache_config.num_cpu_blocks}"
289+
+ f" --ipc_suffix {ipc_suffix}"
290+
+ f" --protocol {cache_config.cache_transfer_protocol}"
291+
+ f" --local_data_parallel_id {self.local_data_parallel_id}"
292+
+ f" --rdma_port {cache_config.local_rdma_comm_ports[i] if cache_config.local_rdma_comm_ports is not None else '0'}"
293+
+ f" --speculative_config '{self.speculative_config.to_json_string()}'"
294+
+ f" --default_dtype '{self.config.model_config.dtype}'"
295+
+ (" --create_cache_tensor" if create_cache_tensor else "")
296+
+ f" --kvcache_storage_backend {kvcache_storage_backend_str}"
297+
+ f" --write_policy {cache_config.write_policy}"
298+
+ f" --max_model_len {self.config.model_config.max_model_len}"
299+
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
300+
)
301+
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
302+
cache_manager_processes.append(subprocess.Popen(launch_cmd, shell=True, preexec_fn=os.setsid))
302303

303304
logger.info("PrefixCacheManager is waiting for kv cache to be initialized.")
304305
while np.sum(self.cache_ready_signal.value) != tensor_parallel_size:
@@ -308,13 +309,14 @@ def launch_cache_manager(
308309
while np.sum(self.swap_space_ready_signal.value) != tensor_parallel_size:
309310
time.sleep(1)
310311

311-
exit_code = cache_manager_processes[-1].poll()
312-
if exit_code is None:
313-
logger.info("Launch cache transfer manager successful")
314-
else:
315-
logger.info(
316-
"Launch cache transfer manager failed, see launch_cache_transfer_manager.log for more information"
317-
)
312+
if cache_manager_processes:
313+
exit_code = cache_manager_processes[-1].poll()
314+
if exit_code is None:
315+
logger.info("Launch cache transfer manager successful")
316+
else:
317+
logger.info(
318+
"Launch cache transfer manager failed, see launch_cache_transfer_manager.log for more information"
319+
)
318320

319321
# Start additional threads
320322
if cache_config.kvcache_storage_backend or self.num_cpu_blocks > 0:

fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ def create() -> "MooncakeStoreConfig":
5050
file_path = os.getenv("MOONCAKE_CONFIG_PATH")
5151

5252
if file_path is None:
53-
local_hostname = os.environ.get("MOONCAKE_LOCAL_HOSTNAME")
53+
local_hostname = os.environ.get("MOONCAKE_LOCAL_HOSTNAME", "localhost")
5454
metadata_server = os.environ.get("MOONCAKE_METADATA_SERVER")
55-
global_segment_size = os.environ.get("MOONCAKE_GLOBAL_SEGMENT_SIZE", DEFAULT_GLOBAL_SEGMENT_SIZE)
56-
local_buffer_size = os.environ.get("MOONCAKE_LOCAL_BUFFER_SIZE", DEFAULT_LOCAL_BUFFER_SIZE)
55+
global_segment_size = int(os.environ.get("MOONCAKE_GLOBAL_SEGMENT_SIZE", DEFAULT_GLOBAL_SEGMENT_SIZE))
56+
local_buffer_size = int(os.environ.get("MOONCAKE_LOCAL_BUFFER_SIZE", DEFAULT_LOCAL_BUFFER_SIZE))
5757
protocol = os.environ.get("MOONCAKE_PROTOCOL", "rdma")
5858
rdma_devices = os.environ.get("MOONCAKE_RDMA_DEVICES", "")
5959
master_server_addr = os.environ.get("MOONCAKE_MASTER_SERVER_ADDR")
@@ -63,10 +63,10 @@ def create() -> "MooncakeStoreConfig":
6363
with open(file_path) as fin:
6464
config = json.load(fin)
6565

66-
local_hostname = config.get("local_hostname")
66+
local_hostname = config.get("local_hostname", "localhost")
6767
metadata_server = config.get("metadata_server")
68-
global_segment_size = config.get("global_segment_size", DEFAULT_GLOBAL_SEGMENT_SIZE)
69-
local_buffer_size = config.get("local_buffer_size", DEFAULT_LOCAL_BUFFER_SIZE)
68+
global_segment_size = int(config.get("global_segment_size", DEFAULT_GLOBAL_SEGMENT_SIZE))
69+
local_buffer_size = int(config.get("local_buffer_size", DEFAULT_LOCAL_BUFFER_SIZE))
7070
protocol = config.get("protocol", "rdma")
7171
rdma_devices = config.get("rdma_devices", "")
7272
master_server_addr = config.get("master_server_addr")
@@ -75,6 +75,8 @@ def create() -> "MooncakeStoreConfig":
7575
# FIXME: use auto-select NICs in MooncakeStore will raise error and roll back to using TCP
7676
rdma_devices = get_rdma_nics()
7777
logger.info(f"No RDMA devices specified, defaulting to all available devices: {rdma_devices}")
78+
if metadata_server is None or master_server_addr is None:
79+
raise ValueError("Both MOONCAKE_METADATA_SERVER and MOONCAKE_MASTER_SERVER_ADDR must be provided.")
7880

7981
return MooncakeStoreConfig(
8082
local_hostname=local_hostname,

fastdeploy/entrypoints/engine_client.py

Lines changed: 32 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
from fastdeploy.input.preprocess import InputPreprocessor
3535
from fastdeploy.inter_communicator import (
3636
IPCSignal,
37-
KVCacheStatus,
3837
ModelWeightsStatus,
3938
PrefixTreeStatus,
4039
RearrangeExpertStatus,
@@ -529,6 +528,19 @@ def update_model_weight(self, timeout=300):
529528
2 : worker update finish and notify client
530529
"""
531530
with self.clear_update_lock:
531+
if self.fd_config.cache_config.swap_space:
532+
return False, "hierarchical cache updating is not supported"
533+
534+
if self.enable_prefix_caching:
535+
# prefix_tree_status_signal: CLEARED -> UPDATING -> NORMAL
536+
if self.prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARED:
537+
self.prefix_tree_status_signal.value[0] = PrefixTreeStatus.UPDATING
538+
api_server_logger.info(f"Start to update prefix tree {self.prefix_tree_status_signal.value[0]}")
539+
while self.prefix_tree_status_signal.value[0] != PrefixTreeStatus.NORMAL:
540+
api_server_logger.info(f"..updating prefix tree {self.prefix_tree_status_signal.value[0]}")
541+
time.sleep(1)
542+
543+
# model_weights_status_signal: CLEARED -> UPDATING -> NORMAL
532544
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.NORMAL:
533545
return True, ""
534546
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.UPDATING:
@@ -537,34 +549,13 @@ def update_model_weight(self, timeout=300):
537549
return False, "worker is clearing model weight, cannot update now"
538550

539551
self.model_weights_status_signal.value[0] = ModelWeightsStatus.UPDATING
540-
if self.enable_prefix_caching or self.enable_splitwise:
541-
self.kv_cache_status_signal.value[0] = KVCacheStatus.UPDATING
542-
if self.enable_prefix_caching:
543-
self.prefix_tree_status_signal.value[0] = PrefixTreeStatus.UPDATING
544-
api_server_logger.info(f"start update model weight {self.model_weights_status_signal.value}")
545-
all_updated = False
546-
while timeout >= 0 and not all_updated:
547-
api_server_logger.info(
548-
f"Updating model weights.. "
549-
f"model_weights_status: {self.model_weights_status_signal.value[0]}, "
550-
f"prefix_tree_status: {self.prefix_tree_status_signal.value[0]}, "
551-
f"kv_cache_status: {self.kv_cache_status_signal.value[0]} "
552-
)
553-
weight_updated = self.model_weights_status_signal.value[0] == ModelWeightsStatus.NORMAL
554-
cache_updated = self.kv_cache_status_signal.value[0] == KVCacheStatus.NORMAL
555-
prefix_updated = self.prefix_tree_status_signal.value[0] == PrefixTreeStatus.NORMAL
556-
if self.enable_prefix_caching or self.enable_splitwise:
557-
if self.enable_prefix_caching:
558-
all_updated = weight_updated and cache_updated and prefix_updated
559-
else:
560-
all_updated = weight_updated and cache_updated
561-
else:
562-
all_updated = weight_updated
552+
api_server_logger.info(f"Start to update model weight {self.model_weights_status_signal.value[0]}")
553+
while timeout >= 0 and self.model_weights_status_signal.value[0] != ModelWeightsStatus.NORMAL:
554+
api_server_logger.info(f"..updating model weights {self.model_weights_status_signal.value[0]}")
563555
time.sleep(1)
564556
timeout -= 1
565557
if timeout < 0:
566558
return False, "Update model weight timeout"
567-
time.sleep(1)
568559
return True, ""
569560

570561
def clear_load_weight(self, timeout=300):
@@ -575,6 +566,19 @@ def clear_load_weight(self, timeout=300):
575566
"""
576567

577568
with self.clear_update_lock:
569+
if self.fd_config.cache_config.swap_space:
570+
return False, "hierarchical cache clearing is not supported"
571+
572+
if self.enable_prefix_caching:
573+
# prefix_tree_status_signal: NORMAL -> CLEARING -> CLEARED
574+
if self.prefix_tree_status_signal.value[0] == PrefixTreeStatus.NORMAL:
575+
self.prefix_tree_status_signal.value[0] = PrefixTreeStatus.CLEARING
576+
api_server_logger.info(f"Start to clear prefix tree {self.prefix_tree_status_signal.value[0]}")
577+
while self.prefix_tree_status_signal.value[0] != PrefixTreeStatus.CLEARED:
578+
api_server_logger.info(f"..clearing prefix tree {self.prefix_tree_status_signal.value[0]}")
579+
time.sleep(1)
580+
581+
# model_weights_status_signal: NORMAL -> CLEARING -> CLEARED
578582
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARED:
579583
return True, ""
580584
if self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARING:
@@ -583,36 +587,13 @@ def clear_load_weight(self, timeout=300):
583587
return False, "worker is updating model weight, cannot clear now"
584588

585589
self.model_weights_status_signal.value[0] = ModelWeightsStatus.CLEARING
586-
if self.enable_prefix_caching or self.enable_splitwise:
587-
self.kv_cache_status_signal.value[0] = KVCacheStatus.CLEARING
588-
if self.enable_prefix_caching:
589-
self.prefix_tree_status_signal.value[0] = PrefixTreeStatus.CLEARING
590-
591-
api_server_logger.info(f"start clear model weight {self.model_weights_status_signal.value}")
592-
all_cleared = False
593-
while timeout >= 0 and not all_cleared:
594-
api_server_logger.info(
595-
f"Clearing model weights.. "
596-
f"model_weights_status: {self.model_weights_status_signal.value[0]}, "
597-
f"prefix_tree_status: {self.prefix_tree_status_signal.value[0]}, "
598-
f"kv_cache_status: {self.kv_cache_status_signal.value[0]} "
599-
)
600-
weight_cleared = self.model_weights_status_signal.value[0] == ModelWeightsStatus.CLEARED
601-
cache_cleared = self.kv_cache_status_signal.value[0] == KVCacheStatus.CLEARED
602-
prefix_cleared = self.prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARED
603-
if self.enable_prefix_caching or self.enable_splitwise:
604-
if self.enable_prefix_caching:
605-
all_cleared = weight_cleared and cache_cleared and prefix_cleared
606-
else:
607-
all_cleared = weight_cleared and cache_cleared
608-
else:
609-
all_cleared = weight_cleared
590+
api_server_logger.info(f"Start to clear model weight {self.model_weights_status_signal.value[0]}")
591+
while timeout >= 0 and self.model_weights_status_signal.value[0] != ModelWeightsStatus.CLEARED:
592+
api_server_logger.info(f"..clearing model weights {self.model_weights_status_signal.value[0]}")
610593
time.sleep(1)
611594
timeout -= 1
612-
613595
if timeout < 0:
614596
return False, "Clear model weight timeout"
615-
time.sleep(1)
616597
return True, ""
617598

618599
def check_model_weight_status(self):

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,9 @@ async def completion_stream_generator(
546546
reasoning_content="",
547547
arrival_time=arrival_time,
548548
logprobs=logprobs_res,
549-
prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res),
549+
prompt_logprobs=(
550+
clamp_prompt_logprobs(prompt_logprobs_res) if not request.return_token_ids else None
551+
),
550552
draft_logprobs=draft_logprobs_res,
551553
speculate_metrics=output_speculate_metrics,
552554
)

fastdeploy/model_executor/layers/quantization/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@
3636
]
3737

3838

39+
def _compute_hadamard_block_size(moe_intermediate_size: int, tp_size: int) -> int:
40+
if moe_intermediate_size % tp_size != 0:
41+
raise ValueError(
42+
f"moe_intermediate_size ({moe_intermediate_size}) must be divisible by " f"tp_size ({tp_size})"
43+
)
44+
45+
shard_size = moe_intermediate_size // tp_size
46+
block_size = shard_size & (-shard_size)
47+
block_size = min(block_size, 512)
48+
49+
return block_size
50+
51+
3952
def parse_quant_config(args, model_config, is_ernie, is_v1_loader):
4053
if args.quantization is not None and isinstance(args.quantization, str):
4154
args.quantization = parse_quantization(args.quantization)
@@ -89,7 +102,12 @@ def parse_quant_config(args, model_config, is_ernie, is_v1_loader):
89102
quantization_config["dense_quant_type"] = "block_wise_fp8"
90103
quantization_config["moe_quant_type"] = "w4afp8"
91104
tp_size = getattr(args, "tensor_parallel_size", 1)
92-
quantization_config["hadamard_block_size"] = 512 // tp_size
105+
moe_intermediate_size = getattr(model_config, "moe_intermediate_size", None)
106+
if moe_intermediate_size is not None:
107+
hadamard_block_size = _compute_hadamard_block_size(moe_intermediate_size, tp_size)
108+
quantization_config["hadamard_block_size"] = hadamard_block_size
109+
else:
110+
quantization_config["hadamard_block_size"] = 512
93111
quantization_config["quantization"] = "mix_quant"
94112
quant_config_name = "mix_quant"
95113
else:

0 commit comments

Comments
 (0)