diff --git a/examples/foundational/07zl-interruptible-smallest.py b/examples/foundational/07zl-interruptible-smallest.py new file mode 100644 index 0000000000..ddebd6e4aa --- /dev/null +++ b/examples/foundational/07zl-interruptible-smallest.py @@ -0,0 +1,122 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.smallest.stt import SmallestSTTService +from pipecat.services.smallest.tts import SmallestTTSService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = SmallestSTTService( + api_key=os.getenv("SMALLEST_API_KEY"), + ) + + tts = SmallestTTSService( + api_key=os.getenv("SMALLEST_API_KEY"), + voice_id="sophia", + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = LLMContext(messages) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), + stt, + user_aggregator, + llm, + tts, + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/pyproject.toml b/pyproject.toml index 91afcc7949..1d8563de79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0. sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"] sambanova = [] sarvam = [ "sarvamai==0.1.26", "pipecat-ai[websockets-base]" ] +smallest = [ "pipecat-ai[websockets-base]" ] sentry = [ "sentry-sdk>=2.28.0,<3" ] silero = [] simli = [ "simli-ai~=2.0.1"] diff --git a/src/pipecat/services/smallest/__init__.py b/src/pipecat/services/smallest/__init__.py new file mode 100644 index 0000000000..40098b034a --- /dev/null +++ b/src/pipecat/services/smallest/__init__.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import sys + +from pipecat.services import DeprecatedModuleProxy + +from .stt import * +from .tts import * + +sys.modules[__name__] = DeprecatedModuleProxy(globals(), "smallest", "smallest.[stt,tts]") diff --git a/src/pipecat/services/smallest/stt.py b/src/pipecat/services/smallest/stt.py new file mode 100644 index 0000000000..f380afe578 --- /dev/null +++ b/src/pipecat/services/smallest/stt.py @@ -0,0 +1,314 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Smallest AI speech-to-text service implementation. + +This module provides a WebSocket-based real-time STT service using Smallest +AI's Waves API (Pulse model). Audio is streamed continuously over a WebSocket +connection and interim/final transcription results are received with low +latency. +""" + +import asyncio +import json +from typing import AsyncGenerator, Optional +from urllib.parse import urlencode + +from loguru import logger +from pydantic import BaseModel + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + InterimTranscriptionFrame, + StartFrame, + TranscriptionFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.settings import STTSettings +from pipecat.services.stt_latency import SMALLEST_TTFS_P99 +from pipecat.services.stt_service import WebsocketSTTService +from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_stt + +try: + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.") + raise Exception(f"Missing module: {e}") + + +class SmallestSTTService(WebsocketSTTService): + """Smallest AI real-time speech-to-text service using the Pulse WebSocket API. + + Streams audio continuously over a WebSocket connection and receives + interim and final transcription results with low latency. Best suited + for real-time voice applications where immediate feedback is needed. + + Uses Pipecat's VAD to detect when the user stops speaking and sends + a finalize message to flush the final transcript. + + Example:: + + stt = SmallestSTTService( + api_key="your-api-key", + params=SmallestSTTService.InputParams( + language="en", + word_timestamps=True, + ), + ) + """ + + class InputParams(BaseModel): + """Configuration parameters for Smallest STT service. + + Parameters: + language: Language code for transcription. Use "multi" for auto-detection. + Defaults to "en". + encoding: Audio encoding format. Defaults to "linear16". + word_timestamps: Include word-level timestamps. Defaults to False. + full_transcript: Include cumulative transcript. Defaults to False. + sentence_timestamps: Include sentence-level timestamps. Defaults to False. + redact_pii: Redact personally identifiable information. Defaults to False. + redact_pci: Redact payment card information. Defaults to False. + numerals: Convert spoken numerals to digits. Defaults to "auto". + diarize: Enable speaker diarization. Defaults to False. + """ + + language: str = "en" + encoding: str = "linear16" + word_timestamps: bool = False + full_transcript: bool = False + sentence_timestamps: bool = False + redact_pii: bool = False + redact_pci: bool = False + numerals: str = "auto" + diarize: bool = False + + def __init__( + self, + *, + api_key: str, + base_url: str = "wss://api.smallest.ai", + sample_rate: Optional[int] = None, + params: Optional[InputParams] = None, + ttfs_p99_latency: Optional[float] = SMALLEST_TTFS_P99, + **kwargs, + ): + """Initialize the Smallest AI STT service. + + Args: + api_key: Smallest AI API key for authentication. + base_url: Base WebSocket URL for the Smallest API. + sample_rate: Audio sample rate in Hz. If None, uses the pipeline's rate. + params: Configuration parameters for the STT service. + ttfs_p99_latency: P99 latency from speech end to final transcript in seconds. + **kwargs: Additional arguments passed to WebsocketSTTService. + """ + self._stt_params = params or SmallestSTTService.InputParams() + + super().__init__( + sample_rate=sample_rate, + ttfs_p99_latency=ttfs_p99_latency, + keepalive_timeout=10, + keepalive_interval=5, + settings=STTSettings(model="pulse", language=self._stt_params.language), + **kwargs, + ) + + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._receive_task = None + self._connected_event = asyncio.Event() + self._connected_event.set() + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics.""" + return True + + async def start(self, frame: StartFrame): + """Start the service and connect to the WebSocket.""" + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and disconnect from the WebSocket.""" + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and disconnect from the WebSocket.""" + await super().cancel(frame) + await self._disconnect() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames, handling VAD events for finalization.""" + await super().process_frame(frame, direction) + + if isinstance(frame, VADUserStartedSpeakingFrame): + await self.start_processing_metrics() + elif isinstance(frame, VADUserStoppedSpeakingFrame): + if self._websocket and self._websocket.state is State.OPEN: + try: + await self._websocket.send(json.dumps({"type": "finalize"})) + except Exception as e: + logger.warning(f"{self} failed to send finalize: {e}") + + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + """Send audio to the Smallest Pulse WebSocket for transcription. + + Args: + audio: Raw PCM audio bytes. + + Yields: + None -- transcription results arrive via WebSocket messages. + """ + await self._connected_event.wait() + + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + if self._websocket and self._websocket.state is State.OPEN: + try: + await self._websocket.send(audio) + except Exception as e: + yield ErrorFrame(error=f"Smallest STT error: {e}") + + yield None + + async def _connect(self): + self._connected_event.clear() + try: + await self._connect_websocket() + await super()._connect() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task( + self._receive_task_handler(self._report_error) + ) + finally: + self._connected_event.set() + + async def _disconnect(self): + await super()._disconnect() + + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + """Establish WebSocket connection to the Smallest Pulse STT API.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + return + + logger.debug("Connecting to Smallest STT") + + query_params = { + "language": self._stt_params.language, + "encoding": self._stt_params.encoding, + "sample_rate": str(self.sample_rate), + "word_timestamps": str(self._stt_params.word_timestamps).lower(), + "full_transcript": str(self._stt_params.full_transcript).lower(), + "sentence_timestamps": str(self._stt_params.sentence_timestamps).lower(), + "redact_pii": str(self._stt_params.redact_pii).lower(), + "redact_pci": str(self._stt_params.redact_pci).lower(), + "numerals": self._stt_params.numerals, + "diarize": str(self._stt_params.diarize).lower(), + } + + ws_url = f"{self._base_url}/waves/v1/pulse/get_text?{urlencode(query_params)}" + + self._websocket = await websocket_connect( + ws_url, + additional_headers={"Authorization": f"Bearer {self._api_key}"}, + ) + await self._call_event_handler("on_connected") + logger.debug("Connected to Smallest STT") + except Exception as e: + await self.push_error(error_msg=f"Smallest STT connection error: {e}", exception=e) + self._websocket = None + await self._call_event_handler("on_connection_error", f"{e}") + + async def _disconnect_websocket(self): + """Close the WebSocket connection.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + logger.debug("Disconnecting from Smallest STT") + await self._websocket.close() + except Exception as e: + logger.error(f"{self} error closing websocket: {e}") + finally: + self._websocket = None + await self._call_event_handler("on_disconnected") + + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _receive_messages(self): + """Receive and process messages from the Smallest Pulse WebSocket.""" + async for message in self._get_websocket(): + try: + data = json.loads(message) + await self._process_response(data) + except json.JSONDecodeError: + logger.warning(f"{self} received non-JSON message: {message}") + except Exception as e: + logger.error(f"{self} error processing message: {e}") + + async def _process_response(self, data: dict): + """Process a transcription response from the Pulse API. + + Args: + data: Parsed JSON response containing transcript data. + """ + is_final = data.get("is_final", False) + text = data.get("transcript", "").strip() + + if not text: + return + + if is_final: + await self.stop_processing_metrics() + logger.debug(f"Smallest final transcript: [{text}]") + await self._handle_transcription(text, True, data.get("language")) + await self.push_frame( + TranscriptionFrame( + text, + self._user_id, + time_now_iso8601(), + data.get("language"), + result=data, + ) + ) + else: + logger.trace(f"Smallest interim transcript: [{text}]") + await self.push_frame( + InterimTranscriptionFrame( + text, + self._user_id, + time_now_iso8601(), + data.get("language"), + result=data, + ) + ) + + @traced_stt + async def _handle_transcription( + self, transcript: str, is_final: bool, language: Optional[str] = None + ): + """Handle a transcription result with tracing.""" + pass diff --git a/src/pipecat/services/smallest/tts.py b/src/pipecat/services/smallest/tts.py new file mode 100644 index 0000000000..648c17fc30 --- /dev/null +++ b/src/pipecat/services/smallest/tts.py @@ -0,0 +1,389 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Smallest AI text-to-speech service implementation. + +This module provides a WebSocket-based integration with Smallest AI's Waves +API for real-time text-to-speech synthesis with interruption support. +""" + +import base64 +import json +from enum import Enum +from typing import AsyncGenerator, Optional, Union + +from loguru import logger +from pydantic import BaseModel, Field + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + InterruptionFrame, + StartFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.settings import TTSSettings +from pipecat.services.tts_service import InterruptibleTTSService +from pipecat.transcriptions.language import Language +from pipecat.utils.tracing.service_decorators import traced_tts + +try: + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.") + raise Exception(f"Missing module: {e}") + + +class SmallestTTSModel(str, Enum): + """Available Smallest AI TTS models.""" + + LIGHTNING_V2 = "lightning-v2" + LIGHTNING_V3_1 = "lightning-v3.1" + + +def language_to_smallest_tts_language(language: Language) -> Optional[str]: + """Convert a Language enum to a Smallest TTS language string. + + Args: + language: The Language enum value to convert. + + Returns: + The Smallest language code string, or None if unsupported. + """ + BASE_LANGUAGES = { + Language.AR: "ar", + Language.BN: "bn", + Language.DE: "de", + Language.EN: "en", + Language.ES: "es", + Language.FR: "fr", + Language.GU: "gu", + Language.HE: "he", + Language.HI: "hi", + Language.IT: "it", + Language.KN: "kn", + Language.MR: "mr", + Language.NL: "nl", + Language.PL: "pl", + Language.RU: "ru", + Language.TA: "ta", + } + + result = BASE_LANGUAGES.get(language) + + if not result: + lang_str = str(language.value) + base_code = lang_str.split("-")[0].lower() + result = base_code if base_code in BASE_LANGUAGES.values() else None + + return result + + +class SmallestTTSService(InterruptibleTTSService): + """Smallest AI real-time text-to-speech service using WebSocket streaming. + + Provides real-time text-to-speech synthesis using Smallest AI's WebSocket API. + Supports streaming audio generation with configurable voice parameters and + language settings. Handles interruptions by reconnecting the WebSocket. + + Example:: + + tts = SmallestTTSService( + api_key="your-api-key", + voice_id="sophia", + params=SmallestTTSService.InputParams( + language=Language.EN, + speed=1.0, + ), + ) + """ + + class InputParams(BaseModel): + """Configuration parameters for Smallest TTS service. + + Parameters: + language: Language for synthesis. Defaults to English. + speed: Speech speed multiplier. Defaults to 1.0. + consistency: Consistency level for voice generation (0-1). Defaults to 0.5. + similarity: Similarity level for voice generation (0-1). Defaults to 0. + enhancement: Enhancement level for voice generation (0-2). Defaults to 1. + """ + + language: Optional[Language] = Language.EN + speed: Optional[Union[str, float]] = 1.0 + consistency: Optional[float] = Field(default=0.5, ge=0, le=1) + similarity: Optional[float] = Field(default=0, ge=0, le=1) + enhancement: Optional[int] = Field(default=1, ge=0, le=2) + + def __init__( + self, + *, + api_key: str, + voice_id: str = "sophia", + base_url: str = "wss://waves-api.smallest.ai", + model: str = SmallestTTSModel.LIGHTNING_V3_1, + sample_rate: Optional[int] = 24000, + params: Optional[InputParams] = None, + **kwargs, + ): + """Initialize the Smallest AI WebSocket TTS service. + + Args: + api_key: Smallest AI API key for authentication. + voice_id: Voice identifier for synthesis. + base_url: Base WebSocket URL for the Smallest API. + model: TTS model to use. Defaults to "lightning-v3.1". + sample_rate: Audio sample rate in Hz. Defaults to 24000. + params: Configuration parameters for the TTS service. + **kwargs: Additional arguments passed to parent InterruptibleTTSService. + """ + params = params or SmallestTTSService.InputParams() + model_str = model.value if isinstance(model, Enum) else model + lang_str = language_to_smallest_tts_language(params.language) if params.language else "en" + + super().__init__( + aggregate_sentences=True, + push_text_frames=True, + pause_frame_processing=True, + sample_rate=sample_rate, + settings=TTSSettings(model=model_str, voice=voice_id, language=lang_str), + **kwargs, + ) + + self._api_key = api_key + self._websocket_url = f"{base_url}/api/v1/{model_str}/get_speech/stream" + + self._tts_params = { + "language": lang_str, + "speed": params.speed, + "consistency": params.consistency, + "similarity": params.similarity, + "enhancement": params.enhancement, + } + + self._receive_task = None + self._context_id: Optional[str] = None + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Smallest service supports metrics generation. + """ + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + """Convert a Language enum to Smallest service language format. + + Args: + language: The language to convert. + + Returns: + The Smallest-specific language code, or None if not supported. + """ + return language_to_smallest_tts_language(language) + + def _build_msg(self, text: str) -> dict: + """Build a WebSocket message for the Smallest API. + + Args: + text: The text to synthesize. + + Returns: + Dictionary with the API message payload. + """ + msg = { + "text": text, + "voice_id": self._settings.voice, + "language": self._tts_params["language"], + "speed": self._tts_params["speed"], + "consistency": self._tts_params["consistency"], + "similarity": self._tts_params["similarity"], + "enhancement": self._tts_params["enhancement"], + } + + if self._context_id: + msg["request_id"] = self._context_id + + return msg + + async def start(self, frame: StartFrame): + """Start the Smallest TTS service. + + Args: + frame: The start frame containing initialization parameters. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the Smallest TTS service. + + Args: + frame: The end frame. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the Smallest TTS service. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + await self._disconnect() + + async def _connect(self): + """Connect to Smallest WebSocket and start receive task.""" + await super()._connect() + + await self._connect_websocket() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) + + async def _disconnect(self): + """Disconnect from Smallest WebSocket and clean up tasks.""" + await super()._disconnect() + + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + """Establish WebSocket connection to the Smallest API.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + return + + logger.debug("Connecting to Smallest TTS") + + self._websocket = await websocket_connect( + self._websocket_url, + additional_headers={"Authorization": f"Bearer {self._api_key}"}, + ) + + await self._call_event_handler("on_connected") + except Exception as e: + await self.push_error(error_msg=f"Smallest TTS connection error: {e}", exception=e) + self._websocket = None + await self._call_event_handler("on_connection_error", f"{e}") + + async def _disconnect_websocket(self): + """Close the WebSocket connection and clean up state.""" + try: + await self.stop_all_metrics() + + if self._websocket: + logger.debug("Disconnecting from Smallest TTS") + await self._websocket.close() + except Exception as e: + logger.error(f"{self} error closing websocket: {e}") + finally: + self._context_id = None + self._websocket = None + await self._call_event_handler("on_disconnected") + + def _get_websocket(self): + """Get the WebSocket connection if available. + + Returns: + The active WebSocket connection. + + Raises: + Exception: If no WebSocket connection is available. + """ + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection): + """Handle an interruption by resetting state. + + Args: + frame: The interruption frame. + direction: The direction of frame processing. + """ + await super()._handle_interruption(frame, direction) + await self.stop_all_metrics() + self._context_id = None + + async def _receive_messages(self): + """Receive and process messages from the Smallest WebSocket API.""" + async for message in self._get_websocket(): + msg = json.loads(message) + status = msg.get("status") + + if status == "complete": + msg_request_id = msg.get("request_id") + if self._context_id and msg_request_id and msg_request_id == self._context_id: + await self.stop_all_metrics() + await self.push_frame(TTSStoppedFrame(context_id=self._context_id)) + self._context_id = None + elif status == "chunk": + await self.stop_ttfb_metrics() + frame = TTSAudioRawFrame( + audio=base64.b64decode(msg["data"]["audio"]), + sample_rate=self.sample_rate, + num_channels=1, + context_id=self._context_id, + ) + await self.push_frame(frame) + elif status == "error": + logger.error(f"{self} error: {msg}") + await self.push_frame(TTSStoppedFrame(context_id=self._context_id)) + await self.stop_all_metrics() + await self.push_error(error_msg=f"Smallest TTS error: {msg.get('error', msg)}") + self._context_id = None + else: + logger.warning(f"{self} unknown message status: {msg}") + + @traced_tts + async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]: + """Generate speech from text using Smallest's WebSocket streaming API. + + Args: + text: The text to synthesize into speech. + context_id: Unique identifier for this TTS context. + + Yields: + Frame: TTSStartedFrame to signal start; audio arrives via WebSocket. + """ + logger.debug(f"{self}: Generating TTS [{text}]") + + try: + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + try: + await self.start_ttfb_metrics() + self._context_id = context_id + yield TTSStartedFrame(context_id=context_id) + + msg = self._build_msg(text=text) + await self._get_websocket().send(json.dumps(msg)) + await self.start_tts_usage_metrics(text) + except Exception as e: + logger.error(f"{self} error sending message: {e}") + yield ErrorFrame(error=f"Smallest TTS send error: {e}") + yield TTSStoppedFrame(context_id=context_id) + await self._disconnect() + await self._connect() + return + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"Smallest TTS error: {e}") diff --git a/src/pipecat/services/stt_latency.py b/src/pipecat/services/stt_latency.py index 351e041a60..ba5d7089eb 100644 --- a/src/pipecat/services/stt_latency.py +++ b/src/pipecat/services/stt_latency.py @@ -44,9 +44,11 @@ OPENAI_REALTIME_TTFS_P99: float = 1.66 SAMBANOVA_TTFS_P99: float = 2.20 SARVAM_TTFS_P99: float = 1.17 +SMALLEST_TTFS_P99: float = DEFAULT_TTFS_P99 SONIOX_TTFS_P99: float = 0.35 SPEECHMATICS_TTFS_P99: float = 0.74 # These services run locally and should be replaced with measured values NVIDIA_TTFS_P99: float = DEFAULT_TTFS_P99 WHISPER_TTFS_P99: float = DEFAULT_TTFS_P99 + diff --git a/uv.lock b/uv.lock index 144b30e668..e18e085a2a 100644 --- a/uv.lock +++ b/uv.lock @@ -4725,6 +4725,9 @@ sentry = [ simli = [ { name = "simli-ai" }, ] +smallest = [ + { name = "websockets" }, +] soniox = [ { name = "websockets" }, ] @@ -4861,6 +4864,7 @@ requires-dist = [ { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'resembleai'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'rime'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'sarvam'" }, + { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'smallest'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'soniox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'ultravox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'websocket'" }, @@ -4898,7 +4902,7 @@ requires-dist = [ { name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1,<1" }, { name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" }, ] -provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "kokoro", "krisp", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "piper", "qwen", "remote-smart-turn", "resembleai", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] +provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "kokoro", "krisp", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "piper", "qwen", "remote-smart-turn", "resembleai", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "smallest", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] [package.metadata.requires-dev] dev = [