Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/4017.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added Azure STT cancellation tracing attributes and session termination guards so canceled recognition sessions surface structured observability data and stop accepting audio as if still healthy.
1 change: 1 addition & 0 deletions changelog/4017.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed Azure STT silent failures by handling recognizer canceled and session lifecycle events and surfacing cancellation errors through `ErrorFrame` propagation.
163 changes: 153 additions & 10 deletions src/pipecat/services/azure/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
from pipecat.utils.tracing.service_decorators import trace_stt_cancellation, traced_stt

try:
from azure.cognitiveservices.speech import (
CancellationReason,
ResultReason,
SpeechConfig,
SpeechRecognizer,
Expand Down Expand Up @@ -156,6 +155,10 @@ def __init__(

self._audio_stream = None
self._speech_recognizer = None
self._audio_sent = False
self._recognition_active = False
self._recognition_terminated = False
self._shutdown_requested = False

def can_generate_metrics(self) -> bool:
"""Check if this service can generate performance metrics.
Expand Down Expand Up @@ -205,7 +208,12 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
try:
await self.start_processing_metrics()
if self._audio_stream:
if self._recognition_terminated and not self._shutdown_requested:
logger.warning("Azure STT recognition terminated, dropping audio chunk")
yield None
return
self._audio_stream.write(audio)
self._audio_sent = True
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
Expand Down Expand Up @@ -242,6 +250,11 @@ async def _connect(self):
if self._audio_stream:
return

self._audio_sent = False
self._recognition_active = False
self._recognition_terminated = False
self._shutdown_requested = False

try:
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
Expand All @@ -254,12 +267,46 @@ async def _connect(self):
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.canceled.connect(self._on_handle_canceled)
self._speech_recognizer.start_continuous_recognition_async()
self._speech_recognizer.session_started.connect(self._on_handle_session_started)
self._speech_recognizer.session_stopped.connect(self._on_handle_session_stopped)

start_future = self._speech_recognizer.start_continuous_recognition_async()
await self.get_event_loop().run_in_executor(None, start_future.get)
except Exception as e:
await self.push_error(
error_msg=f"Uncaught exception during initialization: {e}", exception=e
)

async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

Cleanly shuts down the Azure speech recognizer and closes audio streams.

Args:
frame: Frame indicating the end of processing.
"""
await super().stop(frame)

self._shutdown_requested = True
self._recognition_active = False
self._recognition_terminated = True
await self._disconnect()

async def cancel(self, frame: CancelFrame):
"""Cancel the speech recognition service.

Immediately stops recognition and closes resources.

Args:
frame: Frame indicating cancellation.
"""
await super().cancel(frame)

self._shutdown_requested = True
self._recognition_active = False
self._recognition_terminated = True
await self._disconnect()

async def _disconnect(self):
"""Stop recognition and close audio streams."""
if self._speech_recognizer:
Expand All @@ -277,6 +324,25 @@ async def _handle_transcription(
"""Handle a transcription result with tracing."""
await self.stop_processing_metrics()

async def _trace_cancellation(
self,
*,
reason: str,
code: str,
recoverable: bool,
phase: str,
):
"""Record a trace span for a canceled Azure STT recognition."""
trace_stt_cancellation(
self,
error_type="azure.stt.canceled",
cancel_reason=reason,
cancel_code=code,
recoverable=recoverable,
phase=phase,
region=self._settings.region if isinstance(self._settings.region, str) else None,
)

def _on_handle_recognized(self, event):
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
language = getattr(event.result, "language", None) or self._settings.language
Expand Down Expand Up @@ -305,11 +371,88 @@ def _on_handle_recognizing(self, event):
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())

def _on_handle_canceled(self, event):
details = event.result.cancellation_details
if details.reason == CancellationReason.Error:
error_msg = f"Azure STT recognition canceled: {details.reason}"
if details.error_details:
error_msg += f" - {details.error_details}"
asyncio.run_coroutine_threadsafe(
self.push_error(error_msg=error_msg), self.get_event_loop()
details = getattr(event, "cancellation_details", None)
reason = self._normalize_cancellation_value(getattr(details, "reason", "UNKNOWN"))
code = self._normalize_cancellation_value(getattr(details, "code", "UNKNOWN"))
error_details = getattr(details, "error_details", "")
phase = self._get_cancellation_phase()
recoverable = self._is_cancellation_recoverable(reason, code)

self._recognition_active = False
self._recognition_terminated = True

logger.error(
"Azure STT recognition canceled: reason={}, code={}, phase={}, recoverable={}, details={}",
reason,
code,
phase,
recoverable,
error_details,
)

asyncio.run_coroutine_threadsafe(
self._trace_cancellation(
reason=reason,
code=code,
recoverable=recoverable,
phase=phase,
),
self.get_event_loop(),
)

error_message = f"Azure STT recognition canceled: {reason} ({code})"
asyncio.run_coroutine_threadsafe(
self.push_error(error_msg=error_message), self.get_event_loop()
)

def _on_handle_session_started(self, event):
self._recognition_active = True
self._recognition_terminated = False
logger.info(
"Azure STT session started: session_id={}",
getattr(event, "session_id", "unknown"),
)

def _on_handle_session_stopped(self, event):
self._recognition_active = False
self._recognition_terminated = True
if self._shutdown_requested:
logger.info(
"Azure STT session stopped during shutdown: session_id={}",
getattr(event, "session_id", "unknown"),
)
else:
logger.warning(
"Azure STT session stopped: session_id={}",
getattr(event, "session_id", "unknown"),
)

@staticmethod
def _normalize_cancellation_value(value: Any) -> str:
normalized = getattr(value, "name", None)
if normalized:
return normalized
return str(value)

def _get_cancellation_phase(self) -> str:
if self._shutdown_requested:
return "shutdown"
if not self._recognition_active and not self._audio_sent:
return "startup"
return "streaming"

@staticmethod
def _is_cancellation_recoverable(reason: str, code: str) -> bool:
if reason == "CancelledByUser":
return True
if reason != "Error":
return False

return code in {
"ConnectionFailure",
"ServiceRedirectPermanent",
"ServiceRedirectTemporary",
"ServiceTimeout",
"ServiceUnavailable",
"TooManyRequests",
}
45 changes: 45 additions & 0 deletions src/pipecat/utils/tracing/service_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,51 @@ async def wrapper(self, transcript, is_final, language=None):
return decorator


def trace_stt_cancellation(
service,
*,
error_type: str,
cancel_reason: str,
cancel_code: str,
recoverable: bool,
phase: str,
region: Optional[str] = None,
) -> None:
"""Create a trace span for STT cancellation events.

Args:
service: STT service instance generating the cancellation.
error_type: Stable error classification.
cancel_reason: Provider cancellation reason.
cancel_code: Provider cancellation code.
recoverable: Whether the application should attempt recovery.
phase: Service lifecycle phase where cancellation happened.
region: Cloud region associated with the service, if known.
"""
if not is_tracing_available() or not getattr(service, "_tracing_enabled", False):
return

service_class_name = service.__class__.__name__
parent_context = _get_turn_context(service) or _get_parent_service_context(service)

tracer = trace.get_tracer("pipecat")
with tracer.start_as_current_span("stt.cancel", context=parent_context) as current_span:
current_span.set_attribute(
"gen_ai.system", service_class_name.replace("STTService", "").lower()
)
current_span.set_attribute("gen_ai.operation.name", "stt.cancel")
current_span.set_attribute("error.type", error_type)
current_span.set_attribute("stt.cancel.reason", cancel_reason)
current_span.set_attribute("stt.cancel.code", cancel_code)
current_span.set_attribute("stt.cancel.recoverable", recoverable)
current_span.set_attribute("stt.cancel.phase", phase)
if region:
current_span.set_attribute("cloud.region", region)

if cancel_reason == "Error":
current_span.set_status(trace.Status(trace.StatusCode.ERROR, cancel_code))


def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable:
"""Trace LLM service methods with LLM-specific attributes.

Expand Down
Loading