Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/backend/pd_disaggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,19 @@ PD Disaggregation with Mooncake supports the following environment variables for
|:--------:|:-----------:|:--------:
| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KVCache transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions |
| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the number of parallel transfer queues. KVCache transfer requests from multiple decode instances will be sharded into these queues so that they can share the threads and the transfer bandwidth at the same time. If it is set to `1`, then we transfer requests one by one according to fcfs strategy | `4` |
| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `120` |
| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `300` |

If a greater mean TTFT is acceptable, you can `export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600` (10 minutes) to relax the timeout condition.
Please be aware that this setting will cause prefill instances to take a longer time to clean up the affected memory resources when a running decode node loses connection.

#### Decode Server Configuration
| Variable | Description | Default |
|:--------:|:-----------:|:--------:
| **`SGLANG_DISAGGREGATION_HEARTBEAT_INTERVAL`** | Interval (seconds) between health checks to prefill bootstrap servers | `5.0` |
| **`SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE`** | Consecutive heartbeat failures before marking prefill server offline | `2` |
| **`SGLANG_DISAGGREGATION_WAITING_TIMEOUT`** | Timeout (seconds) for receiving KV Cache after request initialization | `300` |

If a greater mean TTFT is acceptable, you can `export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600` (10 minutes) to relax the timeout condition.


## NIXL
Expand Down
39 changes: 35 additions & 4 deletions python/sglang/srt/disaggregation/mooncake/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ def __init__(
threading.Thread(
target=self.transfer_worker, args=(queue, executor), daemon=True
).start()

self.bootstrap_time_out = get_int_env_var(
"SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 120
# If a timeout happens on the prefill side, it means prefill instances
# fail to receive the KV indices from the decode instance of this request.
# These timeout requests should be aborted to release the tree cache.
self.bootstrap_timeout = get_int_env_var(
"SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 300
)
elif self.disaggregation_mode == DisaggregationMode.DECODE:
self.heartbeat_failures = {}
Expand All @@ -209,6 +211,12 @@ def __init__(
self.connection_pool: Dict[str, Dict[str, Union[str, int]]] = {}
self.prefill_tp_size_table: Dict[str, int] = {}
self.prefill_dp_size_table: Dict[str, int] = {}
# If a timeout happens on the decode side, it means decode instances
# fail to receive the KV Cache transfer done signal after bootstrapping.
# These timeout requests should be aborted to release the tree cache.
self.waiting_timeout = get_int_env_var(
"SGLANG_DISAGGREGATION_WAITING_TIMEOUT", 300
)
else:
raise ValueError(
f"Unsupported DisaggregationMode: {self.disaggregation_mode}"
Expand Down Expand Up @@ -938,7 +946,12 @@ def poll(self) -> KVPoll:
if self.init_time is not None:
now = time.time()
elapsed = now - self.init_time
if elapsed >= self.kv_mgr.bootstrap_time_out:
if elapsed >= self.kv_mgr.bootstrap_timeout:
logger.warning_once(
"Some requests timed out when bootstrapping, "
"which means prefill instances fail to receive the KV indices from the decode instance of this request. "
"If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600' (10 minutes) to relax the timeout condition. "
)
self.kv_mgr.record_failure(
self.bootstrap_room,
f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping",
Expand Down Expand Up @@ -987,6 +1000,7 @@ def __init__(
self.session_id = self.kv_mgr.get_session_id()
self.kv_mgr.update_status(self.bootstrap_room, KVPoll.Bootstrapping)
self.conclude_state = None
self.init_time = None
self.data_parallel_rank = data_parallel_rank

if self.bootstrap_addr not in self.kv_mgr.prefill_dp_size_table:
Expand Down Expand Up @@ -1222,14 +1236,31 @@ def init(self, kv_indices: npt.NDArray[np.int32], aux_index: Optional[int] = Non
str(self.required_dst_info_num).encode("ascii"),
]
)
self.init_time = time.time()

def poll(self) -> KVPoll:
if self.conclude_state is None:
status = self.kv_mgr.check_status(self.bootstrap_room)
if status in (KVPoll.Success, KVPoll.Failed):
self.conclude_state = status
elif status == KVPoll.WaitingForInput:
if self.init_time is not None:
now = time.time()
elapsed = now - self.init_time
if elapsed >= self.kv_mgr.waiting_timeout:
logger.warning_once(
"Some requests fail to receive KV Cache transfer done signal after bootstrapping. "
"If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600' (10 minutes) to relax the timeout condition. "
)
self.kv_mgr.record_failure(
self.bootstrap_room,
f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.WaitingForInput",
)
self.conclude_state = KVPoll.Failed
return KVPoll.Failed

return status

else:
return self.conclude_state

Expand Down
Loading