78 changes: 78 additions & 0 deletions tests/v1/core/test_scheduler.py
@@ -1948,6 +1948,84 @@ def test_schedule_skip_tokenizer_init_structured_output_request():
    assert len(scheduler.waiting) == 1


def test_kv_connector_finished_sending_race_condition():
"""
Test the race condition where a request is in finished_sending
but not actually in a finished state on the scheduler side.

This can happen when:
1. Worker-side NIXL connector times out waiting for decode workers

Review comment (Member):

But this timeout should only be started once the request has finished

The timeout is started at the delay_free_blocks spot in ConnectorScheduler.request_finished()
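
A minimal sketch of the shape being discussed, for orientation only: the request_finished() name and the (delay_free_blocks, kv_xfer_params) return pair match the _free_request() snippet quoted later in this thread, while the deadline bookkeeping and the timeout constant are illustrative assumptions, not the NIXL connector's actual code.

import time
from typing import Any

# Illustrative sketch only; field names and the timeout constant are assumed.
SEND_TIMEOUT_S = 120.0


class ConnectorSchedulerSketch:
    def __init__(self) -> None:
        # req_id -> deadline after which the worker may report finished_sending
        # even if not all transfer notifications have arrived.
        self._send_deadlines: dict[str, float] = {}

    def request_finished(self, request, block_ids: list[int]) -> tuple[bool, dict[str, Any] | None]:
        # Only reached once the scheduler-side request has finished, which is
        # the point made above: the send timeout cannot start before this moment.
        self._send_deadlines[request.request_id] = time.monotonic() + SEND_TIMEOUT_S
        kv_xfer_params = {"remote_block_ids": block_ids}  # illustrative payload
        # delay_free_blocks=True: keep the KV blocks until the decode side has
        # pulled them (or the timeout expires).
        return True, kv_xfer_params
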

    2. Worker reports request in finished_sending to prevent stranding blocks

Review comment (Member):

Requests are added to finished_sending by returning them from ConnectorWorker.get_finished() in two cases:

  1. We've received the required number of xfer notifications
  2. The timeout expired

Did you mean one of these? Or some other way that a request is added to finished_sending?

(e.g. are you thinking of some connector other than NIXL, or ...?)
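
For concreteness, a sketch of the two paths just listed, assuming simple notification counters and deadlines on the worker side; get_finished() is the method named in the comment, everything else (the counters, the deadline map, the simplified signature) is hypothetical bookkeeping rather than the NIXL worker's real state.

import time

# Hypothetical worker-side bookkeeping for the two cases above; not the real
# NIXL connector implementation.
class ConnectorWorkerSketch:
    def __init__(self) -> None:
        self._notifs_received: dict[str, int] = {}   # xfer notifications seen so far
        self._notifs_required: dict[str, int] = {}   # notifications needed per request
        self._send_deadlines: dict[str, float] = {}  # per-request timeout deadlines

    def get_finished(self) -> set[str]:
        """Return request ids to report as finished_sending (signature simplified)."""
        now = time.monotonic()
        finished_sending: set[str] = set()
        for req_id, deadline in list(self._send_deadlines.items()):
            done = self._notifs_received.get(req_id, 0) >= self._notifs_required.get(req_id, 1)
            timed_out = now >= deadline
            # Case 1: all expected transfer notifications arrived.
            # Case 2: the timeout expired; report anyway so the blocks are not
            #         stranded on the prefill worker.
            if done or timed_out:
                finished_sending.add(req_id)
                del self._send_deadlines[req_id]
        return finished_sending
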

    3. Scheduler-side request hasn't reached a finished state yet

Review comment (Member):

The decode side is somehow getting these block IDs before the prefill side has finished?

Review comment (Member):

Just thinking about this possibility - on the prefill side we get notification from decode that the blocks for this request have been transferred. But the request is still not finished on the prefill side ...

Assumption: it is impossible for the decode side to be notifying prefill about a request before prefill returns kv_transfer_params, which happens here:

    def _free_request(self, request: Request) -> dict[str, Any] | None:
        assert request.is_finished()

        delay_free_blocks, kv_xfer_params = self._connector_finished(request)
        ...
        if not delay_free_blocks:
            self._free_blocks(request)

        return kv_xfer_params

If the assumption above is correct, then the scenario looks impossible - the request must be finished before prefill returns kv_transfer_params?

Review comment (Member):

Another theory - the request is preempted after it finished and is waiting for KV blocks to be fetched? Doesn't seem possible - we only choose requests to preempt from Scheduler.running.


    Before the fix, this would crash with AssertionError in _free_blocks.
    After the fix, it should log a warning and skip block freeing.
    """
    from vllm.v1.outputs import KVConnectorOutput

    # Setup scheduler with KV connector
    scheduler = create_scheduler(
        enable_prefix_caching=True,
        use_kv_connector=True,
    )

    # Create and schedule a request
    requests = create_requests(num_requests=1, max_tokens=10)
    request = requests[0]
    scheduler.add_request(request)

    # Schedule the request
    scheduler_output = scheduler.schedule()
    assert len(scheduler_output.scheduled_new_reqs) == 1
    assert request.request_id in scheduler.requests

    # Simulate model execution - generate one token but DON'T finish
    model_runner_output = ModelRunnerOutput(
        req_ids=[request.request_id],
        req_id_to_index={request.request_id: 0},
        sampled_token_ids=[[100]],  # One token, not EOS
        logprobs=None,
        prompt_logprobs_dict={},
        pooler_output=[],
    )

    # Update with this output - request should still be RUNNING
    scheduler.update_from_output(scheduler_output, model_runner_output)
    assert request.status == RequestStatus.RUNNING
    assert len(scheduler.running) == 1

    # Now simulate the race condition: create a KVConnectorOutput that
    # reports this request as finished_sending, even though the request
    # is still RUNNING on the scheduler side.
    # This simulates the timeout scenario in the NIXL connector.
    kv_connector_output = KVConnectorOutput(finished_sending={request.request_id})

Review comment (Member):

It's a similar situation to the abort-after-finished race condition in #25067 - I wrote a unit test to artificially simulate it, but it took quite a bit of digging to understand how it could happen and figure out how we wanted to handle it


    # Schedule again to trigger the race condition
    scheduler_output2 = scheduler.schedule()
    model_runner_output2 = ModelRunnerOutput(
        req_ids=[request.request_id],
        req_id_to_index={request.request_id: 0},
        sampled_token_ids=[[101]],  # Another token, not EOS
        logprobs=None,
        prompt_logprobs_dict={},
        pooler_output=[],
        kv_connector_output=kv_connector_output,
    )

    # This should handle the race condition gracefully
    # by logging a warning and skipping block freeing.
    _ = scheduler.update_from_output(scheduler_output2, model_runner_output2)

    # Verify the request is still in the system and running
    # (i.e., it was NOT incorrectly freed)
    assert request.request_id in scheduler.requests
    assert request.status == RequestStatus.RUNNING
    assert len(scheduler.running) == 1

    # The request should NOT have been freed
    assert request.request_id not in scheduler.finished_req_ids


def test_priority_scheduling_preemption_and_resumption_when_out_of_kv():
"""Test that priority scheduling preempts lower priority requests
when out of KV cache space."""
20 changes: 18 additions & 2 deletions vllm/v1/core/sched/scheduler.py
@@ -1365,8 +1365,24 @@ def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
             self.finished_recving_kv_req_ids.add(req_id)
         for req_id in kv_connector_output.finished_sending or ():
             logger.debug("Finished sending KV transfer for request %s", req_id)
-            assert req_id in self.requests
-            self._free_blocks(self.requests[req_id])
+            request = self.requests.get(req_id)
+            if request is None:
+                logger.warning(
+                    "Got finished sending KV transfer for request %s, "
+                    "but the request is already freed.",
+                    req_id,
+                )

Review comment (Member):

This was deliberately removed by #25067 - it's a bug if it happens, the warning is not actionable by users, at best it should be a "fix this bug!" debug statement

+            elif not request.is_finished():
+                logger.warning(
+                    "Got finished sending KV transfer for request %s, "
+                    "but the request is not finished (status=%s). "
+                    "This may indicate the request was aborted or the KV "
+                    "transfer timed out before the request completed.",

Review comment (Member):

Again, not actionable by a user - it's either an expected scenario that we can safely ignore (with no logging) or something that's a bug if it happens

+                    req_id,
+                    request.status,
+                )
+            else:
+                self._free_blocks(request)

Review comment on lines 1366 to +1385:

P1: Track premature KV transfers to avoid leaked blocks

When a worker reports finished_sending before the scheduler marks the request as finished, the new branch only logs a warning and returns. No state is recorded that the transfer has already completed. Once the request does eventually finish, _free_request() still calls _connector_finished(), which typically returns delay_free_blocks=True for remote decode, so _free_blocks() is never invoked unless another finished_sending arrives. Since the worker already emitted its only finished_sending event during the timeout, the request is left in self.requests and its KV blocks remain allocated indefinitely, leaking cache space and preventing the scheduler from recycling memory. The handler needs to persist that the transfer already finished (or immediately free when the request later finishes) rather than merely warn.
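
One possible shape for what this suggestion asks for, sketched with hypothetical names (PrematureSendTracker and its fields are invented for this sketch, not existing scheduler attributes): record the premature finished_sending event, then free immediately when the request later finishes instead of delaying and waiting for an event that will never arrive again.

# Hypothetical helper illustrating the suggestion above; not part of vLLM.
class PrematureSendTracker:
    def __init__(self) -> None:
        self._early_finished_sending: set[str] = set()

    def on_finished_sending(self, req_id: str, request_is_finished: bool) -> bool:
        """Handle a finished_sending report; return True if blocks can be
        freed right now, otherwise remember the event for later."""
        if request_is_finished:
            return True
        self._early_finished_sending.add(req_id)
        return False

    def send_already_finished(self, req_id: str) -> bool:
        """Consulted when the request eventually finishes: if the send already
        completed during the race, free the blocks immediately rather than
        setting delay_free_blocks and leaking them."""
        if req_id in self._early_finished_sending:
            self._early_finished_sending.discard(req_id)
            return True
        return False

Whether this bookkeeping should live in the scheduler or in the connector's request_finished() path is exactly the open question raised in the surrounding comments.
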


Review comment (Member):

It's a good question to wonder what will happen if ConnectorScheduler.request_finished() gets called after the request was already reported as finished_sending


     def _update_requests_with_invalid_blocks(
         self, requests: Iterable[Request], invalid_block_ids: set[int]