diff --git a/benchmark/aiperf/README.md b/benchmark/aiperf/README.md
index 9a3608927..5c39fa78d 100644
--- a/benchmark/aiperf/README.md
+++ b/benchmark/aiperf/README.md
@@ -20,9 +20,9 @@ Instead of manually running AIPerf multiple times with different parameters, you
### Prerequisites
These steps have been tested with Python 3.11.11.
-To use the provided configurations, you need to create accounts at https://build.nvidia.com/ and [Huggingface](https://huggingface.co/).
-* The provided configurations use models hosted at https://build.nvidia.com/, you'll need to create a Personal API Key to access the models.
-* The provided AIperf configurations require the [Meta Llama 3.3 70B Instruct tokenizer](https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct) to calculate token-counts.
+To use the provided configurations, you need to create accounts at [build.nvidia.com](https://build.nvidia.com/) and [Hugging Face](https://huggingface.co/).
+- The provided configurations use models hosted at [build.nvidia.com](https://build.nvidia.com/); you'll need to create a Personal API Key to access them.
+- The provided AIPerf configurations require the [Meta Llama 3.3 70B Instruct tokenizer](https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct) to calculate token counts.
1. **Create a virtual environment in which to install AIPerf**
@@ -37,13 +37,13 @@ To use the provided configurations, you need to create accounts at https://build
$ pip install aiperf huggingface_hub typer
```
-3. ** Login to Hugging Face:**
+3. **Login to Hugging Face:**
```bash
huggingface-cli login
```
-4. ** Set NVIDIA API Key:**
+4. **Set NVIDIA API Key:**
The provided configs use models hosted on [build.nvidia.com](https://build.nvidia.com/).
To access these, [create an account](https://build.nvidia.com/), and create a Personal API Key.
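+
+   For example, you can export your Personal API Key as an environment variable before running the benchmarks (the variable name below is illustrative; use whichever name the provided configurations expect):
+
+   ```bash
+   # Illustrative variable name; substitute the name your AIPerf configuration reads
+   export NVIDIA_API_KEY="nvapi-..."
+   ```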
diff --git a/benchmark/mock_llm_server/api.py b/benchmark/mock_llm_server/api.py
index 1dac1c2ee..6e3a2d85e 100644
--- a/benchmark/mock_llm_server/api.py
+++ b/benchmark/mock_llm_server/api.py
@@ -17,18 +17,24 @@
import asyncio
import logging
import time
-from typing import Annotated, Union
+from typing import Annotated, AsyncGenerator, Union
from fastapi import Depends, FastAPI, HTTPException, Request
+from fastapi.responses import StreamingResponse
from benchmark.mock_llm_server.config import ModelSettings, get_settings
from benchmark.mock_llm_server.models import (
ChatCompletionChoice,
ChatCompletionRequest,
ChatCompletionResponse,
+ ChatCompletionStreamChoice,
+ ChatCompletionStreamResponse,
CompletionChoice,
CompletionRequest,
CompletionResponse,
+ CompletionStreamChoice,
+ CompletionStreamResponse,
+ DeltaMessage,
Message,
Model,
ModelsResponse,
@@ -36,9 +42,11 @@
)
from benchmark.mock_llm_server.response_data import (
calculate_tokens,
+ generate_chunk_latencies,
generate_id,
get_latency_seconds,
get_response,
+ split_response_into_chunks,
)
# Create a console logging handler
@@ -89,7 +97,7 @@ async def log_http_duration(request: Request, call_next):
response_time = time.time()
duration_seconds = response_time - request_time
- log.info(
+ log.debug(
"Request finished: %s, took %.3f seconds",
response.status_code,
duration_seconds,
@@ -120,8 +128,149 @@ async def list_models(config: ModelSettingsDep):
return response
-@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
-async def chat_completions(request: ChatCompletionRequest, config: ModelSettingsDep) -> ChatCompletionResponse:
+async def stream_chat_completion(
+ completion_id: str,
+ model: str,
+ response_content: str,
+ config: ModelSettings,
+ n_choices: int = 1,
+) -> AsyncGenerator[str, None]:
+ """Generate Server-Sent Events for streaming chat completions.
+
+ Args:
+ completion_id: Unique ID for this completion
+ model: Model name
+ response_content: Full response text to stream
+ config: Model settings for latency configuration
+ n_choices: Number of choices to generate
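+
+    Yields:
+        Server-Sent Event strings, one per chunk, each formatted as
+        "data: <json chunk>" followed by a blank line, and terminated by a
+        final "data: [DONE]" sentinel.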
+ """
+ created_timestamp = int(time.time())
+ chunks = split_response_into_chunks(response_content)
+ latencies = generate_chunk_latencies(config, len(chunks))
+
+ # First chunk with role
+ for i in range(n_choices):
+ first_response = ChatCompletionStreamResponse(
+ id=completion_id,
+ object="chat.completion.chunk",
+ created=created_timestamp,
+ model=model,
+ choices=[
+ ChatCompletionStreamChoice(
+ index=i,
+ delta=DeltaMessage(role="assistant", content=""),
+ finish_reason=None,
+ )
+ ],
+ )
+ yield f"data: {first_response.model_dump_json(exclude_none=True)}\n\n"
+
+ # Stream content chunks
+ for chunk_idx, chunk in enumerate(chunks):
+ await asyncio.sleep(latencies[chunk_idx])
+
+ for i in range(n_choices):
+ chunk_response = ChatCompletionStreamResponse(
+ id=completion_id,
+ object="chat.completion.chunk",
+ created=created_timestamp,
+ model=model,
+ choices=[
+ ChatCompletionStreamChoice(
+ index=i,
+ delta=DeltaMessage(content=chunk),
+ finish_reason=None,
+ )
+ ],
+ )
+ yield f"data: {chunk_response.model_dump_json(exclude_none=True)}\n\n"
+
+ # Final chunk with finish_reason
+ for i in range(n_choices):
+ final_response = ChatCompletionStreamResponse(
+ id=completion_id,
+ object="chat.completion.chunk",
+ created=created_timestamp,
+ model=model,
+ choices=[
+ ChatCompletionStreamChoice(
+ index=i,
+ delta=DeltaMessage(),
+ finish_reason="stop",
+ )
+ ],
+ )
+ yield f"data: {final_response.model_dump_json(exclude_none=True)}\n\n"
+
+ yield "data: [DONE]\n\n"
+
+
+async def stream_completion(
+ completion_id: str,
+ model: str,
+ response_text: str,
+ config: ModelSettings,
+ n: int = 1,
+) -> AsyncGenerator[str, None]:
+ """Generate Server-Sent Events for streaming text completions.
+
+ Args:
+ completion_id: Unique ID for this completion
+ model: Model name
+ response_text: Full response text to stream
+ config: Model settings for latency configuration
+ n: Number of choices to generate
+ """
+ created_timestamp = int(time.time())
+ chunks = split_response_into_chunks(response_text)
+ latencies = generate_chunk_latencies(config, len(chunks))
+
+ # Stream content chunks
+ for chunk_idx, chunk in enumerate(chunks):
+ await asyncio.sleep(latencies[chunk_idx])
+
+ for i in range(n):
+ chunk_response = CompletionStreamResponse(
+ id=completion_id,
+ object="text_completion",
+ created=created_timestamp,
+ model=model,
+ choices=[
+ CompletionStreamChoice(
+ text=chunk,
+ index=i,
+ logprobs=None,
+ finish_reason=None,
+ )
+ ],
+ )
+ yield f"data: {chunk_response.model_dump_json(exclude_none=True)}\n\n"
+
+ # Final chunk with finish_reason
+ for i in range(n):
+ final_response = CompletionStreamResponse(
+ id=completion_id,
+ object="text_completion",
+ created=created_timestamp,
+ model=model,
+ choices=[
+ CompletionStreamChoice(
+ text="",
+ index=i,
+ logprobs=None,
+ finish_reason="stop",
+ )
+ ],
+ )
+ yield f"data: {final_response.model_dump_json(exclude_none=True)}\n\n"
+
+ yield "data: [DONE]\n\n"
+
+
+@app.post("/v1/chat/completions", response_model=None)
+async def chat_completions(
+ request: ChatCompletionRequest, config: ModelSettingsDep
+) -> Union[ChatCompletionResponse, StreamingResponse]:
"""Create a chat completion."""
log.debug("/v1/chat/completions request: %s", request)
@@ -131,6 +280,23 @@ async def chat_completions(request: ChatCompletionRequest, config: ModelSettings
# Generate dummy response
response_content = get_response(config)
+ completion_id = generate_id("chatcmpl")
+
+ # Handle streaming response
+ if request.stream:
+ log.debug("/v1/chat/completions streaming response for id: %s", completion_id)
+ return StreamingResponse(
+ stream_chat_completion(
+ completion_id=completion_id,
+ model=request.model,
+ response_content=response_content,
+ config=config,
+ n_choices=request.n or 1,
+ ),
+ media_type="text/event-stream",
+ )
+
+ # Non-streaming response
response_latency_seconds = get_latency_seconds(config)
# Calculate token usage
@@ -139,7 +305,6 @@ async def chat_completions(request: ChatCompletionRequest, config: ModelSettings
completion_tokens = calculate_tokens(response_content)
# Create response
- completion_id = generate_id("chatcmpl")
created_timestamp = int(time.time())
choices = []
@@ -168,8 +333,10 @@ async def chat_completions(request: ChatCompletionRequest, config: ModelSettings
return response
-@app.post("/v1/completions", response_model=CompletionResponse)
-async def completions(request: CompletionRequest, config: ModelSettingsDep) -> CompletionResponse:
+@app.post("/v1/completions", response_model=None)
+async def completions(
+ request: CompletionRequest, config: ModelSettingsDep
+) -> Union[CompletionResponse, StreamingResponse]:
"""Create a text completion."""
log.debug("/v1/completions request: %s", request)
@@ -185,6 +352,23 @@ async def completions(request: CompletionRequest, config: ModelSettingsDep) -> C
# Generate dummy response
response_text = get_response(config)
+ completion_id = generate_id("cmpl")
+
+ # Handle streaming response
+ if request.stream:
+ log.debug("/v1/completions streaming response for id: %s", completion_id)
+ return StreamingResponse(
+ stream_completion(
+ completion_id=completion_id,
+ model=request.model,
+ response_text=response_text,
+ config=config,
+ n=request.n or 1,
+ ),
+ media_type="text/event-stream",
+ )
+
+ # Non-streaming response
response_latency_seconds = get_latency_seconds(config)
# Calculate token usage
@@ -192,7 +376,6 @@ async def completions(request: CompletionRequest, config: ModelSettingsDep) -> C
completion_tokens = calculate_tokens(response_text)
# Create response
- completion_id = generate_id("cmpl")
created_timestamp = int(time.time())
choices = []
diff --git a/benchmark/mock_llm_server/config.py b/benchmark/mock_llm_server/config.py
index 0bb60edc6..2aa5e36bc 100644
--- a/benchmark/mock_llm_server/config.py
+++ b/benchmark/mock_llm_server/config.py
@@ -37,10 +37,24 @@ class ModelSettings(BaseSettings):
# Config with default values
# Latency sampled from a truncated-normal distribution.
# Plain Normal distributions have infinite support, and can be negative
- latency_min_seconds: float = Field(default=0.1, description="Minimum latency in seconds")
- latency_max_seconds: float = Field(default=5, description="Maximum latency in seconds")
- latency_mean_seconds: float = Field(default=0.5, description="The average response time in seconds")
- latency_std_seconds: float = Field(default=0.1, description="Standard deviation of response time")
+ e2e_latency_min_seconds: float = Field(default=0.1, description="Minimum latency in seconds")
+ e2e_latency_max_seconds: float = Field(default=5, description="Maximum latency in seconds")
+ e2e_latency_mean_seconds: float = Field(default=0.5, description="The average response time in seconds")
+ e2e_latency_std_seconds: float = Field(default=0.1, description="Standard deviation of response time")
+
+ # Streaming latency: Time to First Token (TTFT)
+ # https://docs.nvidia.com/nim/benchmarking/llm/latest/metrics.html#time-to-first-token-ttft
+ ttft_min_seconds: float = Field(default=0.05, description="Minimum TTFT in seconds")
+ ttft_max_seconds: float = Field(default=0.5, description="Maximum TTFT in seconds")
+ ttft_mean_seconds: float = Field(default=0.1, description="Average TTFT in seconds")
+ ttft_std_seconds: float = Field(default=0.02, description="Standard deviation of TTFT")
+
+ # Streaming latency: Chunk Latency
+ # https://docs.nvidia.com/nim/benchmarking/llm/latest/metrics.html#inter-token-latency-itl
+ chunk_latency_min_seconds: float = Field(default=0.01, description="Minimum chunk latency in seconds")
+ chunk_latency_max_seconds: float = Field(default=0.1, description="Maximum chunk latency in seconds")
+ chunk_latency_mean_seconds: float = Field(default=0.03, description="Average chunk latency in seconds")
+ chunk_latency_std_seconds: float = Field(default=0.01, description="Standard deviation of chunk latency")
model_config = SettingsConfigDict(env_file=CONFIG_FILE)
diff --git a/benchmark/mock_llm_server/configs/meta-llama-3.3-70b-instruct.env b/benchmark/mock_llm_server/configs/meta-llama-3.3-70b-instruct.env
index d1cc35477..0cec095b3 100644
--- a/benchmark/mock_llm_server/configs/meta-llama-3.3-70b-instruct.env
+++ b/benchmark/mock_llm_server/configs/meta-llama-3.3-70b-instruct.env
@@ -2,7 +2,18 @@ MODEL="meta/llama-3.3-70b-instruct"
UNSAFE_PROBABILITY=0.0
UNSAFE_TEXT="I can't help with that. Is there anything else I can assist you with?"
SAFE_TEXT="I can provide information and help with a wide range of topics, from science and history to entertainment and culture. I can also help with language-related tasks, such as translation and text summarization. However, I can't assist with requests that involve harm or illegal activities."
-LATENCY_MIN_SECONDS=4.0
-LATENCY_MAX_SECONDS=4.0
-LATENCY_MEAN_SECONDS=4.0
-LATENCY_STD_SECONDS=0.0
+# End-to-end latency
+E2E_LATENCY_MIN_SECONDS=4.0
+E2E_LATENCY_MAX_SECONDS=4.0
+E2E_LATENCY_MEAN_SECONDS=4.0
+E2E_LATENCY_STD_SECONDS=0.0
+# Streaming latency: Time to First Token (TTFT)
+TTFT_MIN_SECONDS=0.3
+TTFT_MAX_SECONDS=0.3
+TTFT_MEAN_SECONDS=0.3
+TTFT_STD_SECONDS=0.0
+# Streaming latency: Chunk Latency (ITL)
+CHUNK_LATENCY_MIN_SECONDS=0.015
+CHUNK_LATENCY_MAX_SECONDS=0.015
+CHUNK_LATENCY_MEAN_SECONDS=0.015
+CHUNK_LATENCY_STD_SECONDS=0.0
diff --git a/benchmark/mock_llm_server/configs/nvidia-llama-3.1-nemoguard-8b-content-safety.env b/benchmark/mock_llm_server/configs/nvidia-llama-3.1-nemoguard-8b-content-safety.env
index 2ae46d578..8d11bbaa7 100644
--- a/benchmark/mock_llm_server/configs/nvidia-llama-3.1-nemoguard-8b-content-safety.env
+++ b/benchmark/mock_llm_server/configs/nvidia-llama-3.1-nemoguard-8b-content-safety.env
@@ -2,7 +2,18 @@ MODEL="nvidia/llama-3.1-nemoguard-8b-content-safety"
UNSAFE_PROBABILITY=0.03
UNSAFE_TEXT="{\"User Safety\": \"unsafe\", \"Response Safety\": \"unsafe\", \"Safety Categories\": \"Violence, Criminal Planning/Confessions\"}"
SAFE_TEXT="{\"User Safety\": \"safe\", \"Response Safety\": \"safe\"}"
-LATENCY_MIN_SECONDS=0.5
-LATENCY_MAX_SECONDS=0.5
-LATENCY_MEAN_SECONDS=0.5
-LATENCY_STD_SECONDS=0.0
+# End-to-end latency
+E2E_LATENCY_MIN_SECONDS=0.5
+E2E_LATENCY_MAX_SECONDS=0.5
+E2E_LATENCY_MEAN_SECONDS=0.5
+E2E_LATENCY_STD_SECONDS=0.0
+# Streaming latency: Time to First Token (TTFT)
+TTFT_MIN_SECONDS=0.2
+TTFT_MAX_SECONDS=0.2
+TTFT_MEAN_SECONDS=0.2
+TTFT_STD_SECONDS=0.0
+# Streaming latency: Chunk Latency (ITL)
+CHUNK_LATENCY_MIN_SECONDS=0.015
+CHUNK_LATENCY_MAX_SECONDS=0.015
+CHUNK_LATENCY_MEAN_SECONDS=0.015
+CHUNK_LATENCY_STD_SECONDS=0.0
diff --git a/benchmark/mock_llm_server/models.py b/benchmark/mock_llm_server/models.py
index 804171a39..ae17c693b 100644
--- a/benchmark/mock_llm_server/models.py
+++ b/benchmark/mock_llm_server/models.py
@@ -78,6 +78,21 @@ class ChatCompletionChoice(BaseModel):
finish_reason: str = Field(..., description="The reason the model stopped generating")
+class DeltaMessage(BaseModel):
+ """Delta message for streaming responses."""
+
+ role: Optional[str] = Field(default=None, description="The role of the message author")
+ content: Optional[str] = Field(default=None, description="The content delta")
+
+
+class ChatCompletionStreamChoice(BaseModel):
+ """Chat completion streaming choice - https://platform.openai.com/docs/api-reference/chat/streaming"""
+
+ index: int = Field(..., description="The index of this choice")
+ delta: DeltaMessage = Field(..., description="The delta message content")
+ finish_reason: Optional[str] = Field(None, description="The reason the model stopped generating")
+
+
class CompletionChoice(BaseModel):
"""Text completion choice."""
@@ -87,6 +102,15 @@ class CompletionChoice(BaseModel):
finish_reason: str = Field(..., description="The reason the model stopped generating")
+class CompletionStreamChoice(BaseModel):
+ """Text completion streaming choice."""
+
+ text: str = Field(..., description="The generated text delta")
+ index: int = Field(..., description="The index of this choice")
+ logprobs: Optional[dict[str, Any]] = Field(None, description="Log probability information")
+ finish_reason: Optional[str] = Field(None, description="The reason the model stopped generating")
+
+
class ChatCompletionResponse(BaseModel):
"""Chat completion response - https://platform.openai.com/docs/api-reference/chat/object"""
@@ -98,6 +122,16 @@ class ChatCompletionResponse(BaseModel):
usage: Usage = Field(..., description="Token usage information")
+class ChatCompletionStreamResponse(BaseModel):
+ """Chat completion streaming response chunk - https://platform.openai.com/docs/api-reference/chat/streaming"""
+
+ id: str = Field(..., description="Unique identifier for the completion")
+ object: str = Field("chat.completion.chunk", description="Object type")
+ created: int = Field(..., description="Unix timestamp when the completion was created")
+ model: str = Field(..., description="The model used for completion")
+ choices: list[ChatCompletionStreamChoice] = Field(..., description="List of completion choices")
+
+
class CompletionResponse(BaseModel):
"""Text completion response. https://platform.openai.com/docs/api-reference/completions/object"""
@@ -109,6 +143,16 @@ class CompletionResponse(BaseModel):
usage: Usage = Field(..., description="Token usage information")
+class CompletionStreamResponse(BaseModel):
+ """Text completion streaming response chunk."""
+
+ id: str = Field(..., description="Unique identifier for the completion")
+ object: str = Field("text_completion", description="Object type")
+ created: int = Field(..., description="Unix timestamp when the completion was created")
+ model: str = Field(..., description="The model used for completion")
+ choices: list[CompletionStreamChoice] = Field(..., description="List of completion choices")
+
+
class Model(BaseModel):
"""Model information."""
diff --git a/benchmark/mock_llm_server/response_data.py b/benchmark/mock_llm_server/response_data.py
index 27f035266..6868c408b 100644
--- a/benchmark/mock_llm_server/response_data.py
+++ b/benchmark/mock_llm_server/response_data.py
@@ -41,20 +41,24 @@ def get_response(config: ModelSettings, seed: Optional[int] = None) -> str:
def get_latency_seconds(config: ModelSettings, seed: Optional[int] = None) -> float:
- """Sample latency for this request using the model's config
- Very inefficient to generate each sample singly rather than in batch
+ """Sample end-to-end latency for this request using the model's config.
+    Note that generating each sample individually, rather than in batch, is inefficient.
"""
if seed:
np.random.seed(seed)
# Sample from the normal distribution using model config
- latency_seconds = np.random.normal(loc=config.latency_mean_seconds, scale=config.latency_std_seconds, size=1)
+ latency_seconds = np.random.normal(
+ loc=config.e2e_latency_mean_seconds,
+ scale=config.e2e_latency_std_seconds,
+ size=1,
+ )
# Truncate distribution's support using min and max config values
latency_seconds = np.clip(
latency_seconds,
- a_min=config.latency_min_seconds,
- a_max=config.latency_max_seconds,
+ a_min=config.e2e_latency_min_seconds,
+ a_max=config.e2e_latency_max_seconds,
)
return float(latency_seconds[0])
@@ -68,3 +72,73 @@ def is_unsafe(config: ModelSettings, seed: Optional[int] = None) -> bool:
refusal = np.random.binomial(n=1, p=config.unsafe_probability, size=1)
return bool(refusal[0])
+
+
+def split_response_into_chunks(response_text: str) -> list[str]:
+ """Split response text by whitespace into chunks for streaming.
+
+ Each word (and any attached punctuation) becomes a separate chunk.
+    Spacing is approximated by appending a single space after each chunk except the last; runs of whitespace are collapsed.
+
+ Args:
+ response_text: The full response text to split
+
+ Returns:
+ List of text chunks to stream back
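+
+    Example:
+        >>> split_response_into_chunks("Hello, world!")
+        ['Hello, ', 'world!']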
+ """
+ words = response_text.split()
+ if not words:
+ return []
+
+    # Add a space after each word except the last to approximate the original spacing
+ chunks = [word + " " for word in words[:-1]]
+ chunks.append(words[-1]) # Last word without trailing space
+ return chunks
+
+
+def generate_chunk_latencies(
+ config: ModelSettings,
+ num_chunks: int,
+ seed: Optional[int] = None,
+) -> np.ndarray:
+ """Generate latencies for each streaming chunk.
+
+ Uses TTFT (Time to First Token) for the first chunk and ITL (Inter-Token Latency)
+ for subsequent chunks. Both are sampled from truncated normal distributions.
+
+ Args:
+ config: Model settings containing TTFT and ITL parameters
+ num_chunks: Number of chunks to generate latencies for
+ seed: Optional random seed for reproducibility
+
+ Returns:
+ Numpy array of latencies in seconds, one for each chunk
+ """
+ if num_chunks <= 0:
+ return np.array([])
+
+ if seed:
+ np.random.seed(seed)
+
+ latencies = np.zeros(num_chunks)
+
+ # First chunk uses TTFT
+ ttft = np.random.normal(loc=config.ttft_mean_seconds, scale=config.ttft_std_seconds, size=1)
+ ttft = np.clip(ttft, a_min=config.ttft_min_seconds, a_max=config.ttft_max_seconds)
+ latencies[0] = ttft[0]
+
+ # Remaining chunks use Inter Token Latencies
+ if num_chunks > 1:
+ inter_token_latencies = np.random.normal(
+ loc=config.chunk_latency_mean_seconds,
+ scale=config.chunk_latency_std_seconds,
+ size=num_chunks - 1,
+ )
+ inter_token_latencies = np.clip(
+ inter_token_latencies,
+ a_min=config.chunk_latency_min_seconds,
+ a_max=config.chunk_latency_max_seconds,
+ )
+ latencies[1:] = inter_token_latencies
+
+ return latencies
diff --git a/benchmark/tests/test_mock_api.py b/benchmark/tests/test_mock_api.py
index 96d665dca..bed4c259c 100644
--- a/benchmark/tests/test_mock_api.py
+++ b/benchmark/tests/test_mock_api.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
import time
import pytest
@@ -21,17 +22,29 @@
from benchmark.mock_llm_server.api import app
from benchmark.mock_llm_server.config import ModelSettings, get_settings
+UNSAFE_RESPONSE = "I cannot help with that request"
+SAFE_RESPONSE = "This is a safe response, with many words to test streaming where we split on whitespace"
+
def get_test_settings():
return ModelSettings(
model="gpt-3.5-turbo",
- unsafe_probability=0.1,
- unsafe_text="I cannot help with that request",
- safe_text="This is a safe response",
- latency_min_seconds=0,
- latency_max_seconds=0,
- latency_mean_seconds=0,
- latency_std_seconds=0,
+ unsafe_probability=0.0,
+ unsafe_text=UNSAFE_RESPONSE,
+ safe_text=SAFE_RESPONSE,
+ e2e_latency_min_seconds=0,
+ e2e_latency_max_seconds=0,
+ e2e_latency_mean_seconds=0,
+ e2e_latency_std_seconds=0,
+ # Streaming latency settings (set to 0 for fast tests)
+ ttft_min_seconds=0,
+ ttft_max_seconds=0,
+ ttft_mean_seconds=0,
+ ttft_std_seconds=0,
+ chunk_latency_min_seconds=0,
+ chunk_latency_max_seconds=0,
+ chunk_latency_mean_seconds=0,
+ chunk_latency_std_seconds=0,
)
@@ -359,6 +372,66 @@ def test_validate_request_model_invalid(self, client):
assert "gpt-3.5-turbo" in response.json()["detail"]
+def get_safe_only_settings():
+ """Settings with unsafe_probability=0 (always safe response)."""
+ return ModelSettings(
+ model="gpt-3.5-turbo",
+ unsafe_probability=0.0,
+ unsafe_text=UNSAFE_RESPONSE,
+ safe_text=SAFE_RESPONSE,
+ e2e_latency_min_seconds=0,
+ e2e_latency_max_seconds=0,
+ e2e_latency_mean_seconds=0,
+ e2e_latency_std_seconds=0,
+ ttft_min_seconds=0,
+ ttft_max_seconds=0,
+ ttft_mean_seconds=0,
+ ttft_std_seconds=0,
+ chunk_latency_min_seconds=0,
+ chunk_latency_max_seconds=0,
+ chunk_latency_mean_seconds=0,
+ chunk_latency_std_seconds=0,
+ )
+
+
+def get_unsafe_only_settings():
+ """Settings with unsafe_probability=1.0 (always unsafe response)."""
+ return ModelSettings(
+ model="gpt-3.5-turbo",
+ unsafe_probability=1.0,
+ unsafe_text=UNSAFE_RESPONSE,
+ safe_text=SAFE_RESPONSE,
+ e2e_latency_min_seconds=0,
+ e2e_latency_max_seconds=0,
+ e2e_latency_mean_seconds=0,
+ e2e_latency_std_seconds=0,
+ ttft_min_seconds=0,
+ ttft_max_seconds=0,
+ ttft_mean_seconds=0,
+ ttft_std_seconds=0,
+ chunk_latency_min_seconds=0,
+ chunk_latency_max_seconds=0,
+ chunk_latency_mean_seconds=0,
+ chunk_latency_std_seconds=0,
+ )
+
+
+@pytest.fixture
+def safe_client():
+ """Create a test client that always returns safe responses."""
+ app.dependency_overrides[get_settings] = get_safe_only_settings
+ yield TestClient(app)
+ app.dependency_overrides[get_settings] = get_test_settings
+
+
+@pytest.fixture
+def unsafe_client():
+ """Create a test client that always returns unsafe responses."""
+ app.dependency_overrides[get_settings] = get_unsafe_only_settings
+ yield TestClient(app)
+ app.dependency_overrides[get_settings] = get_test_settings
+
+
class TestResponseContent:
"""Test that responses contain expected content."""
@@ -373,7 +446,7 @@ def test_chat_response_content_type(self, client):
content = data["choices"][0]["message"]["content"]
# Should be one of the configured responses
- assert content in ["This is a safe response", "I cannot help with that request"]
+ assert content == SAFE_RESPONSE
def test_completion_response_content_type(self, client):
"""Test that completion response contains expected text."""
@@ -386,4 +459,297 @@ def test_completion_response_content_type(self, client):
text = data["choices"][0]["text"]
# Should be one of the configured responses
- assert text in ["This is a safe response", "I cannot help with that request"]
+ assert text in {SAFE_RESPONSE, UNSAFE_RESPONSE}
+
+ def test_chat_completions_safe_response_when_probability_zero(self, safe_client):
+ """Test that chat completions returns safe response when unsafe_probability=0."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Test"}],
+ }
+ response = safe_client.post("/v1/chat/completions", json=payload)
+ data = response.json()
+
+ content = data["choices"][0]["message"]["content"]
+ assert content == SAFE_RESPONSE
+
+ def test_chat_completions_unsafe_response_when_probability_one(self, unsafe_client):
+ """Test that chat completions returns unsafe response when unsafe_probability=1.0."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Test"}],
+ }
+ response = unsafe_client.post("/v1/chat/completions", json=payload)
+ data = response.json()
+
+ content = data["choices"][0]["message"]["content"]
+ assert content == UNSAFE_RESPONSE
+
+ def test_completions_safe_response_when_probability_zero(self, safe_client):
+ """Test that completions returns safe response when unsafe_probability=0."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ }
+ response = safe_client.post("/v1/completions", json=payload)
+ data = response.json()
+
+ text = data["choices"][0]["text"]
+ assert text == SAFE_RESPONSE
+
+ def test_completions_unsafe_response_when_probability_one(self, unsafe_client):
+ """Test that completions returns unsafe response when unsafe_probability=1.0."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ }
+ response = unsafe_client.post("/v1/completions", json=payload)
+ data = response.json()
+
+ text = data["choices"][0]["text"]
+ assert text == UNSAFE_RESPONSE
+
+
+class TestChatCompletionsStreaming:
+ """Test the /v1/chat/completions endpoint with streaming."""
+
+ def test_chat_completions_streaming_returns_sse(self, client):
+ """Test that streaming returns Server-Sent Events format."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
+
+ def test_chat_completions_streaming_chunks_format(self, client):
+ """Test that streaming chunks have correct SSE format."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ # Each chunk should start with "data: "
+ lines = [line for line in content.split("\n") if line.strip()]
+ for line in lines:
+ assert line.startswith("data: ")
+
+ # Should end with [DONE]
+ assert "data: [DONE]" in content
+
+ def test_chat_completions_streaming_first_chunk_has_role(self, client):
+ """Test that first streaming chunk contains role."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ # Get first data chunk (skip empty lines)
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+ first_chunk = json.loads(lines[0].replace("data: ", ""))
+
+ assert first_chunk["object"] == "chat.completion.chunk"
+ assert first_chunk["choices"][0]["delta"]["role"] == "assistant"
+ assert first_chunk["choices"][0]["delta"]["content"] == ""
+
+ def test_chat_completions_streaming_content_chunks(self, client):
+ """Test that content chunks only have content field."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ # Skip first chunk (role) and last chunk (finish_reason)
+ content_chunks = lines[1:-1]
+ for line in content_chunks:
+ chunk = json.loads(line.replace("data: ", ""))
+ delta = chunk["choices"][0]["delta"]
+ # Should only have content, no role
+ assert "content" in delta
+ assert "role" not in delta
+
+ def test_chat_completions_streaming_final_chunk(self, client):
+ """Test that final chunk has finish_reason and empty delta."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+ last_chunk = json.loads(lines[-1].replace("data: ", ""))
+
+ assert last_chunk["choices"][0]["finish_reason"] == "stop"
+ assert last_chunk["choices"][0]["delta"] == {}
+
+ def test_chat_completions_streaming_reconstructs_response(self, client):
+ """Test that concatenating chunks reconstructs the full response."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ # Reconstruct content from all chunks
+ full_content = ""
+ for line in lines:
+ chunk = json.loads(line.replace("data: ", ""))
+ delta = chunk["choices"][0]["delta"]
+ if "content" in delta and delta["content"]:
+ full_content += delta["content"]
+
+        # Should reconstruct the configured safe response
+ assert full_content == SAFE_RESPONSE
+
+ def test_chat_completions_streaming_consistent_id(self, client):
+ """Test that all chunks have the same ID."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "messages": [{"role": "user", "content": "Hello"}],
+ "stream": True,
+ }
+ response = client.post("/v1/chat/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ ids = set()
+ for line in lines:
+ chunk = json.loads(line.replace("data: ", ""))
+ ids.add(chunk["id"])
+
+ # All chunks should have the same ID
+ assert len(ids) == 1
+ # ID should have correct prefix
+ assert list(ids)[0].startswith("chatcmpl-")
+
+
+class TestCompletionsStreaming:
+ """Test the /v1/completions endpoint with streaming."""
+
+ def test_completions_streaming_returns_sse(self, client):
+ """Test that streaming returns Server-Sent Events format."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Once upon a time",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
+
+ def test_completions_streaming_chunks_format(self, client):
+ """Test that streaming chunks have correct SSE format."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ content = response.text
+
+ # Each chunk should start with "data: "
+ lines = [line for line in content.split("\n") if line.strip()]
+ for line in lines:
+ assert line.startswith("data: ")
+
+ # Should end with [DONE]
+ assert "data: [DONE]" in content
+
+ def test_completions_streaming_content_chunks(self, client):
+ """Test that content chunks have text field."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ # All chunks except last should have text content
+ content_chunks = lines[:-1]
+ for line in content_chunks:
+ chunk = json.loads(line.replace("data: ", ""))
+ assert chunk["object"] == "text_completion"
+ assert "text" in chunk["choices"][0]
+
+ def test_completions_streaming_final_chunk(self, client):
+ """Test that final chunk has finish_reason."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+ last_chunk = json.loads(lines[-1].replace("data: ", ""))
+
+ assert last_chunk["choices"][0]["finish_reason"] == "stop"
+
+ def test_completions_streaming_reconstructs_response(self, client):
+ """Test that concatenating chunks reconstructs the full response."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ # Reconstruct content from all chunks
+ full_content = ""
+ for line in lines:
+ chunk = json.loads(line.replace("data: ", ""))
+ text = chunk["choices"][0]["text"]
+ if text:
+ full_content += text
+
+        # Should reconstruct the configured safe response
+ assert full_content == SAFE_RESPONSE
+
+ def test_completions_streaming_consistent_id(self, client):
+ """Test that all chunks have the same ID."""
+ payload = {
+ "model": "gpt-3.5-turbo",
+ "prompt": "Test",
+ "stream": True,
+ }
+ response = client.post("/v1/completions", json=payload)
+ content = response.text
+
+ lines = [line for line in content.split("\n") if line.startswith("data: ") and line != "data: [DONE]"]
+
+ ids = set()
+ for line in lines:
+ chunk = json.loads(line.replace("data: ", ""))
+ ids.add(chunk["id"])
+
+ # All chunks should have the same ID
+ assert len(ids) == 1
+ # ID should have correct prefix
+ assert list(ids)[0].startswith("cmpl-")
diff --git a/benchmark/tests/test_mock_config.py b/benchmark/tests/test_mock_config.py
index 4b9ac6231..62343df00 100644
--- a/benchmark/tests/test_mock_config.py
+++ b/benchmark/tests/test_mock_config.py
@@ -32,10 +32,10 @@ def test_app_model_config_with_defaults(self):
)
# Check defaults
assert config.unsafe_probability == 0.1
- assert config.latency_min_seconds == 0.1
- assert config.latency_max_seconds == 5
- assert config.latency_mean_seconds == 0.5
- assert config.latency_std_seconds == 0.1
+ assert config.e2e_latency_min_seconds == 0.1
+ assert config.e2e_latency_max_seconds == 5
+ assert config.e2e_latency_mean_seconds == 0.5
+ assert config.e2e_latency_std_seconds == 0.1
def test_app_model_config_missing_required_field(self):
"""Test that missing required fields raise validation error."""
diff --git a/benchmark/tests/test_mock_response_data.py b/benchmark/tests/test_mock_response_data.py
index cd6704193..688655828 100644
--- a/benchmark/tests/test_mock_response_data.py
+++ b/benchmark/tests/test_mock_response_data.py
@@ -22,10 +22,12 @@
from benchmark.mock_llm_server.config import ModelSettings
from benchmark.mock_llm_server.response_data import (
calculate_tokens,
+ generate_chunk_latencies,
generate_id,
get_latency_seconds,
get_response,
is_unsafe,
+ split_response_into_chunks,
)
@@ -102,10 +104,10 @@ def model_settings() -> ModelSettings:
unsafe_probability=0.5,
unsafe_text="Sorry Dave, I'm afraid I can't do that.",
safe_text="I'm an AI assistant and am happy to help",
- latency_min_seconds=0.2,
- latency_max_seconds=1.0,
- latency_mean_seconds=0.5,
- latency_std_seconds=0.1,
+ e2e_latency_min_seconds=0.2,
+ e2e_latency_max_seconds=1.0,
+ e2e_latency_mean_seconds=0.5,
+ e2e_latency_std_seconds=0.1,
)
return settings
@@ -182,22 +184,22 @@ def test_get_response_unsafe(model_settings: ModelSettings):
def test_get_latency_seconds_mocks_no_seed(mock_clip, mock_normal, mock_seed, model_settings: ModelSettings):
"""Check we call the correct numpy functions (not including seed)"""
- mock_normal.return_value = np.array([model_settings.latency_mean_seconds])
- mock_clip.return_value = np.array([model_settings.latency_max_seconds])
+ mock_normal.return_value = np.array([model_settings.e2e_latency_mean_seconds])
+ mock_clip.return_value = np.array([model_settings.e2e_latency_max_seconds])
result = get_latency_seconds(model_settings)
assert result == mock_clip.return_value
assert mock_seed.call_count == 0
mock_normal.assert_called_once_with(
- loc=model_settings.latency_mean_seconds,
- scale=model_settings.latency_std_seconds,
+ loc=model_settings.e2e_latency_mean_seconds,
+ scale=model_settings.e2e_latency_std_seconds,
size=1,
)
mock_clip.assert_called_once_with(
mock_normal.return_value,
- a_min=model_settings.latency_min_seconds,
- a_max=model_settings.latency_max_seconds,
+ a_min=model_settings.e2e_latency_min_seconds,
+ a_max=model_settings.e2e_latency_max_seconds,
)
@@ -209,20 +211,263 @@ def test_get_latency_seconds_mocks_with_seed(
):
"""Check we call the correct numpy functions (not including seed)"""
- mock_normal.return_value = np.array([model_settings.latency_mean_seconds])
- mock_clip.return_value = np.array([model_settings.latency_max_seconds])
+ mock_normal.return_value = np.array([model_settings.e2e_latency_mean_seconds])
+ mock_clip.return_value = np.array([model_settings.e2e_latency_max_seconds])
result = get_latency_seconds(model_settings, seed=random_seed)
assert result == mock_clip.return_value
mock_seed.assert_called_once_with(random_seed)
mock_normal.assert_called_once_with(
- loc=model_settings.latency_mean_seconds,
- scale=model_settings.latency_std_seconds,
+ loc=model_settings.e2e_latency_mean_seconds,
+ scale=model_settings.e2e_latency_std_seconds,
size=1,
)
mock_clip.assert_called_once_with(
mock_normal.return_value,
- a_min=model_settings.latency_min_seconds,
- a_max=model_settings.latency_max_seconds,
+ a_min=model_settings.e2e_latency_min_seconds,
+ a_max=model_settings.e2e_latency_max_seconds,
)
+
+
+class TestSplitResponseIntoChunks:
+ """Test the split_response_into_chunks function."""
+
+ def test_split_simple_sentence(self):
+ """Test splitting a simple sentence into word chunks."""
+ text = "Hello world"
+ chunks = split_response_into_chunks(text)
+ assert chunks == ["Hello ", "world"]
+
+ def test_split_multiple_words(self):
+ """Test splitting multiple words preserves spacing."""
+ text = "I can help you"
+ chunks = split_response_into_chunks(text)
+ assert chunks == ["I ", "can ", "help ", "you"]
+
+ def test_split_empty_string(self):
+ """Test splitting an empty string returns empty list."""
+ text = ""
+ chunks = split_response_into_chunks(text)
+ assert chunks == []
+
+ def test_split_single_word(self):
+ """Test splitting a single word returns it without trailing space."""
+ text = "Hello"
+ chunks = split_response_into_chunks(text)
+ assert chunks == ["Hello"]
+
+ def test_split_preserves_punctuation(self):
+ """Test that punctuation stays attached to words."""
+ text = "Hello, world!"
+ chunks = split_response_into_chunks(text)
+ assert chunks == ["Hello, ", "world!"]
+
+ def test_split_reconstructs_original(self):
+ """Test that joining chunks reconstructs the original text."""
+ text = "I can provide information and help with a wide range of topics."
+ chunks = split_response_into_chunks(text)
+ reconstructed = "".join(chunks)
+ assert reconstructed == text
+
+ def test_split_whitespace_only(self):
+ """Test splitting whitespace-only string returns empty list."""
+ text = " "
+ chunks = split_response_into_chunks(text)
+ assert chunks == []
+
+
+class TestGenerateChunkLatencies:
+ """Test the generate_chunk_latencies function."""
+
+ @pytest.fixture
+ def streaming_settings(self) -> ModelSettings:
+ """Generate config data with streaming latency settings.
+        Each value is unique so we can verify that the correct config values are passed to the numpy functions.
+ """
+ return ModelSettings(
+ model="test-model",
+ unsafe_text="Unsafe",
+ safe_text="Safe",
+ ttft_min_seconds=0.1,
+ ttft_max_seconds=4.0,
+ ttft_mean_seconds=1.3,
+ ttft_std_seconds=0.96,
+ chunk_latency_min_seconds=0.01,
+ chunk_latency_max_seconds=0.12,
+ chunk_latency_mean_seconds=0.05,
+ chunk_latency_std_seconds=0.02,
+ )
+
+ def test_generate_latencies_zero_chunks(self, streaming_settings: ModelSettings):
+ """Test generating latencies for zero chunks returns empty array."""
+ latencies = generate_chunk_latencies(streaming_settings, 0)
+ assert len(latencies) == 0
+ assert isinstance(latencies, np.ndarray)
+
+ def test_generate_latencies_negative_chunks(self, streaming_settings: ModelSettings):
+ """Test generating latencies for negative chunks returns empty array."""
+ latencies = generate_chunk_latencies(streaming_settings, -1)
+ assert len(latencies) == 0
+
+ @patch("benchmark.mock_llm_server.response_data.np.random.seed")
+ @patch("benchmark.mock_llm_server.response_data.np.random.normal")
+ @patch("benchmark.mock_llm_server.response_data.np.clip")
+ def test_generate_latencies_single_chunk_no_seed(
+ self,
+ mock_clip: MagicMock,
+ mock_normal: MagicMock,
+ mock_seed: MagicMock,
+ streaming_settings: ModelSettings,
+ ):
+ """Test single chunk calls TTFT normal and clip, no seed call."""
+ mock_normal.return_value = np.array([streaming_settings.ttft_mean_seconds])
+ mock_clip.return_value = np.array([streaming_settings.ttft_mean_seconds])
+
+ latencies = generate_chunk_latencies(streaming_settings, 1)
+
+ assert len(latencies) == 1
+ assert mock_seed.call_count == 0
+ mock_normal.assert_called_once_with(
+ loc=streaming_settings.ttft_mean_seconds,
+ scale=streaming_settings.ttft_std_seconds,
+ size=1,
+ )
+ mock_clip.assert_called_once_with(
+ mock_normal.return_value,
+ a_min=streaming_settings.ttft_min_seconds,
+ a_max=streaming_settings.ttft_max_seconds,
+ )
+
+ @patch("benchmark.mock_llm_server.response_data.np.random.seed")
+ @patch("benchmark.mock_llm_server.response_data.np.random.normal")
+ @patch("benchmark.mock_llm_server.response_data.np.clip")
+ def test_generate_latencies_single_chunk_with_seed(
+ self,
+ mock_clip: MagicMock,
+ mock_normal: MagicMock,
+ mock_seed: MagicMock,
+ streaming_settings: ModelSettings,
+ ):
+ """Test single chunk with seed calls np.random.seed."""
+ mock_normal.return_value = np.array([streaming_settings.ttft_mean_seconds])
+ mock_clip.return_value = np.array([streaming_settings.ttft_min_seconds])
+ seed_value = 42
+
+ latencies = generate_chunk_latencies(streaming_settings, 1, seed=seed_value)
+
+ assert len(latencies) == 1
+ mock_seed.assert_called_once_with(seed_value)
+ mock_normal.assert_called_once_with(
+ loc=streaming_settings.ttft_mean_seconds,
+ scale=streaming_settings.ttft_std_seconds,
+ size=1,
+ )
+ mock_clip.assert_called_once_with(
+ mock_normal.return_value,
+ a_min=streaming_settings.ttft_min_seconds,
+ a_max=streaming_settings.ttft_max_seconds,
+ )
+
+ @patch("benchmark.mock_llm_server.response_data.np.random.seed")
+ @patch("benchmark.mock_llm_server.response_data.np.random.normal")
+ @patch("benchmark.mock_llm_server.response_data.np.clip")
+ def test_generate_latencies_multiple_chunks_no_seed(
+ self,
+ mock_clip: MagicMock,
+ mock_normal: MagicMock,
+ mock_seed: MagicMock,
+ streaming_settings: ModelSettings,
+ ):
+ """Test multiple chunks calls TTFT then ITL normal and clip."""
+ num_chunks = 5
+ ttft_value = np.array([streaming_settings.ttft_mean_seconds])
+ chunk_values = np.array([0.01, 0.02, 0.03, 0.04])
+
+ # First call returns TTFT, second call returns ITL values
+ mock_normal.side_effect = [ttft_value, chunk_values]
+ mock_clip.side_effect = [ttft_value, chunk_values]
+
+ latencies = generate_chunk_latencies(streaming_settings, num_chunks)
+
+ assert len(latencies) == num_chunks
+ assert mock_seed.call_count == 0
+ assert mock_normal.call_count == 2
+ assert mock_clip.call_count == 2
+
+ # Check the TTFT Normal distribution
+ ttft_normal_call_args, ttft_normal_call_kwargs = mock_normal.call_args_list[0]
+ assert ttft_normal_call_args == () # All arguments are passed as kwargs, so args list is empty
+ assert ttft_normal_call_kwargs["loc"] == streaming_settings.ttft_mean_seconds
+ assert ttft_normal_call_kwargs["scale"] == streaming_settings.ttft_std_seconds
+ assert ttft_normal_call_kwargs["size"] == 1
+
+ # Check the ITL Normal distribution call (for all but the first chunk)
+ chunk_normal_call_args, chunk_normal_call_kwargs = mock_normal.call_args_list[1]
+ assert chunk_normal_call_args == () # All arguments are passed as kwargs, so args list is empty
+ assert chunk_normal_call_kwargs["loc"] == streaming_settings.chunk_latency_mean_seconds
+ assert chunk_normal_call_kwargs["scale"] == streaming_settings.chunk_latency_std_seconds
+ assert chunk_normal_call_kwargs["size"] == num_chunks - 1
+
+ # Check TTFT clip calls
+ ttft_clip_call_args, ttft_clip_call_kwargs = mock_clip.call_args_list[0]
+ assert ttft_clip_call_args[0] == ttft_value
+ assert ttft_clip_call_kwargs["a_max"] == streaming_settings.ttft_max_seconds
+ assert ttft_clip_call_kwargs["a_min"] == streaming_settings.ttft_min_seconds
+
+ # Check ITL clip calls
+ chunk_clip_call_args, chunk_clip_call_kwargs = mock_clip.call_args_list[1]
+ np.testing.assert_array_equal(chunk_clip_call_args[0], chunk_values)
+ assert chunk_clip_call_kwargs["a_max"] == streaming_settings.chunk_latency_max_seconds
+ assert chunk_clip_call_kwargs["a_min"] == streaming_settings.chunk_latency_min_seconds
+
+ @patch("benchmark.mock_llm_server.response_data.np.random.seed")
+ @patch("benchmark.mock_llm_server.response_data.np.random.normal")
+ @patch("benchmark.mock_llm_server.response_data.np.clip")
+ def test_generate_latencies_multiple_chunks_with_seed(
+ self,
+ mock_clip: MagicMock,
+ mock_normal: MagicMock,
+ mock_seed: MagicMock,
+ streaming_settings: ModelSettings,
+ ):
+ """Test multiple chunks with seed calls np.random.seed once."""
+ num_chunks = 3
+ seed_value = 12345
+ ttft_value = np.array([streaming_settings.ttft_mean_seconds])
+ chunk_values = np.array([streaming_settings.chunk_latency_mean_seconds] * (num_chunks - 1))
+
+ mock_normal.side_effect = [ttft_value, chunk_values]
+ mock_clip.side_effect = [ttft_value, chunk_values]
+
+ latencies = generate_chunk_latencies(streaming_settings, num_chunks, seed=seed_value)
+
+ assert len(latencies) == num_chunks
+ mock_seed.assert_called_once_with(seed_value)
+
+ # Exact call arguments are tested in test_generate_latencies_multiple_chunks_no_seed, no need to retest
+ assert mock_normal.call_count == 2
+ assert mock_clip.call_count == 2
+
+ @patch("benchmark.mock_llm_server.response_data.np.random.seed")
+ @patch("benchmark.mock_llm_server.response_data.np.random.normal")
+ @patch("benchmark.mock_llm_server.response_data.np.clip")
+    def test_generate_latencies_returns_correct_values(
+        self,
+        mock_clip: MagicMock,
+        mock_normal: MagicMock,
+        mock_seed: MagicMock,
+        streaming_settings: ModelSettings,
+    ):
+ """Test that returned latencies contain the clipped values."""
+ num_chunks = 3
+ ttft_clipped = np.array([0.25])
+ chunk_clipped = np.array([0.04, 0.06])
+
+ mock_normal.side_effect = [np.array([0.3]), np.array([0.05, 0.07])]
+ mock_clip.side_effect = [ttft_clipped, chunk_clipped]
+
+ latencies = generate_chunk_latencies(streaming_settings, num_chunks)
+
+ assert len(latencies) == num_chunks
+ assert latencies[0] == ttft_clipped[0]
+ np.testing.assert_array_equal(latencies[1:], chunk_clipped)