Commit 7c05120

Enable streaming usage metrics for OpenAI-compatible providers
Inject stream_options={"include_usage": True} when streaming and OpenTelemetry telemetry is active. Telemetry always overrides any caller preference to ensure complete and consistent observability metrics.

Changes:
- Add conditional stream_options injection to OpenAIMixin (benefits OpenAI, Bedrock, Runpod, Together, Fireworks providers)
- Add conditional stream_options injection to LiteLLMOpenAIMixin (benefits litellm-based providers that call parent methods)
- Add telemetry-gated stream_options injection to WatsonX via helper method (WatsonX bypasses LiteLLMOpenAIMixin by calling litellm.acompletion directly, so it uses _inject_stream_options_for_telemetry helper to avoid code duplication)
- Check telemetry status using trace.get_current_span().is_recording()
- Override include_usage=False when telemetry active to prevent metric gaps
- Unit tests for this functionality
- Remove legacy ungated stream_options from Bedrock and Runpod providers (pre-#4127 code that bypassed telemetry gating)

Fixes #3981
1 parent ff375f1 commit 7c05120

7 files changed

Lines changed: 435 additions & 44 deletions
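The same telemetry gate is repeated across the providers below. As a rough standalone sketch of the pattern the commit message describes (the function name is illustrative and not code from the repository; it assumes the opentelemetry-api package is installed):

from opentelemetry import trace


def stream_options_for_request(stream: bool, stream_options: dict | None) -> dict | None:
    # Non-streaming requests are never modified.
    if not stream:
        return stream_options
    span = trace.get_current_span()
    if span and span.is_recording():
        # Active telemetry takes precedence, even over an explicit include_usage=False.
        return {**(stream_options or {}), "include_usage": True}
    return stream_options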

src/llama_stack/providers/remote/inference/bedrock/bedrock.py

Lines changed: 1 addition & 8 deletions
@@ -81,14 +81,7 @@ async def openai_chat_completion(
         self,
         params: OpenAIChatCompletionRequestWithExtraBody,
     ) -> OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]:
-        """Override to enable streaming usage metrics and handle authentication errors."""
-        # Enable streaming usage metrics when telemetry is active
-        if params.stream:
-            if params.stream_options is None:
-                params.stream_options = {"include_usage": True}
-            elif "include_usage" not in params.stream_options:
-                params.stream_options = {**params.stream_options, "include_usage": True}
-
+        """Override to handle authentication errors and null responses."""
         try:
             logger.debug(f"Calling Bedrock OpenAI API with model={params.model}, stream={params.stream}")
             result = await super().openai_chat_completion(params=params)

src/llama_stack/providers/remote/inference/runpod/runpod.py

Lines changed: 0 additions & 19 deletions
@@ -4,14 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
 
-from collections.abc import AsyncIterator
-
 from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin
-from llama_stack_api import (
-    OpenAIChatCompletion,
-    OpenAIChatCompletionChunk,
-    OpenAIChatCompletionRequestWithExtraBody,
-)
 
 from .config import RunpodImplConfig
 
@@ -29,15 +22,3 @@ class RunpodInferenceAdapter(OpenAIMixin):
     def get_base_url(self) -> str:
         """Get base URL for OpenAI client."""
         return str(self.config.base_url)
-
-    async def openai_chat_completion(
-        self,
-        params: OpenAIChatCompletionRequestWithExtraBody,
-    ) -> OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]:
-        """Override to add RunPod-specific stream_options requirement."""
-        params = params.model_copy()
-
-        if params.stream and not params.stream_options:
-            params.stream_options = {"include_usage": True}
-
-        return await super().openai_chat_completion(params)

src/llama_stack/providers/remote/inference/watsonx/watsonx.py

Lines changed: 46 additions & 9 deletions
@@ -47,6 +47,40 @@ def __init__(self, config: WatsonXConfig):
             openai_compat_api_base=self.get_base_url(),
         )
 
+    def _inject_stream_options_for_telemetry(
+        self,
+        stream_options: dict | None,
+        is_streaming: bool,
+    ) -> dict | None:
+        """
+        Inject stream_options when streaming and telemetry is active.
+
+        Active telemetry takes precedence over caller preference to ensure
+        complete and consistent observability metrics.
+
+        Args:
+            stream_options: Original stream_options from params
+            is_streaming: Whether this is a streaming request
+
+        Returns:
+            Modified stream_options with include_usage=True if telemetry active,
+            otherwise returns original stream_options unchanged
+        """
+        if not is_streaming:
+            return stream_options
+
+        from opentelemetry import trace
+
+        span = trace.get_current_span()
+        if not span or not span.is_recording():
+            return stream_options
+
+        # Telemetry is active - inject include_usage
+        if stream_options is None:
+            return {"include_usage": True}
+        else:
+            return {**stream_options, "include_usage": True}
+
     async def openai_chat_completion(
         self,
         params: OpenAIChatCompletionRequestWithExtraBody,
@@ -55,14 +89,11 @@ async def openai_chat_completion(
         Override parent method to add timeout and inject usage object when missing.
         This works around a LiteLLM defect where usage block is sometimes dropped.
         """
-
-        # Add usage tracking for streaming when telemetry is active
-        stream_options = params.stream_options
-        if params.stream:
-            if stream_options is None:
-                stream_options = {"include_usage": True}
-            elif "include_usage" not in stream_options:
-                stream_options = {**stream_options, "include_usage": True}
+        # Inject stream_options when streaming and telemetry is active
+        stream_options = self._inject_stream_options_for_telemetry(
+            params.stream_options,
+            params.stream,
+        )
 
         model_obj = await self.model_store.get_model(params.model)
 
@@ -183,6 +214,12 @@ async def openai_completion(
         """
        from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params
 
+        # Inject stream_options when streaming and telemetry is active
+        stream_options = self._inject_stream_options_for_telemetry(
+            params.stream_options,
+            params.stream,
+        )
+
         model_obj = await self.model_store.get_model(params.model)
 
         request_params = await prepare_openai_completion_params(
@@ -199,7 +236,7 @@ async def openai_completion(
             seed=params.seed,
             stop=params.stop,
             stream=params.stream,
-            stream_options=params.stream_options,
+            stream_options=stream_options,
             temperature=params.temperature,
             top_p=params.top_p,
             user=params.user,
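Illustrative calls against the new WatsonX helper, assuming adapter is a constructed WatsonX adapter instance and an OpenTelemetry span is currently recording; the expected results in the comments follow from the helper's logic above:

adapter._inject_stream_options_for_telemetry(None, is_streaming=False)
# -> None: non-streaming requests are returned unchanged

adapter._inject_stream_options_for_telemetry(None, is_streaming=True)
# -> {"include_usage": True}

adapter._inject_stream_options_for_telemetry({"include_usage": False}, is_streaming=True)
# -> {"include_usage": True}: telemetry overrides the caller's opt-out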

src/llama_stack/providers/utils/inference/litellm_openai_mixin.py

Lines changed: 26 additions & 7 deletions
@@ -179,6 +179,20 @@ async def openai_completion(
         self,
         params: OpenAICompletionRequestWithExtraBody,
     ) -> OpenAICompletion:
+        # Inject stream_options when streaming and telemetry is active
+        stream_options = params.stream_options
+        if params.stream:
+            from opentelemetry import trace
+
+            span = trace.get_current_span()
+            if span and span.is_recording():
+                if stream_options is None:
+                    stream_options = {"include_usage": True}
+                else:
+                    # Active telemetry takes precedence over caller preference.
+                    # This ensures complete and consistent observability metrics.
+                    stream_options = {**stream_options, "include_usage": True}
+
         if not self.model_store:
             raise ValueError("Model store is not initialized")
 
@@ -201,7 +215,7 @@ async def openai_completion(
             seed=params.seed,
             stop=params.stop,
             stream=params.stream,
-            stream_options=params.stream_options,
+            stream_options=stream_options,
             temperature=params.temperature,
             top_p=params.top_p,
             user=params.user,
@@ -216,14 +230,19 @@ async def openai_chat_completion(
         self,
         params: OpenAIChatCompletionRequestWithExtraBody,
     ) -> OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]:
-        # Add usage tracking for streaming when telemetry is active
-
+        # Inject stream_options when streaming and telemetry is active
         stream_options = params.stream_options
         if params.stream:
-            if stream_options is None:
-                stream_options = {"include_usage": True}
-            elif "include_usage" not in stream_options:
-                stream_options = {**stream_options, "include_usage": True}
+            from opentelemetry import trace
+
+            span = trace.get_current_span()
+            if span and span.is_recording():
+                if stream_options is None:
+                    stream_options = {"include_usage": True}
+                else:
+                    # Active telemetry takes precedence over caller preference.
+                    # This ensures complete and consistent observability metrics.
+                    stream_options = {**stream_options, "include_usage": True}
 
         if not self.model_store:
             raise ValueError("Model store is not initialized")

src/llama_stack/providers/utils/inference/openai_mixin.py

Lines changed: 28 additions & 0 deletions
@@ -271,6 +271,20 @@ async def openai_completion(
         """
         Direct OpenAI completion API call.
         """
+        # Inject stream_options when streaming and telemetry is active
+        if params.stream:
+            from opentelemetry import trace
+
+            span = trace.get_current_span()
+            if span and span.is_recording():
+                params = params.model_copy()
+                if params.stream_options is None:
+                    params.stream_options = {"include_usage": True}
+                else:
+                    # Active telemetry takes precedence over caller preference.
+                    # This ensures complete and consistent observability metrics.
+                    params.stream_options = {**params.stream_options, "include_usage": True}
+
         # TODO: fix openai_completion to return type compatible with OpenAI's API response
         provider_model_id = await self._get_provider_model_id(params.model)
         self._validate_model_allowed(provider_model_id)
@@ -308,6 +322,20 @@ async def openai_chat_completion(
         """
         Direct OpenAI chat completion API call.
         """
+        # Inject stream_options when streaming and telemetry is active
+        if params.stream:
+            from opentelemetry import trace
+
+            span = trace.get_current_span()
+            if span and span.is_recording():
+                params = params.model_copy()
+                if params.stream_options is None:
+                    params.stream_options = {"include_usage": True}
+                else:
+                    # Active telemetry takes precedence over caller preference.
+                    # This ensures complete and consistent observability metrics.
+                    params.stream_options = {**params.stream_options, "include_usage": True}
+
         provider_model_id = await self._get_provider_model_id(params.model)
         self._validate_model_allowed(provider_model_id)
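The commit message mentions unit tests for this behavior, but the test file itself is not part of the diffs shown here. A minimal sketch of how the gate can be exercised in isolation — the inject_stream_options function below is a local stand-in that mirrors the injected logic, not the repository's test code, and it assumes pytest and the opentelemetry-api package are installed:

from unittest.mock import MagicMock, patch


def inject_stream_options(stream_options: dict | None, stream: bool) -> dict | None:
    # Local stand-in for the telemetry-gated injection added in this commit.
    if not stream:
        return stream_options
    from opentelemetry import trace

    span = trace.get_current_span()
    if not span or not span.is_recording():
        return stream_options
    return {**(stream_options or {}), "include_usage": True}


def test_injects_usage_when_span_is_recording():
    span = MagicMock()
    span.is_recording.return_value = True
    with patch("opentelemetry.trace.get_current_span", return_value=span):
        assert inject_stream_options(None, stream=True) == {"include_usage": True}
        # Telemetry overrides an explicit opt-out from the caller.
        assert inject_stream_options({"include_usage": False}, stream=True) == {"include_usage": True}


def test_leaves_options_alone_when_no_span_is_recording():
    span = MagicMock()
    span.is_recording.return_value = False
    with patch("opentelemetry.trace.get_current_span", return_value=span):
        assert inject_stream_options(None, stream=True) is None
        assert inject_stream_options({"include_usage": False}, stream=True) == {"include_usage": False}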
