Skip to content

Commit 76a47d9

Browse files
committed
fix refact abort
1 parent d71bdda commit 76a47d9

2 files changed

Lines changed: 135 additions & 3 deletions

File tree

fastdeploy/engine/common_engine.py

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,22 @@ def _insert_zmq_task_to_scheduler(self):
11291129
"Request is aborted since LLM Engine is paused.",
11301130
worker_pid=worker_pid,
11311131
)
1132+
# PD ghost prevention: notify decode side to recycle its
1133+
# scheduler entry, otherwise it would sit there as a ghost
1134+
# since prefill will never deliver any first token.
1135+
if (
1136+
self.cfg.scheduler_config.splitwise_role == "prefill"
1137+
and getattr(request, "disaggregate_info", None)
1138+
and self.split_connector is not None
1139+
):
1140+
try:
1141+
self.split_connector.send_drop_signal(
1142+
request.request_id, request.disaggregate_info
1143+
)
1144+
except Exception as e:
1145+
self.llm_logger.warning(
1146+
f"Failed to send drop signal for {request.request_id}: {e}"
1147+
)
11321148
continue
11331149
except Exception as e:
11341150
self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
@@ -1278,15 +1294,47 @@ def _control_pause(self, control_request: ControlRequest):
12781294
def _wait_inflight_drained(self):
12791295
"""
12801296
Wait until resource_manager.requests is completely empty.
1281-
No timeout — abort pipeline will complete.
1282-
Logs a warning every 30 seconds while waiting to help diagnose potential hangs.
1297+
Logs a warning and remove scheduler-only request every 30 seconds while waiting to help diagnose potential hangs.
12831298
"""
12841299
start_time = time.monotonic()
12851300
next_warn_time = start_time + 30
1301+
GHOST_REAP_AFTER = 30.0
12861302

12871303
while self.resource_manager.requests or self.scheduler.requests:
12881304
now = time.monotonic()
12891305

1306+
late_ids = list(
1307+
set(self.resource_manager.requests.keys())
1308+
- self.resource_manager.waiting_abort_req_id_set
1309+
- self.resource_manager.to_be_aborted_req_id_set
1310+
)
1311+
if late_ids:
1312+
self.resource_manager.add_abort_req_ids(late_ids)
1313+
self.llm_logger.info(f"Pause drain: late-arrived requests added to abort set: {late_ids}")
1314+
1315+
if now - start_time >= GHOST_REAP_AFTER:
1316+
scheduler_only_ids = list(
1317+
set(self.scheduler.requests.keys()) - set(self.resource_manager.requests.keys())
1318+
)
1319+
if scheduler_only_ids:
1320+
ghost_outputs = [
1321+
RequestOutput(
1322+
request_id=req_id,
1323+
finished=True,
1324+
error_code=499,
1325+
error_msg=(f"forced cleanup after {GHOST_REAP_AFTER}s"),
1326+
)
1327+
for req_id in scheduler_only_ids
1328+
]
1329+
self.scheduler.put_results(ghost_outputs)
1330+
self.llm_logger.warning(
1331+
f"Pause drain timeout: reaped {len(scheduler_only_ids)} "
1332+
f"scheduler-only ghost(s) after {GHOST_REAP_AFTER}s: "
1333+
f"{scheduler_only_ids}"
1334+
)
1335+
# Reset to avoid re-reaping on the next tick
1336+
start_time = now
1337+
12901338
if now >= next_warn_time:
12911339
self.llm_logger.warning(
12921340
"Still waiting for inflight requests to drain, "
@@ -1751,6 +1799,31 @@ def _fetch_requests():
17511799

17521800
items = self.engine_worker_queue.get_disaggregated_tasks()
17531801
for item in items:
1802+
msg_type = item[0]
1803+
1804+
# PD pause race: P drops a request via paused gate and notifies us
1805+
# to recycle our scheduler entry (otherwise it becomes a ghost that
1806+
# blocks pause/abort drain forever). Synthesize a finished
1807+
# RequestOutput so it walks the normal put_results -> _recycle path
1808+
# and the client gets a 499 error response.
1809+
if msg_type == "decode_drop":
1810+
drop_outputs = [
1811+
RequestOutput(
1812+
request_id=req_id,
1813+
finished=True,
1814+
error_code=499,
1815+
error_msg="Aborted: prefill dropped this request (paused gate)",
1816+
)
1817+
for req_id in item[1]
1818+
]
1819+
if drop_outputs:
1820+
self.scheduler.put_results(drop_outputs)
1821+
self.llm_logger.info(
1822+
"Decode recycled scheduler ghost(s) via P-side drop signal: "
1823+
f"{[r.request_id for r in drop_outputs]}"
1824+
)
1825+
continue
1826+
17541827
tasks = item[1]
17551828
if isinstance(tasks[0], Request):
17561829
self.llm_logger.debug(
@@ -1815,9 +1888,17 @@ def _process_prefilled_requests():
18151888
nonlocal prefilled_request_ouputs
18161889
ready_request_outputs = []
18171890
waiting_request_outputs = []
1891+
ghost_request_outputs = []
18181892

18191893
for req_output in prefilled_request_ouputs:
1820-
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id):
1894+
req_id = req_output.request_id
1895+
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_id):
1896+
if (
1897+
req_id in self.resource_manager.waiting_abort_req_id_set
1898+
or req_id in self.resource_manager.to_be_aborted_req_id_set
1899+
):
1900+
ghost_request_outputs.append(req_output)
1901+
continue
18211902
# ensure the api_server and scheduler in decode have
18221903
# received the request sent by the client
18231904
waiting_request_outputs.append(req_output)
@@ -1828,6 +1909,22 @@ def _process_prefilled_requests():
18281909
self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}")
18291910

18301911
prefilled_request_ouputs = waiting_request_outputs
1912+
1913+
for req_output in ghost_request_outputs:
1914+
req_id = req_output.request_id
1915+
self.llm_logger.warning(
1916+
f"Pause drain: reaping prefilled-output ghost {req_id} "
1917+
"(scheduler never registered, marked for abort -- breaks deadlock)"
1918+
)
1919+
try:
1920+
self.resource_manager.pre_recycle_resource(req_id)
1921+
except Exception as e:
1922+
self.llm_logger.warning(f"pre_recycle_resource({req_id}) failed: {e}")
1923+
self.resource_manager.waiting_abort_req_id_set.discard(req_id)
1924+
self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
1925+
if req_id in self.token_processor.tokens_counter:
1926+
del self.token_processor.tokens_counter[req_id]
1927+
18311928
if self.cfg.splitwise_version == "v1":
18321929
# decode return first token to client
18331930
self.scheduler.put_results(ready_request_outputs)

fastdeploy/splitwise/splitwise_connector.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,27 @@ def send_first_token(self, prefill_msg, tasks_list):
236236
)
237237
self._send_message(addr, "decode", tasks_list)
238238

239+
def send_drop_signal(self, request_id: str, disaggregate_info: dict):
240+
"""
241+
Notify the decode side that this prefill request has been dropped
242+
(e.g. paused gate rejected it on P). The decode side should recycle
243+
its scheduler entry for this request_id, otherwise it would sit
244+
there forever as a ghost and pause/abort drain would hang.
245+
"""
246+
if not disaggregate_info:
247+
return
248+
decode_ip = disaggregate_info.get("decode_ip")
249+
decode_port = disaggregate_info.get("decode_connector_port")
250+
if not decode_ip or not decode_port:
251+
self.logger.warning(
252+
f"send_drop_signal: missing decode_ip/decode_connector_port in "
253+
f"disaggregate_info for {request_id}; skip"
254+
)
255+
return
256+
addr = f"{decode_ip}:{decode_port}"
257+
self.logger.info(f"send_drop_signal: addr={addr}, request_id={request_id}")
258+
self._send_message(addr, "drop", {"request_id": request_id})
259+
239260
def check_decode_allocated(self, task):
240261
"""Check whether the requests have been allocated resources in decode."""
241262
self.logger.debug(f"check_decode_allocated: {task.request_id}")
@@ -382,6 +403,8 @@ def _process_message(self, frames: List[bytes]):
382403
self._handle_prefill(payload)
383404
elif msg_type == "decode":
384405
self._handle_decode(payload)
406+
elif msg_type == "drop":
407+
self._handle_drop(payload)
385408
elif msg_type == "cache_sync":
386409
for task in payload:
387410
self.logger.info(f"_process_message: cache_sync task: {task}")
@@ -412,3 +435,15 @@ def _handle_decode(self, payload):
412435
for task in payload:
413436
tasks.append(RequestOutput.from_dict(task))
414437
self.engine_worker_queue.put_disaggregated_tasks(("decode", tasks))
438+
439+
def _handle_drop(self, payload):
440+
"""
441+
Handle drop signal from prefill: forward to engine worker queue so the
442+
decode engine main loop can recycle the corresponding scheduler entry.
443+
"""
444+
request_id = payload.get("request_id") if isinstance(payload, dict) else None
445+
if not request_id:
446+
self.logger.warning(f"_handle_drop: invalid payload {payload}")
447+
return
448+
self.logger.info(f"_handle_drop: request_id={request_id}")
449+
self.engine_worker_queue.put_disaggregated_tasks(("decode_drop", [request_id]))

0 commit comments

Comments
 (0)