[Feature] add session based streaming input support to v1 #28973
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs will not trigger a full CI run by default; only a small subset of tests runs automatically, and you can ask your reviewers to trigger select CI tests on top of that. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging. To run CI, PR reviewers can either add the `ready` label to the PR or enable auto-merge.

If you have any questions, please reach out to us on Slack at https://slack.vllm.ai. 🚀
This pull request has merge conflicts that must be resolved before it can be merged.
```python
RequestStatus.FINISHED_LENGTH_CAPPED: FinishReason.LENGTH,
RequestStatus.FINISHED_ABORTED: FinishReason.ABORT,
RequestStatus.FINISHED_IGNORED: FinishReason.LENGTH,
RequestStatus.WAITING_FOR_STREAMING_REQ: FinishReason.STOP,
```
Why do we count RequestStatus.WAITING_FOR_STREAMING_REQ as a finished state?
We need to provide a finish reason when creating `EngineCoreOutput`, otherwise it would be `None`. The client expects a finish reason to know that generation has stopped (temporarily, in the streaming case), so it can consume the output chunk and won't hang.
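For illustration, the consumer-side pattern this enables might look roughly like the sketch below (the attribute names are assumptions about a `RequestOutput`-like chunk, not the PR's exact API):

```python
async def consume_round(output_stream) -> str:
    """Collect one round of streamed output until a finish reason arrives."""
    text = ""
    async for chunk in output_stream:
        text += chunk.outputs[0].text  # assumed RequestOutput-like shape
        if chunk.finished:             # set because a finish reason is reported
            break                      # the session is only paused for more input
    return text
```

Because the parked request still reports a finish reason, this loop terminates cleanly instead of waiting forever while the session sits in `WAITING_FOR_STREAMING_REQ`.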
vllm/v1/engine/output_processor.py (Outdated)
```python
def _update_streaming_request_state(
    self, request: EngineCoreRequest, prompt: str | None
) -> None:
    req_state = self.request_states[request.request_id]
    if req_state.prompt and prompt:
        req_state.prompt += prompt
    if req_state.prompt_token_ids is not None and request.prompt_token_ids:
        req_state.prompt_token_ids.extend(request.prompt_token_ids)
    req_state.prompt_embeds = request.prompt_embeds
    if req_state.stats is not None:
        req_state.stats.arrival_time = request.arrival_time
    req_state.is_prefilling = True
```
We need to raise an error if we are updating a non-streaming request or one with `close_session=True`.
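For example, a minimal validation helper along those lines could look like this (a sketch; `is_streaming_session` is a hypothetical field name, not one from this PR):

```python
def check_streaming_update_allowed(req_state, request_id: str) -> None:
    """Reject streaming updates for requests that are not open streaming sessions."""
    if req_state is None or not getattr(req_state, "is_streaming_session", False):
        raise ValueError(
            f"Request {request_id} is not an open streaming session; "
            "cannot apply a streaming update.")
```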
vllm/v1/engine/processor.py (Outdated)
```python
    data_parallel_rank=data_parallel_rank,
    close_streaming_session=True,
)
```
For non-streaming requests, do we default `close_streaming_session` to `None` or `True`? I personally prefer `True` and not distinguishing between streaming and non-streaming requests. We can view a non-streaming request as a request for which the user has finished their input in the first round.
Right now it's defaulted to `None`, per Patrick's comment, but I'll change it to `True`.
The reason I think `None` is better is that it cleanly declares `close_streaming_session` as irrelevant for the non-streaming use case (which I think it is). We could view non-streaming requests as "single-round" streaming requests, but I don't think we really do that at the moment (for example, we're abstracting the scheduler into a specialized `streaming_scheduler` design that should never be used for non-streaming requests).
So streaming requests have to go through the new `streaming_scheduler` and are not allowed to call certain functionality from the non-streaming scheduler. By declaring the variable `close_streaming_session` "not used" with `None`, I think we have an easy mechanism to verify that something doesn't accidentally end up in the wrong path.
For example, we can nicely assert everywhere in `streaming_scheduler` that `close_streaming_session is not None`; similarly, `assert close_streaming_session is None` can be useful in statements/functions that are not relevant for streaming.
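A minimal sketch of those invariant checks (purely illustrative helpers, assuming a `close_streaming_session` field on the request):

```python
def assert_streaming_path(request) -> None:
    # Streaming-scheduler paths: the flag must always carry an explicit value.
    assert request.close_streaming_session is not None

def assert_non_streaming_path(request) -> None:
    # Paths that are not relevant for streaming: the flag should stay "unused".
    assert request.close_streaming_session is None
```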
But obviously no strong opinion, and I'm not super familiar with the general design logic of vLLM.
Ah, I see now that you also proposed to merge the streaming_scheduler here: https://github.com/vllm-project/vllm/pull/28973/files#r2579527152 => if we're able to merge everything into one, then yeah, I agree.
vllm/v1/engine/async_llm.py (Outdated)
```python
# For streaming sessions, generator completion is normal
return
```
Can you elaborate a bit on the behavior here?
So this is needed because, let's say, on the caller side we have code like this:
```python
output_generator = await generate()
async for request_output in output_generator:
    client_handle_output(request_output)
    # --> here triggers GeneratorExit
do_more_things()
```
For non-streaming, it was previously fine in `GeneratorExit` handling to just do `abort(request_id)`, which frees everything. Now, for a streaming session, we shouldn't `abort(request_id)` because the client hasn't closed the session yet.
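Roughly, the asymmetry being described looks like the following (a sketch with assumed helper names such as `stream_outputs`, not the PR's actual code):

```python
async def stream_session_outputs(engine, request_id: str, is_streaming_session: bool):
    try:
        async for request_output in engine.stream_outputs(request_id):
            yield request_output
    except GeneratorExit:
        if not is_streaming_session:
            # Non-streaming: the caller walked away, so free everything.
            await engine.abort(request_id)
        # Streaming: the client has not closed the session yet, so keep the
        # request alive; cleanup happens when the session is actually closed.
        raise
```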
This change wouldn't be needed with the alternative proposed API
vllm/v1/core/sched/scheduler.py (Outdated)
```python
def _handle_non_stopped(
    self,
    request: Request,
    status_before_stop: RequestStatus,
    mark_running_stopped: Callable[[Request], None],
    model_runner_output: ModelRunnerOutput,
) -> None:
    pass

def _handle_finished(
    self,
    finished_req_ids: set[str],
    outputs: dict[int, list[EngineCoreOutput]],
) -> None:
    pass
```
what are these functions for?
For our internal use case of streaming, we want to be able to mutate requests for sessions that are still running, and that's handled by overriding `_handle_non_stopped`.
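For context, such an override might look roughly like this (purely illustrative, given that the hook is being discussed for removal):

```python
from vllm.v1.core.sched.scheduler import Scheduler

class SessionScheduler(Scheduler):
    """Illustrative subclass showing how the hook could be used internally."""

    def _handle_non_stopped(self, request, status_before_stop,
                            mark_running_stopped, model_runner_output) -> None:
        # Mutate a still-running session request between engine steps,
        # e.g. splice in newly arrived streaming input for this session.
        ...
```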
I guess we can remove this for now to avoid confusion?
Yeah, also I think this seems a bit model-specific, and it's weird to have that kind of logic in the shared scheduler.
vllm/v1/engine/__init__.py (Outdated)
```python
priority: int = 0

trace_headers: Mapping[str, str] | None = None
close_streaming_session: bool | None = None
```
Nit: we could maybe call this field something like `more_tokens_coming` (probably also not the best name, but you get the point)? Then we can default this field to `False` for all requests and only set it to `True` for requests that have more streaming inputs coming.
Should we maybe reuse `resumable`, which was used in another PR (#25463) and which I think is quite intuitive?
```python
from vllm.v1.request import Request, RequestStatus


class StreamingScheduler(Scheduler):
```
What's the difficulty of merging `StreamingScheduler` and the normal scheduler?
- For stopped requests (where we would normally finish and free the request), with streaming we instead set the status to `WAITING_FOR_STREAMING_REQ`.
- For streaming requests, we create `NewRequestData` with `_all_token_ids` (not just the prompt), so the prompt field includes all past inputs, including decoded outputs. Updated streaming requests create new entries in `InputBatch`, so we need the full input history to ensure alignment.

These are the main two issues; however, I think if we leverage the `more_tokens_coming` field we can gate this logic from non-streaming behavior and merge `StreamingScheduler` into the normal scheduler.
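To make that gating concrete, stop handling could branch on the flag roughly like this (a sketch assuming a `resumable`/`more_tokens_coming`-style field and the existing `_free_request` helper, not the merged implementation):

```python
from vllm.v1.request import RequestStatus

def handle_stopped_request(scheduler, request) -> None:
    """Sketch: park resumable (streaming) requests instead of freeing them."""
    if getattr(request, "resumable", False):
        # Keep the request and its KV blocks; the session resumes when the
        # next streaming input chunk arrives for this request_id.
        request.status = RequestStatus.WAITING_FOR_STREAMING_REQ
    else:
        # Non-streaming request (or the session was closed): free as before.
        scheduler._free_request(request)
```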
This pull request has merge conflicts that must be resolved before it can be merged.
```python
    session_request._all_token_ids[-1]
    == session_request._output_token_ids[-1]
)
del session_request._all_token_ids[-1]
```
I also want to highlight this behavior. It removes the last output token (which hasn't been scheduled) from `_all_token_ids`, since the new request's prompt tokens will replace it. That last (unscheduled) output token therefore won't go into the KV cache, as it is replaced by the streaming update request's prompt, but it is still returned to the client.
We needed this behavior for our internal use case, but we can handle it another way for OSS, i.e. make the new scheduled tokens the last output token + the new prompt tokens.
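A toy illustration of that bookkeeping, reusing the A/B/C/D token notation used elsewhere in this PR (plain lists rather than the actual `Request` internals):

```python
# After decoding the first chunk: D1 was sampled and already streamed to the
# client, but has not been scheduled yet (so it is not in the KV cache).
all_token_ids = ["A1", "B1", "C1", "D1"]

# The streaming update drops the unscheduled D1 and appends the new prompt
# chunk, so the next scheduled prompt is the full history without D1.
del all_token_ids[-1]
all_token_ids += ["A2", "B2"]
assert all_token_ids == ["A1", "B1", "C1", "A2", "B2"]
```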
Thanks @joshuadeng for all of the updates. I've made some more changes in another branch based on top of this one, which should address the remaining concerns I had. Here is a summary:
Other notes:
Please take a look and let me know what you think; in the meantime I'll try to test too.
@joshuadeng I have now tested/fixed the changes, and added a bunch of e2e tests (in that same branch).
thanks, this is great! will take a closer look and merge your changes if it all looks good |
@njhill looks good overall, left a comment in joshuadeng#1 |
This pull request has merge conflicts that must be resolved before it can be merged.
@joshuadeng @patrickvonplaten @zhuohan123 @ErickLuo90 @ywang96 thanks for all of the work / input / reviews of this! And thanks for the patience while iterating.
I've updated the PR summary above with the final design (though we expect there will be follow-on changes, especially w.r.t. handling of generated tokens).
Purpose
Proposal for session-based streaming with sequenced updates with minimal changes to the core scheduler and engine interfaces.
Latest design
`AsyncLLM.generate` can now optionally take an async generator that produces a stream of `StreamingInput` objects, instead of a single prompt.
The `SamplingParams` for each input can differ; if omitted, those provided in the original `generate` call will be used.
The `StreamingInput` chunks are handled internally as separate requests, where the prompt of each request is the cumulative concatenation of all input prompts so far plus their corresponding output tokens, excluding the final sampled token from each request. All generated tokens are returned to the caller in the session's output stream.
So for streaming inputs `[A1, B1, C1]`, `[A2, B2]`, `[A3, B3]`:
- `[A1, B1, C1]` generates `[D1]`
- `[A1, B1, C1, A2, B2]` generates `[C2, D2, E2]` (`D1` discarded)
- `[A1, B1, C1, A2, B2, C2, D2]` generates `[C3, D3]` (`E2` discarded)

Streamed output tokens would be `D1, C2, D2, E2, C3, D3`. Note that we expect to generalize/parameterize the behaviour w.r.t. which output tokens to retain in successive prompts as a follow-on, based on use case requirements.
Inputs are considered completed for a particular session when the provided async generator exits or is closed/garbage collected.
It is not necessary (though it is fine) to wait for all of the outputs corresponding to a particular input chunk before sending the next one; they are queued internally.
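For illustration, driving a session from the caller's side might look roughly like the sketch below (the `StreamingInput` import path and constructor arguments are assumptions based on the description above, not the exact API):

```python
import asyncio

from vllm import SamplingParams
from vllm.v1.engine import StreamingInput  # import path assumed

async def stream_inputs():
    # Each yielded StreamingInput is one incremental prompt chunk; a chunk may
    # carry its own SamplingParams. Exiting the generator ends the session's inputs.
    yield StreamingInput(prompt="First part of the conversation.")
    await asyncio.sleep(0)  # e.g. wait for more user input to arrive
    yield StreamingInput(prompt=" A follow-up chunk.",
                         sampling_params=SamplingParams(max_tokens=16))

async def run(engine):  # engine: an AsyncLLM instance
    async for output in engine.generate(
        prompt=stream_inputs(),                         # async generator instead of a single prompt
        sampling_params=SamplingParams(max_tokens=32),  # defaults for chunks without params
        request_id="session-0",
    ):
        # All generated tokens for the whole session arrive on this one stream.
        print(output.outputs[0].text, flush=True)
```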
Certain input types/parameters are not yet supported with streaming input: `prompt_embeds`, `n > 1`, `output_kind == FINAL_ONLY`, and stop strings.

Original PR summary (superseded)
Design (please request access):
https://docs.google.com/document/d/16iE0pUsjdlfEdcghiCSLlicKFdvVtxn2aiic3kL9RVY/edit?usp=sharing
High‑level design
- … `streaming_sequence_id = 0` for a given `request_id`.
- … (`streaming_queue`).
- … `WAITING_FOR_STREAMING_REQ`: … `streaming_queue` in place and the session goes back to `WAITING` to decode the next chunk.
- … `close_session = True`, we mark the session finished with `stop_reason = "close_session"` and free resources.
- … (`num_new_tokens` correctness and KV alignment).

Test Plan
- `pytest tests/v1/streaming/test_streaming_scheduler.py`
- `pytest tests/v1/streaming/test_streaming_async_llm.py`
- `pytest tests/v1/streaming/test_streaming_gpu_model_runner.py`

Test Result
they pass
Essential Elements of an Effective PR Description Checklist
- … `supported_models.md` and `examples` for a new model.
Note
Introduces resumable, session-based streaming that queues incremental inputs and preserves KV/MM alignment end-to-end.
- `StreamingInput` and extend `AsyncLLM.generate` to accept async generators; plumb `resumable` through `add_request` and reuse per-request `RequestOutputCollector`
- `Request` with `resumable`, `streaming_queue`, and `StreamingUpdate`; add `RequestStatus.WAITING_FOR_STREAMING_REQ` and expose it in `__str__`
- `_update_request_as_session` (prunes last unscheduled output, merges MM offsets, updates tokens/params), propagate `resumable`, exclude waiting-for-streaming from unfinished counts, and build `NewRequestData` via `_make_new_request_data` using a copied `prompt_token_ids` to avoid aliasing
- `scheduled_new_reqs` via `_update_streaming_request` (remove from `InputBatch`, refresh fields, clear `output_token_ids`)
- `resumable`; propagate `resumable` in `EngineCoreOutput`

Written by Cursor Bugbot for commit eba8018.