diff --git a/README.md b/README.md index 8af4f942ca..6313503773 100644 --- a/README.md +++ b/README.md @@ -81,19 +81,19 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout ## 🧩 Available services -| Category | Services | -| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), 
[Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | -| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), 
[Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) | -| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | -| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/server/services/s2s/grok), [OpenAI 
Realtime](https://docs.pipecat.ai/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/server/services/s2s/ultravox), | -| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | -| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) | -| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/server/services/video/lemonslice), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | -| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) | -| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) | -| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) | -| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | +| Category | Services | +| ------------------- | 
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal 
Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Smallest](https://docs.pipecat.ai/server/services/stt/smallest), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | +| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together 
AI](https://docs.pipecat.ai/server/services/llm/together) | +| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/server/services/tts/smallest), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | +| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai), 
[Ultravox](https://docs.pipecat.ai/server/services/s2s/ultravox) | +| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | +| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) | +| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/server/services/video/lemonslice), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | +| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) | +| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) | +| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) | +| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | 📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services) diff --git 
a/examples/foundational/07zl-interruptible-smallest.py b/examples/foundational/07zl-interruptible-smallest.py new file mode 100644 index 0000000000..6bc9a34b03 --- /dev/null +++ b/examples/foundational/07zl-interruptible-smallest.py @@ -0,0 +1,122 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.smallest.stt import SmallestSTTService +from pipecat.services.smallest.tts import SmallestTTSService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +load_dotenv(override=True) + + +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info(f"Starting bot") + + stt = SmallestSTTService( + api_key=os.getenv("SMALLEST_API_KEY"), + ) + + tts = SmallestTTSService( + api_key=os.getenv("SMALLEST_API_KEY"), + 
settings=SmallestTTSService.Settings( + voice="sophia", + ), + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + settings=OpenAILLMService.Settings( + system_instruction="You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.", + ), + ) + + context = LLMContext() + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), + stt, + user_aggregator, + llm, + tts, + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + context.add_message({"role": "user", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/pyproject.toml b/pyproject.toml index 48226b14fa..1b9acfe963 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", 
"fastapi>=0. sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"] sambanova = [] sarvam = [ "sarvamai==0.1.26", "pipecat-ai[websockets-base]" ] +smallest = [ "pipecat-ai[websockets-base]" ] sentry = [ "sentry-sdk>=2.28.0,<3" ] silero = [] simli = [ "simli-ai~=2.0.1"] diff --git a/src/pipecat/services/smallest/__init__.py b/src/pipecat/services/smallest/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/pipecat/services/smallest/__init__.py @@ -0,0 +1 @@ + diff --git a/src/pipecat/services/smallest/stt.py b/src/pipecat/services/smallest/stt.py new file mode 100644 index 0000000000..a7883881eb --- /dev/null +++ b/src/pipecat/services/smallest/stt.py @@ -0,0 +1,395 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Smallest AI speech-to-text service implementation. + +This module provides a STT service using Smallest AI's Waves API: + +- ``SmallestSTTService``: WebSocket-based real-time STT. Streams audio + continuously and receives interim/final transcripts with low latency. 
+""" + +import asyncio +import json +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, AsyncGenerator, Optional +from urllib.parse import urlencode + +from loguru import logger + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + InterimTranscriptionFrame, + StartFrame, + TranscriptionFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven +from pipecat.services.stt_latency import SMALLEST_TTFS_P99 +from pipecat.services.stt_service import WebsocketSTTService +from pipecat.transcriptions.language import Language, resolve_language +from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_stt + +try: + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.") + raise Exception(f"Missing module: {e}") + + +def language_to_smallest_stt_language(language: Language) -> str: + """Convert a Language enum to Smallest STT language code. + + Args: + language: The Language enum value to convert. + + Returns: + The Smallest language code string. 
+ """ + LANGUAGE_MAP = { + Language.BG: "bg", + Language.BN: "bn", + Language.CS: "cs", + Language.DA: "da", + Language.DE: "de", + Language.EN: "en", + Language.ES: "es", + Language.ET: "et", + Language.FI: "fi", + Language.FR: "fr", + Language.GU: "gu", + Language.HI: "hi", + Language.HU: "hu", + Language.IT: "it", + Language.KN: "kn", + Language.LT: "lt", + Language.LV: "lv", + Language.ML: "ml", + Language.MR: "mr", + Language.MT: "mt", + Language.NL: "nl", + Language.OR: "or", + Language.PA: "pa", + Language.PL: "pl", + Language.PT: "pt", + Language.RO: "ro", + Language.RU: "ru", + Language.SK: "sk", + Language.SV: "sv", + Language.TA: "ta", + Language.TE: "te", + Language.UK: "uk", + } + + return resolve_language(language, LANGUAGE_MAP) + + +class SmallestSTTModel(str, Enum): + """Available Smallest AI STT models.""" + + PULSE = "pulse" + + +@dataclass +class SmallestSTTSettings(STTSettings): + """Settings for SmallestSTTService. + + Parameters: + word_timestamps: Include word-level timestamps. + full_transcript: Include cumulative transcript. + sentence_timestamps: Include sentence-level timestamps. + redact_pii: Redact personally identifiable information. + redact_pci: Redact payment card information. + numerals: Convert spoken numerals to digits. + diarize: Enable speaker diarization. + """ + + word_timestamps: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + full_transcript: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + sentence_timestamps: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + redact_pii: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + redact_pci: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + numerals: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + diarize: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + + +class SmallestSTTService(WebsocketSTTService): + """Smallest AI real-time speech-to-text service using the Pulse WebSocket API. 
+ + Streams audio continuously over a WebSocket connection and receives + interim and final transcription results with low latency. Best suited + for real-time voice applications where immediate feedback is needed. + + Uses Pipecat's VAD to detect when the user stops speaking and sends + a finalize message to flush the final transcript. + + Example:: + + stt = SmallestSTTService( + api_key="your-api-key", + settings=SmallestSTTService.Settings( + language="en", + word_timestamps=True, + ), + ) + """ + + Settings = SmallestSTTSettings + _settings: Settings + + def __init__( + self, + *, + api_key: str, + base_url: str = "wss://api.smallest.ai", + encoding: str = "linear16", + sample_rate: Optional[int] = None, + settings: Optional[Settings] = None, + ttfs_p99_latency: Optional[float] = SMALLEST_TTFS_P99, + **kwargs, + ): + """Initialize the Smallest AI STT service. + + Args: + api_key: Smallest AI API key for authentication. + base_url: Base WebSocket URL for the Smallest API. + encoding: Audio encoding format. Defaults to "linear16". + sample_rate: Audio sample rate in Hz. If None, uses the pipeline's rate. + settings: Runtime-updatable settings for the STT service. + ttfs_p99_latency: P99 latency from speech end to final transcript in seconds. + **kwargs: Additional arguments passed to WebsocketSTTService. 
+ """ + default_settings = self.Settings( + model=SmallestSTTModel.PULSE.value, + language=language_to_smallest_stt_language(Language.EN), + word_timestamps=False, + full_transcript=False, + sentence_timestamps=False, + redact_pii=False, + redact_pci=False, + numerals="auto", + diarize=False, + ) + + if settings is not None: + default_settings.apply_update(settings) + + super().__init__( + sample_rate=sample_rate, + ttfs_p99_latency=ttfs_p99_latency, + keepalive_timeout=10, + keepalive_interval=5, + settings=default_settings, + **kwargs, + ) + + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._encoding = encoding + self._receive_task = None + self._connected_event = asyncio.Event() + self._connected_event.set() + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics.""" + return True + + async def start(self, frame: StartFrame): + """Start the service and connect to the WebSocket.""" + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the service and disconnect from the WebSocket.""" + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the service and disconnect from the WebSocket.""" + await super().cancel(frame) + await self._disconnect() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames, handling VAD events for finalization.""" + await super().process_frame(frame, direction) + + if isinstance(frame, VADUserStartedSpeakingFrame): + await self.start_processing_metrics() + elif isinstance(frame, VADUserStoppedSpeakingFrame): + if self._websocket and self._websocket.state is State.OPEN: + try: + await self._websocket.send(json.dumps({"type": "finalize"})) + except Exception as e: + logger.warning(f"{self} failed to send finalize: {e}") + + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + """Send audio to the 
Smallest Pulse WebSocket for transcription. + + Args: + audio: Raw PCM audio bytes. + + Yields: + None -- transcription results arrive via WebSocket messages. + """ + await self._connected_event.wait() + + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + if self._websocket and self._websocket.state is State.OPEN: + try: + await self._websocket.send(audio) + except Exception as e: + yield ErrorFrame(error=f"Smallest STT error: {e}") + + yield None + + async def _update_settings(self, delta: STTSettings) -> dict[str, Any]: + """Apply a settings delta and reconnect if anything changed.""" + changed = await super()._update_settings(delta) + + if changed: + await self._disconnect() + await self._connect() + + return changed + + async def _connect(self): + self._connected_event.clear() + try: + await self._connect_websocket() + await super()._connect() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task( + self._receive_task_handler(self._report_error) + ) + finally: + self._connected_event.set() + + async def _disconnect(self): + await super()._disconnect() + + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + """Establish WebSocket connection to the Smallest Pulse STT API.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + return + + logger.debug("Connecting to Smallest STT") + + query_params = { + "language": self._settings.language, + "encoding": self._encoding, + "sample_rate": str(self.sample_rate), + "word_timestamps": str(self._settings.word_timestamps).lower(), + "full_transcript": str(self._settings.full_transcript).lower(), + "sentence_timestamps": str(self._settings.sentence_timestamps).lower(), + "redact_pii": str(self._settings.redact_pii).lower(), + "redact_pci": str(self._settings.redact_pci).lower(), + "numerals": 
self._settings.numerals, + "diarize": str(self._settings.diarize).lower(), + } + + ws_url = f"{self._base_url}/waves/v1/pulse/get_text?{urlencode(query_params)}" + + self._websocket = await websocket_connect( + ws_url, + additional_headers={"Authorization": f"Bearer {self._api_key}"}, + ) + await self._call_event_handler("on_connected") + logger.debug("Connected to Smallest STT") + except Exception as e: + await self.push_error(error_msg=f"Smallest STT connection error: {e}", exception=e) + self._websocket = None + await self._call_event_handler("on_connection_error", f"{e}") + + async def _disconnect_websocket(self): + """Close the WebSocket connection.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + logger.debug("Disconnecting from Smallest STT") + await self._websocket.close() + except Exception as e: + logger.error(f"{self} error closing websocket: {e}") + finally: + self._websocket = None + await self._call_event_handler("on_disconnected") + + def _get_websocket(self): + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _receive_messages(self): + """Receive and process messages from the Smallest Pulse WebSocket.""" + async for message in self._get_websocket(): + try: + data = json.loads(message) + await self._process_response(data) + except json.JSONDecodeError: + logger.warning(f"{self} received non-JSON message: {message}") + except Exception as e: + logger.error(f"{self} error processing message: {e}") + + async def _process_response(self, data: dict): + """Process a transcription response from the Pulse API. + + Args: + data: Parsed JSON response containing transcript data. 
+ """ + is_final = data.get("is_final", False) + text = data.get("transcript", "").strip() + + if not text: + return + + if is_final: + await self.stop_processing_metrics() + logger.debug(f"Smallest final transcript: [{text}]") + await self._handle_transcription(text, True, data.get("language")) + await self.push_frame( + TranscriptionFrame( + text, + self._user_id, + time_now_iso8601(), + data.get("language"), + result=data, + ) + ) + else: + logger.trace(f"Smallest interim transcript: [{text}]") + await self.push_frame( + InterimTranscriptionFrame( + text, + self._user_id, + time_now_iso8601(), + data.get("language"), + result=data, + ) + ) + + @traced_stt + async def _handle_transcription( + self, transcript: str, is_final: bool, language: Optional[str] = None + ): + """Handle a transcription result with tracing.""" + pass diff --git a/src/pipecat/services/smallest/tts.py b/src/pipecat/services/smallest/tts.py new file mode 100644 index 0000000000..ea2d03c663 --- /dev/null +++ b/src/pipecat/services/smallest/tts.py @@ -0,0 +1,424 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Smallest AI text-to-speech service implementation. + +This module provides a WebSocket-based integration with Smallest AI's +Waves API for real-time text-to-speech synthesis. 
+""" + +import asyncio +import base64 +import json +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, AsyncGenerator, Optional + +from loguru import logger + +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + ErrorFrame, + Frame, + StartFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven +from pipecat.services.tts_service import InterruptibleTTSService +from pipecat.transcriptions.language import Language +from pipecat.utils.tracing.service_decorators import traced_tts + +try: + from websockets.asyncio.client import connect as websocket_connect + from websockets.protocol import State +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.") + raise Exception(f"Missing module: {e}") + + +class SmallestTTSModel(str, Enum): + """Available Smallest AI TTS models.""" + + LIGHTNING_V2 = "lightning-v2" + LIGHTNING_V3_1 = "lightning-v3.1" + + +def language_to_smallest_tts_language(language: Language) -> Optional[str]: + """Convert a Language enum to a Smallest TTS language string. + + Args: + language: The Language enum value to convert. + + Returns: + The Smallest language code string, or None if unsupported. 
+ """ + BASE_LANGUAGES = { + Language.AR: "ar", + Language.BN: "bn", + Language.DE: "de", + Language.EN: "en", + Language.ES: "es", + Language.FR: "fr", + Language.GU: "gu", + Language.HE: "he", + Language.HI: "hi", + Language.IT: "it", + Language.KN: "kn", + Language.MR: "mr", + Language.NL: "nl", + Language.PL: "pl", + Language.RU: "ru", + Language.TA: "ta", + } + + result = BASE_LANGUAGES.get(language) + + if not result: + lang_str = str(language.value) + base_code = lang_str.split("-")[0].lower() + result = base_code if base_code in BASE_LANGUAGES.values() else None + + return result + + +@dataclass +class SmallestTTSSettings(TTSSettings): + """Settings for SmallestTTSService. + + Parameters: + speed: Speech speed multiplier. + consistency: Consistency level for voice generation (0-1). + similarity: Similarity level for voice generation (0-1). + enhancement: Enhancement level for voice generation (0-2). + """ + + speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + consistency: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + similarity: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + enhancement: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN) + + +class SmallestTTSService(InterruptibleTTSService): + """Smallest AI real-time text-to-speech service using WebSocket streaming. + + Provides real-time text-to-speech synthesis using Smallest AI's WebSocket API. + Supports streaming audio generation with configurable voice parameters and + language settings. Handles interruptions by reconnecting the WebSocket. 
+ + Example:: + + tts = SmallestTTSService( + api_key="your-api-key", + settings=SmallestTTSService.Settings( + voice="sophia", + language="en", + speed=1.0, + ), + ) + """ + + Settings = SmallestTTSSettings + _settings: Settings + + def __init__( + self, + *, + api_key: str, + base_url: str = "wss://waves-api.smallest.ai", + sample_rate: Optional[int] = None, + settings: Optional[Settings] = None, + **kwargs, + ): + """Initialize the Smallest AI WebSocket TTS service. + + Args: + api_key: Smallest AI API key for authentication. + base_url: Base WebSocket URL for the Smallest API. + sample_rate: Audio sample rate in Hz. If None, uses default. + settings: Runtime-updatable settings for the TTS service. + **kwargs: Additional arguments passed to parent InterruptibleTTSService. + """ + default_settings = self.Settings( + model=SmallestTTSModel.LIGHTNING_V3_1.value, + voice="sophia", + language=language_to_smallest_tts_language(Language.EN), + speed=None, + consistency=None, + similarity=None, + enhancement=None, + ) + + if settings is not None: + default_settings.apply_update(settings) + + super().__init__( + aggregate_sentences=True, + push_text_frames=True, + pause_frame_processing=True, + sample_rate=sample_rate, + settings=default_settings, + **kwargs, + ) + + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._receive_task = None + self._keepalive_task = None + self._context_id: Optional[str] = None + + def can_generate_metrics(self) -> bool: + """Check if this service can generate processing metrics. + + Returns: + True, as Smallest service supports metrics generation. + """ + return True + + def language_to_service_language(self, language: Language) -> Optional[str]: + """Convert a Language enum to Smallest service language format. + + Args: + language: The language to convert. + + Returns: + The Smallest-specific language code, or None if not supported. 
+ """ + return language_to_smallest_tts_language(language) + + def _build_msg(self, text: str) -> dict: + """Build a WebSocket message for the Smallest API. + + Args: + text: The text to synthesize. + + Returns: + Dictionary with the API message payload. + """ + msg = { + "text": text, + "voice_id": self._settings.voice, + "language": self._settings.language, + "sample_rate": self.sample_rate, + } + + if self._settings.speed is not None: + msg["speed"] = self._settings.speed + if self._settings.consistency is not None: + msg["consistency"] = self._settings.consistency + if self._settings.similarity is not None: + msg["similarity"] = self._settings.similarity + if self._settings.enhancement is not None: + msg["enhancement"] = self._settings.enhancement + + if self._context_id: + msg["request_id"] = self._context_id + + return msg + + def _build_websocket_url(self) -> str: + """Build the WebSocket URL from base URL and model.""" + return f"{self._base_url}/api/v1/{self._settings.model}/get_speech/stream" + + async def start(self, frame: StartFrame): + """Start the Smallest TTS service. + + Args: + frame: The start frame containing initialization parameters. + """ + await super().start(frame) + await self._connect() + + async def stop(self, frame: EndFrame): + """Stop the Smallest TTS service. + + Args: + frame: The end frame. + """ + await super().stop(frame) + await self._disconnect() + + async def cancel(self, frame: CancelFrame): + """Cancel the Smallest TTS service. + + Args: + frame: The cancel frame. + """ + await super().cancel(frame) + await self._disconnect() + + async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]: + """Apply a settings delta, reconnecting if model changed. + + Per-message fields (speed, consistency, similarity, enhancement, voice, + language) apply automatically on the next ``_build_msg`` call. A model + change requires reconnecting because the model is part of the WebSocket URL. 
+ """ + changed = await super()._update_settings(delta) + + if not changed: + return changed + + if "model" in changed: + await self._disconnect() + await self._connect() + + return changed + + async def _connect(self): + """Connect to Smallest WebSocket and start receive task.""" + await super()._connect() + + await self._connect_websocket() + + if self._websocket and not self._receive_task: + self._receive_task = self.create_task(self._receive_task_handler(self._report_error)) + + if self._websocket and not self._keepalive_task: + self._keepalive_task = self.create_task(self._keepalive_task_handler()) + + async def _disconnect(self): + """Disconnect from Smallest WebSocket and clean up tasks.""" + await super()._disconnect() + + if self._receive_task: + await self.cancel_task(self._receive_task) + self._receive_task = None + + if self._keepalive_task: + await self.cancel_task(self._keepalive_task) + self._keepalive_task = None + + await self._disconnect_websocket() + + async def _connect_websocket(self): + """Establish WebSocket connection to the Smallest API.""" + try: + if self._websocket and self._websocket.state is State.OPEN: + return + + logger.debug("Connecting to Smallest TTS") + + self._websocket = await websocket_connect( + self._build_websocket_url(), + additional_headers={"Authorization": f"Bearer {self._api_key}"}, + ) + + await self._call_event_handler("on_connected") + except Exception as e: + await self.push_error(error_msg=f"Smallest TTS connection error: {e}", exception=e) + self._websocket = None + await self._call_event_handler("on_connection_error", f"{e}") + + async def _disconnect_websocket(self): + """Close the WebSocket connection and clean up state.""" + try: + await self.stop_all_metrics() + + if self._websocket: + logger.debug("Disconnecting from Smallest TTS") + await self._websocket.close() + except Exception as e: + logger.error(f"{self} error closing websocket: {e}") + finally: + self._context_id = None + self._websocket = None + 
await self._call_event_handler("on_disconnected") + + def _get_websocket(self): + """Get the WebSocket connection if available. + + Returns: + The active WebSocket connection. + + Raises: + Exception: If no WebSocket connection is available. + """ + if self._websocket: + return self._websocket + raise Exception("Websocket not connected") + + async def _keepalive_task_handler(self): + """Send periodic keepalive messages to prevent idle timeout.""" + KEEPALIVE_INTERVAL = 30 + while True: + await asyncio.sleep(KEEPALIVE_INTERVAL) + await self._send_keepalive() + + async def _send_keepalive(self): + """Send a flush message to keep the connection alive.""" + if self._websocket and self._websocket.state is State.OPEN: + msg = {"flush": True} + await self._websocket.send(json.dumps(msg)) + + async def _receive_messages(self): + """Receive and process messages from the Smallest WebSocket API.""" + async for message in self._get_websocket(): + msg = json.loads(message) + status = msg.get("status") + + if status == "complete": + msg_request_id = msg.get("request_id") + if self._context_id and msg_request_id and msg_request_id == self._context_id: + await self.stop_all_metrics() + await self.push_frame(TTSStoppedFrame(context_id=self._context_id)) + self._context_id = None + elif status == "chunk": + await self.stop_ttfb_metrics() + frame = TTSAudioRawFrame( + audio=base64.b64decode(msg["data"]["audio"]), + sample_rate=self.sample_rate, + num_channels=1, + context_id=self._context_id, + ) + await self.push_frame(frame) + elif status == "error": + logger.error(f"{self} error: {msg}") + await self.push_frame(TTSStoppedFrame(context_id=self._context_id)) + await self.stop_all_metrics() + await self.push_error(error_msg=f"Smallest TTS error: {msg.get('error', msg)}") + self._context_id = None + else: + logger.warning(f"{self} unknown message status: {msg}") + + @traced_tts + async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]: + """Generate speech 
from text using Smallest's WebSocket streaming API. + + Args: + text: The text to synthesize into speech. + context_id: Unique identifier for this TTS context. + + Yields: + Frame: TTSStartedFrame to signal start; audio arrives via WebSocket. + """ + logger.debug(f"{self}: Generating TTS [{text}]") + + try: + if not self._websocket or self._websocket.state is State.CLOSED: + await self._connect() + + try: + self._context_id = context_id + yield TTSStartedFrame(context_id=context_id) + + msg = self._build_msg(text=text) + await self._get_websocket().send(json.dumps(msg)) + await self.start_tts_usage_metrics(text) + except Exception as e: + logger.error(f"{self} error sending message: {e}") + yield ErrorFrame(error=f"Smallest TTS send error: {e}") + yield TTSStoppedFrame(context_id=context_id) + await self._disconnect() + await self._connect() + return + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"Smallest TTS error: {e}") diff --git a/src/pipecat/services/stt_latency.py b/src/pipecat/services/stt_latency.py index 351e041a60..974fbd9acf 100644 --- a/src/pipecat/services/stt_latency.py +++ b/src/pipecat/services/stt_latency.py @@ -44,6 +44,7 @@ OPENAI_REALTIME_TTFS_P99: float = 1.66 SAMBANOVA_TTFS_P99: float = 2.20 SARVAM_TTFS_P99: float = 1.17 +SMALLEST_TTFS_P99: float = DEFAULT_TTFS_P99 SONIOX_TTFS_P99: float = 0.35 SPEECHMATICS_TTFS_P99: float = 0.74 diff --git a/uv.lock b/uv.lock index b68216261e..dad52f51fd 100644 --- a/uv.lock +++ b/uv.lock @@ -4715,6 +4715,9 @@ sentry = [ simli = [ { name = "simli-ai" }, ] +smallest = [ + { name = "websockets" }, +] soniox = [ { name = "websockets" }, ] @@ -4851,6 +4854,7 @@ requires-dist = [ { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'resembleai'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'rime'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'sarvam'" }, + { name = "pipecat-ai", 
extras = ["websockets-base"], marker = "extra == 'smallest'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'soniox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'ultravox'" }, { name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'websocket'" }, @@ -4888,7 +4892,7 @@ requires-dist = [ { name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1,<1" }, { name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" }, ] -provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "kokoro", "krisp", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "piper", "qwen", "remote-smart-turn", "resembleai", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] +provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "camb", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "kokoro", "krisp", "langchain", "lemonslice", "livekit", "lmnt", "local", "local-smart-turn", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "rnnoise", "openpipe", "openrouter", "perplexity", "piper", "qwen", "remote-smart-turn", "resembleai", 
"rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "smallest", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"] [package.metadata.requires-dev] dev = [