Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,22 @@ def _insert_zmq_task_to_scheduler(self):
"Request is aborted since LLM Engine is paused.",
worker_pid=worker_pid,
)
# PD ghost prevention: notify decode side to recycle its
# scheduler entry, otherwise it would sit there as a ghost
# since prefill will never deliver any first token.
if (
self.cfg.scheduler_config.splitwise_role == "prefill"
and getattr(request, "disaggregate_info", None)
and self.split_connector is not None
):
try:
self.split_connector.send_drop_signal(
request.request_id, request.disaggregate_info
)
except Exception as e:
self.llm_logger.warning(
f"Failed to send drop signal for {request.request_id}: {e}"
)
continue
except Exception as e:
self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
Expand Down Expand Up @@ -1278,15 +1294,47 @@ def _control_pause(self, control_request: ControlRequest):
def _wait_inflight_drained(self):
"""
Wait until resource_manager.requests is completely empty.
No timeout — abort pipeline will complete.
Logs a warning every 30 seconds while waiting to help diagnose potential hangs.
Logs a warning and remove scheduler-only request every 30 seconds while waiting to help diagnose potential hangs.
"""
start_time = time.monotonic()
next_warn_time = start_time + 30
GHOST_REAP_AFTER = 30.0

while self.resource_manager.requests or self.scheduler.requests:
now = time.monotonic()

late_ids = list(
set(self.resource_manager.requests.keys())
- self.resource_manager.waiting_abort_req_id_set
- self.resource_manager.to_be_aborted_req_id_set
)
if late_ids:
self.resource_manager.add_abort_req_ids(late_ids)
self.llm_logger.info(f"Pause drain: late-arrived requests added to abort set: {late_ids}")

if now - start_time >= GHOST_REAP_AFTER:
scheduler_only_ids = list(
set(self.scheduler.requests.keys()) - set(self.resource_manager.requests.keys())
)
if scheduler_only_ids:
ghost_outputs = [
RequestOutput(
request_id=req_id,
finished=True,
error_code=499,
error_msg=(f"forced cleanup after {GHOST_REAP_AFTER}s"),
)
for req_id in scheduler_only_ids
]
self.scheduler.put_results(ghost_outputs)
self.llm_logger.warning(
f"Pause drain timeout: reaped {len(scheduler_only_ids)} "
f"scheduler-only ghost(s) after {GHOST_REAP_AFTER}s: "
f"{scheduler_only_ids}"
)
# Reset to avoid re-reaping on the next tick
start_time = now

if now >= next_warn_time:
self.llm_logger.warning(
"Still waiting for inflight requests to drain, "
Expand Down Expand Up @@ -1751,6 +1799,31 @@ def _fetch_requests():

items = self.engine_worker_queue.get_disaggregated_tasks()
for item in items:
msg_type = item[0]

# PD pause race: P drops a request via paused gate and notifies us
# to recycle our scheduler entry (otherwise it becomes a ghost that
# blocks pause/abort drain forever). Synthesize a finished
# RequestOutput so it walks the normal put_results -> _recycle path
# and the client gets a 499 error response.
if msg_type == "decode_drop":
drop_outputs = [
RequestOutput(
request_id=req_id,
finished=True,
error_code=499,
error_msg="Aborted: prefill dropped this request (paused gate)",
)
for req_id in item[1]
]
if drop_outputs:
self.scheduler.put_results(drop_outputs)
self.llm_logger.info(
"Decode recycled scheduler ghost(s) via P-side drop signal: "
f"{[r.request_id for r in drop_outputs]}"
)
continue

tasks = item[1]
if isinstance(tasks[0], Request):
self.llm_logger.debug(
Expand Down Expand Up @@ -1815,9 +1888,17 @@ def _process_prefilled_requests():
nonlocal prefilled_request_ouputs
ready_request_outputs = []
waiting_request_outputs = []
ghost_request_outputs = []

for req_output in prefilled_request_ouputs:
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id):
req_id = req_output.request_id
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_id):
if (
req_id in self.resource_manager.waiting_abort_req_id_set
or req_id in self.resource_manager.to_be_aborted_req_id_set
):
ghost_request_outputs.append(req_output)
continue
# ensure the api_server and scheduler in decode have
# received the request sent by the client
waiting_request_outputs.append(req_output)
Expand All @@ -1828,6 +1909,22 @@ def _process_prefilled_requests():
self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}")

prefilled_request_ouputs = waiting_request_outputs

for req_output in ghost_request_outputs:
req_id = req_output.request_id
self.llm_logger.warning(
f"Pause drain: reaping prefilled-output ghost {req_id} "
"(scheduler never registered, marked for abort -- breaks deadlock)"
)
try:
self.resource_manager.pre_recycle_resource(req_id)
except Exception as e:
self.llm_logger.warning(f"pre_recycle_resource({req_id}) failed: {e}")
self.resource_manager.waiting_abort_req_id_set.discard(req_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 _process_prefilled_requests 中直接操作 resource_manager 内部集合,绕过封装和锁

resource_manager_v1.py 中对 waiting_abort_req_id_set / to_be_aborted_req_id_set 的修改均在 self.lock 保护下进行(参见 recycle_abort_taskadd_abort_req_ids 等方法)。此处直接调用 .discard() 未持有锁,破坏了封装一致性;在高并发场景下存在潜在竞态。

建议在 resource_manager_v1.py 中新增 recycle_ghost_resource 方法,将 pre_recycle_resource + 两个 set 的 discard 合并到一次持锁操作中:

def recycle_ghost_resource(self, request_id: str):
    """Recycle a ghost request that was never registered in scheduler."""
    with self.lock:
        if request_id in self.requests:
            req = self.requests[request_id]
            self.tasks_list[req.idx] = None
            self.stop_flags[req.idx] = True
            self._free_blocks(req)
            del self.requests[request_id]
            if request_id in self.req_dict:
                del self.req_dict[request_id]
        self.waiting_abort_req_id_set.discard(request_id)
        self.to_be_aborted_req_id_set.discard(request_id)

self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
if req_id in self.token_processor.tokens_counter:
del self.token_processor.tokens_counter[req_id]

if self.cfg.splitwise_version == "v1":
# decode return first token to client
self.scheduler.put_results(ready_request_outputs)
Expand Down
35 changes: 35 additions & 0 deletions fastdeploy/splitwise/splitwise_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,27 @@ def send_first_token(self, prefill_msg, tasks_list):
)
self._send_message(addr, "decode", tasks_list)

def send_drop_signal(self, request_id: str, disaggregate_info: dict):
"""
Notify the decode side that this prefill request has been dropped
(e.g. paused gate rejected it on P). The decode side should recycle
its scheduler entry for this request_id, otherwise it would sit
there forever as a ghost and pause/abort drain would hang.
"""
if not disaggregate_info:
return
decode_ip = disaggregate_info.get("decode_ip")
decode_port = disaggregate_info.get("decode_connector_port")
if not decode_ip or not decode_port:
self.logger.warning(
f"send_drop_signal: missing decode_ip/decode_connector_port in "
f"disaggregate_info for {request_id}; skip"
)
return
addr = f"{decode_ip}:{decode_port}"
self.logger.info(f"send_drop_signal: addr={addr}, request_id={request_id}")
self._send_message(addr, "drop", {"request_id": request_id})

def check_decode_allocated(self, task):
"""Check whether the requests have been allocated resources in decode."""
self.logger.debug(f"check_decode_allocated: {task.request_id}")
Expand Down Expand Up @@ -382,6 +403,8 @@ def _process_message(self, frames: List[bytes]):
self._handle_prefill(payload)
elif msg_type == "decode":
self._handle_decode(payload)
elif msg_type == "drop":
self._handle_drop(payload)
elif msg_type == "cache_sync":
for task in payload:
self.logger.info(f"_process_message: cache_sync task: {task}")
Expand Down Expand Up @@ -412,3 +435,15 @@ def _handle_decode(self, payload):
for task in payload:
tasks.append(RequestOutput.from_dict(task))
self.engine_worker_queue.put_disaggregated_tasks(("decode", tasks))

def _handle_drop(self, payload):
"""
Handle drop signal from prefill: forward to engine worker queue so the
decode engine main loop can recycle the corresponding scheduler entry.
"""
request_id = payload.get("request_id") if isinstance(payload, dict) else None
if not request_id:
self.logger.warning(f"_handle_drop: invalid payload {payload}")
return
self.logger.info(f"_handle_drop: request_id={request_id}")
self.engine_worker_queue.put_disaggregated_tasks(("decode_drop", [request_id]))
Loading