Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1593,13 +1593,14 @@ def _control_abort_requests(self, control_req: ControlRequest):
engine_recv_first_token_time=request.metrics.engine_recv_first_token_time if request.metrics else now,
request_start_time=request.metrics.arrival_time if request.metrics else now,
)
eos_token_ids = getattr(request, "eos_token_ids", [0])
Comment thread
qwes5s5 marked this conversation as resolved.
Comment thread
qwes5s5 marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: `getattr(request, "eos_token_ids", [0])` 在 `eos_token_ids` 为 `None` 时不会返回 fallback 值 `[0]`

Request 类中 `eos_token_ids` 定义为 `Optional[list[int]] = None`（request.py:92/149），该属性始终存在但可能为 `None`。`getattr` 只在属性不存在时才返回默认值；当属性存在但值为 `None` 时，`getattr` 仍然返回 `None`，后续 `eos_token_ids[0]` 会触发 `TypeError: 'NoneType' object is not subscriptable`。

建议修复:

eos_token_ids = getattr(request, "eos_token_ids", None) or [0]

或更明确地:

eos_token_ids = request.eos_token_ids if request.eos_token_ids else [0]

result = RequestOutput(
request_id=req_id,
finished=True,
outputs=CompletionOutput(
index=0,
send_idx=len(partial_token_ids),
token_ids=[self.data_processor.eos_token_ids[0]],
token_ids=[eos_token_ids[0]],
),
metrics=abort_metrics,
error_code=200,
Expand Down Expand Up @@ -1643,10 +1644,19 @@ def _wait_abort_complete(self, target_req_ids, stall_timeout=1):
reset progress state if any, then continue monitoring
"""
target_set = set(target_req_ids)
target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))
prev_remaining_count = len(target_set)
last_progress_time = time.time()
remaining = target_set & self.resource_manager.get_reqs_in_aborting()
Comment thread
qwes5s5 marked this conversation as resolved.
while remaining:
alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())
finished_reqs = target_set - alive_reqs
if finished_reqs:
self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}")
for req_id in finished_reqs:
self.resource_manager.waiting_abort_req_id_set.discard(req_id)
self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
target_set -= finished_reqs
remaining = target_set & self.resource_manager.get_reqs_in_aborting()
if not remaining:
self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned")
Expand Down
6 changes: 4 additions & 2 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def recycle_abort_task(self, request_id):
self.stop_flags[request.idx] = True # 设置停止标志
del self.requests[request_id]
del self.req_dict[request_id]
self.to_be_aborted_req_id_set.remove(request_id)
self.to_be_aborted_req_id_set.discard(request_id)
self.update_metrics()

def _trigger_abort(self, request_id, scheduled_reqs):
Expand All @@ -293,7 +293,7 @@ def _trigger_abort(self, request_id, scheduled_reqs):
abort_request.cached_block_num = 0
scheduled_reqs.append(self._prepare_abort_task(abort_request))
self.to_be_aborted_req_id_set.add(request_id)
self.waiting_abort_req_id_set.remove(request_id)
self.waiting_abort_req_id_set.discard(request_id)

def _info_each_block(self):
"""
Expand Down Expand Up @@ -1544,6 +1544,8 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
del self.requests[req_id]
if req_id in self.req_dict:
del self.req_dict[req_id]
self.waiting_abort_req_id_set.discard(req_id)
self.to_be_aborted_req_id_set.discard(req_id)

# Do not block the main thread here
# Write cache to storage if kvcache_storage_backend is enabled
Expand Down
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/openai/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class ChatCompletionResponseChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]]
speculate_metrics: Optional[SpeculateMetrics] = None
Comment thread
qwes5s5 marked this conversation as resolved.


Expand Down Expand Up @@ -335,7 +335,7 @@ class ChatCompletionResponseStreamChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
arrival_time: Optional[float] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down Expand Up @@ -371,7 +371,7 @@ class CompletionResponseChoice(BaseModel):
draft_logprobs: Optional[CompletionLogprobs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down Expand Up @@ -417,7 +417,7 @@ class CompletionResponseStreamChoice(BaseModel):
prompt_tokens: Optional[str] = None
completion_tokens: Optional[str] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down
4 changes: 4 additions & 0 deletions tests/engine/test_common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3700,6 +3700,8 @@ def test_wait_abort_complete_progress(self):
"""_wait_abort_complete exits when background thread cleans up."""
eng = self._make_abort_engine()
eng.resource_manager.waiting_abort_req_id_set = {"req-1_0"}
# Add the request to requests dict so it won't be filtered out
eng.resource_manager.requests = {"req-1_0": self._make_fake_request()}

call_count = [0]

Expand All @@ -3718,6 +3720,8 @@ def test_wait_abort_complete_force_cleanup_stuck_in_to_be_aborted(self):
"""Stall timeout triggers force cleanup for requests in to_be_aborted_req_id_set."""
eng = self._make_abort_engine()
eng.resource_manager.to_be_aborted_req_id_set = {"req-1_0"}
# Add the request to requests dict so it won't be filtered out
eng.resource_manager.requests = {"req-1_0": self._make_fake_request()}

def mock_recycle(req_id):
eng.resource_manager.to_be_aborted_req_id_set.discard(req_id)
Expand Down
Loading