Merged

64 commits
c88d2eb
add streaming session support to vllm v1
joshuadeng Nov 19, 2025
74667dd
use request directly and remove WAITING_FOR_SESSION_REQ
joshuadeng Nov 24, 2025
ff49738
update tests and fix bugs
joshuadeng Nov 24, 2025
17cbbec
fix test_request
joshuadeng Nov 24, 2025
aa69c81
fix model runner typing
joshuadeng Nov 24, 2025
2a6be20
remove streaming async llm and output processor and default close ses…
joshuadeng Nov 24, 2025
92d3cd2
add update streaming session in scheduler directly
joshuadeng Nov 24, 2025
5590195
fix mypy
joshuadeng Nov 24, 2025
adc576a
Merge branch 'main' into streaming_support
joshuadeng Nov 24, 2025
ed15928
remove utils EngineCoreProc streaming case
joshuadeng Nov 25, 2025
9a3fe56
refactor close_session to close_streaming_session
joshuadeng Nov 25, 2025
f804ab8
Merge branch 'main' into streaming_support
joshuadeng Nov 25, 2025
7a6c566
clean up old scheduler subclass logic
joshuadeng Nov 25, 2025
23a45e9
merge main
joshuadeng Dec 2, 2025
6fc9692
add streaming changes to input_processor
joshuadeng Dec 2, 2025
61dfd16
remove handle logic in scheduler
joshuadeng Dec 3, 2025
d400f04
refactor close_streaming_session to continue_session
joshuadeng Dec 3, 2025
49392d2
merge streaming scheduler into scheduler
joshuadeng Dec 3, 2025
5ddfe8b
add exception for updating streaming session
joshuadeng Dec 3, 2025
cad769e
make streaming_queue None for non streaming
joshuadeng Dec 3, 2025
403fcdb
Merge branch 'main' into streaming_support
joshuadeng Dec 3, 2025
6f52521
fix mypy typing
joshuadeng Dec 3, 2025
93234c0
fix mypy typing pt2
joshuadeng Dec 3, 2025
2b7fb97
fix closing session logic
joshuadeng Dec 4, 2025
76b45fe
merge main into branch
joshuadeng Dec 4, 2025
03869d9
addresss bugs (concat prompt embeds, OutputProcessor._update_streamin…
joshuadeng Dec 8, 2025
b5c7266
Merge branch 'main' into streaming_support
joshuadeng Dec 8, 2025
13d91c7
refactor continue_session to resumable
joshuadeng Dec 8, 2025
37dfc2f
remove validation for resumable in _update_streaming_request_state (w…
joshuadeng Dec 8, 2025
9d8b98d
handle none values for prompt embeds
joshuadeng Dec 8, 2025
d67b394
Merge branch 'main' into streaming_support
ywang96 Dec 9, 2025
a655026
Merge branch 'main' into streaming_support
ywang96 Dec 10, 2025
a06638d
fix merge conflic
joshuadeng Dec 10, 2025
c19c64a
Merge branch 'main' into streaming_support
joshuadeng Dec 12, 2025
0c21972
fix add request logic for streaming
joshuadeng Dec 12, 2025
26236d8
replace Request with lightweight StreamingUpdate in streaming queue
joshuadeng Dec 12, 2025
e385575
optimize counting requests with WAITING_FOR_STREAMING_REQ
joshuadeng Dec 12, 2025
6b49bab
optimize updating session in update_from_output
joshuadeng Dec 12, 2025
acc9faf
Merge branch 'main' into streaming_support
joshuadeng Dec 18, 2025
e4d6431
Merge branch 'main' into streaming_support
joshuadeng Dec 23, 2025
0e315e4
Merge branch 'main' into streaming_support
patrickvonplaten Dec 23, 2025
b3584e7
merge main into streaming support
joshuadeng Dec 28, 2025
74bfa10
fix async llm streaming test
joshuadeng Dec 28, 2025
cd877e6
add streaming session apis to async llm
joshuadeng Dec 28, 2025
e38b36d
merge main into branch
joshuadeng Jan 12, 2026
3054a89
address cursor comments
joshuadeng Jan 12, 2026
a9931d6
remove streaming generate apis and move logic into generate
joshuadeng Jan 12, 2026
eba8018
update prompt_len rather than recalculate as property field
joshuadeng Jan 13, 2026
c928274
fix finish reason check in process_outputs
joshuadeng Jan 13, 2026
a52fe25
propagate streaming exception in generate
joshuadeng Jan 13, 2026
6a1d08c
fix output processor req state prompt is none case
joshuadeng Jan 13, 2026
3c63985
fix streaming session race condition with pending outputs counter
joshuadeng Jan 13, 2026
584bba6
Merge remote-tracking branch 'origin/main' into streaming_support
njhill Jan 16, 2026
5ca995b
Merge branch 'main' into streaming_support
patrickvonplaten Jan 23, 2026
73a3092
some updates
njhill Jan 16, 2026
65efa39
update existing tests (used claude)
njhill Jan 16, 2026
396dfa0
fixes, add e2e tests
njhill Jan 19, 2026
41ca185
update other tests
njhill Jan 20, 2026
d615086
update behavior to only discard final output token
njhill Jan 23, 2026
134511c
small refactor
njhill Jan 24, 2026
a75aea5
Merge pull request #2 from njhill/streaming_support_nick2
joshuadeng Jan 24, 2026
c652894
Merge branch 'main' into streaming_support
joshuadeng Jan 24, 2026
ec48483
small fix
njhill Jan 24, 2026
3abe7e7
fix duplicate ids in scheduler tests
njhill Jan 24, 2026
Empty file added tests/v1/streaming/__init__.py
Empty file.
206 changes: 206 additions & 0 deletions tests/v1/streaming/test_streaming_async_llm.py
@@ -0,0 +1,206 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
from unittest.mock import AsyncMock, MagicMock

import pytest

from vllm.outputs import RequestOutput
from vllm.sampling_params import RequestOutputKind, SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.output_processor import RequestOutputCollector


@pytest.fixture
def mock_streaming_llm():
"""Create a mock AsyncLLM with mocked dependencies."""
# Create a minimal mock without initializing the full engine
llm = MagicMock(spec=AsyncLLM)

# Mock the essential attributes
llm.vllm_config = MagicMock()
llm.vllm_config.cache_config.kv_sharing_fast_prefill = False
llm.model_config = MagicMock()
llm.model_config.max_model_len = 2048
llm.log_requests = False
llm.errored = False
llm._pause_cond = asyncio.Condition()
llm._paused = False

# Mock methods
llm._run_output_handler = MagicMock()
llm.abort = AsyncMock()

# Use the real generate method from AsyncLLM
llm.generate = AsyncLLM.generate.__get__(llm, AsyncLLM)

return llm


@pytest.mark.asyncio
async def test_generate_normal_flow(mock_streaming_llm):
"""Test normal generation flow with streaming requests."""
request_id = "test_request"
prompt = "Tell me about Paris"
sampling_params = SamplingParams(max_tokens=10)
close_session = False

# Create a mock queue with outputs
queue = RequestOutputCollector(output_kind=RequestOutputKind.FINAL_ONLY)
output1 = RequestOutput(
request_id=request_id,
prompt="Tell me about Paris",
prompt_token_ids=[1, 2, 3],
prompt_logprobs=None,
outputs=[],
finished=False,
)
output2 = RequestOutput(
request_id=request_id,
prompt="Tell me about Paris",
prompt_token_ids=[1, 2, 3],
prompt_logprobs=None,
outputs=[],
finished=True,
)

# Feed outputs to queue as they're consumed to avoid aggregation
async def feed_outputs():
queue.put(output1)
await asyncio.sleep(1) # Let first output be consumed
queue.put(output2)

asyncio.create_task(feed_outputs()) # noqa

# Mock add_request to return the queue
async def mock_add_request(*args, **kwargs):
return queue

mock_streaming_llm.add_request = mock_add_request

# Collect outputs from generate
outputs = []
async for output in mock_streaming_llm.generate(
prompt=prompt,
sampling_params=sampling_params,
request_id=request_id,
close_session=close_session,
):
outputs.append(output)

assert len(outputs) == 2
assert outputs[0].finished is False
assert outputs[1].finished is True


@pytest.mark.asyncio
async def test_generate_multiple_streaming_requests(mock_streaming_llm):
"""Test session continuation across multiple streaming requests."""
request_id = "session"
prompt1 = "Tell me about Paris"
prompt2 = "Tell me more"
sampling_params = SamplingParams(max_tokens=10)

# First streaming request (sequence_id=0)
queue1 = RequestOutputCollector(output_kind=RequestOutputKind.FINAL_ONLY)
output1 = RequestOutput(
request_id=request_id,
prompt=prompt1,
prompt_token_ids=[1, 2, 3],
prompt_logprobs=None,
outputs=[],
finished=True,
)

queue1.put(output1)

async def mock_add_request1(*args, **kwargs):
return queue1

mock_streaming_llm.add_request = mock_add_request1

# Generate first request
outputs1 = []
async for output in mock_streaming_llm.generate(
prompt=prompt1,
sampling_params=sampling_params,
request_id=request_id,
close_session=False,
):
outputs1.append(output)

assert len(outputs1) == 1
assert outputs1[0].finished is True

# Second streaming request (sequence_id=1)
queue2 = RequestOutputCollector(output_kind=RequestOutputKind.FINAL_ONLY)
output2 = RequestOutput(
request_id=request_id,
prompt=prompt2,
prompt_token_ids=[4, 5],
prompt_logprobs=None,
outputs=[],
finished=True,
)

queue2.put(output2)

async def mock_add_request2(*args, **kwargs):
return queue2

mock_streaming_llm.add_request = mock_add_request2

# Generate second request
outputs2 = []
async for output in mock_streaming_llm.generate(
prompt=prompt2,
sampling_params=sampling_params,
request_id=request_id,
close_session=False,
):
outputs2.append(output)

assert len(outputs2) == 1
assert outputs2[0].finished is True


@pytest.mark.asyncio
async def test_generate_generator_exit(mock_streaming_llm):
"""Test that GeneratorExit is handled gracefully for streaming sessions."""
request_id = "test_request"
prompt = "Test prompt"
sampling_params = SamplingParams(max_tokens=10)

# Create a queue with one output
queue = RequestOutputCollector(output_kind=RequestOutputKind.FINAL_ONLY)
output1 = RequestOutput(
request_id=request_id,
prompt="Test prompt",
prompt_token_ids=[1, 2, 3],
prompt_logprobs=None,
outputs=[],
finished=False,
)

queue.put(output1)

async def mock_add_request(*args, **kwargs):
return queue

mock_streaming_llm.add_request = mock_add_request

# Create generator and close it early (simulates GeneratorExit)
gen = mock_streaming_llm.generate(
prompt=prompt,
sampling_params=sampling_params,
request_id=request_id,
close_session=False,
)

# Get first output then close generator
output = await gen.__anext__()
assert output.finished is False

# Closing the generator should not raise or abort (streaming session continues)
await gen.aclose()
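
For orientation, here is a minimal sketch of how a caller might chain two turns of one streaming session through AsyncLLM.generate, mirroring the call shape exercised by the tests above. The engine construction is elided, and using close_session=True to end the session on the final turn is an assumption rather than something these tests verify.

from vllm.sampling_params import SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM


async def run_session(llm: AsyncLLM) -> None:
    # Reuse one request_id across turns; close_session=False keeps the
    # streaming session open so the next prompt continues the same session.
    params = SamplingParams(max_tokens=10)
    session_id = "session"

    # First turn: leave the session open.
    async for out in llm.generate(
        prompt="Tell me about Paris",
        sampling_params=params,
        request_id=session_id,
        close_session=False,
    ):
        first_turn = out

    # Second turn on the same session; close_session=True (assumed here)
    # releases the session's state once this request finishes.
    async for out in llm.generate(
        prompt="Tell me more",
        sampling_params=params,
        request_id=session_id,
        close_session=True,
    ):
        second_turn = out

    print(first_turn.finished, second_turn.finished)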