From 40ed9a7e2579081fb72b5a5f5d9822cd6da75efe Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 14 Nov 2025 09:08:57 -0500 Subject: [PATCH] Improve SimpleTextAggregator to handle multi-sentence chunks --- CHANGELOG.md | 7 +++ src/pipecat/services/tts_service.py | 21 +++++--- .../utils/text/simple_text_aggregator.py | 51 ++++++++++++++----- tests/test_simple_text_aggregator.py | 40 ++++++++++++--- 4 files changed, 95 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de1f520a79..2daeae1e83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner utils. +- Improved `SimpleTextAggregator` and `TTSService` to properly handle buffered + sentences at the end of LLM responses. Previously, when an LLM response ended, + any complete sentences remaining in the aggregator's buffer would be sent to + TTS as one large chunk. Now these sentences are flushed individually, providing + better interruption points. Added `flush_next_sentence()` method to + `SimpleTextAggregator` to extract buffered sentences without adding new text. + - Added Hindi support for Rime TTS services. - Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index f0d602a405..9bc19f4024 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -352,17 +352,26 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # pause to avoid audio overlapping. await self._maybe_pause_frame_processing() - sentence = self._text_aggregator.text + # Flush any remaining complete sentences from the aggregator. + # This ensures all buffered sentences are sent to TTS individually. + sentence = await self._text_aggregator.flush_next_sentence() includes_inter_frame_spaces = self._aggregated_text_includes_inter_frame_spaces + while sentence: + await self._push_tts_frames( + sentence, includes_inter_frame_spaces=includes_inter_frame_spaces + ) + sentence = await self._text_aggregator.flush_next_sentence() + + # Send any remaining incomplete text + remaining_text = self._text_aggregator.text + if remaining_text: + await self._push_tts_frames( + remaining_text, includes_inter_frame_spaces=includes_inter_frame_spaces + ) - # Reset aggregator state await self._text_aggregator.reset() self._processing_text = False self._aggregated_text_includes_inter_frame_spaces = False - - await self._push_tts_frames( - sentence, includes_inter_frame_spaces=includes_inter_frame_spaces - ) if isinstance(frame, LLMFullResponseEndFrame): if self._push_text_frames: await self.push_frame(frame, direction) diff --git a/src/pipecat/utils/text/simple_text_aggregator.py b/src/pipecat/utils/text/simple_text_aggregator.py index f9eb7d83a4..aa65222560 100644 --- a/src/pipecat/utils/text/simple_text_aggregator.py +++ b/src/pipecat/utils/text/simple_text_aggregator.py @@ -41,30 +41,57 @@ def text(self) -> str: """ return self._text + def _extract_next_sentence(self) -> Optional[str]: + """Extract the next complete sentence from the buffer. + + Returns: + The first complete sentence if a sentence boundary is found, + or None if the buffer is empty or contains only incomplete text. + """ + eos_end_marker = match_endofsentence(self._text) + if eos_end_marker: + # Extract the first complete sentence + sentence = self._text[:eos_end_marker] + # Remove it from buffer + self._text = self._text[eos_end_marker:] + return sentence + + return None + async def aggregate(self, text: str) -> Optional[str]: - """Aggregate text and return completed sentences. + """Aggregate text and return the first completed sentence. - Adds the new text to the buffer and checks for end-of-sentence markers. - When a sentence boundary is found, returns the completed sentence and - removes it from the buffer. + Adds the new text to the buffer and checks for sentence boundaries. + When a sentence boundary is found, returns the first completed sentence + and removes it from the buffer. Subsequent calls (even with empty strings) + will return additional complete sentences if they exist in the buffer. + + This handles varying input patterns from different LLM providers: + - Word-by-word tokens (e.g., 'Hello', '!', ' I', ' am', ' Doug.') + - Chunks with one or more sentences (e.g., 'Hello! I am Doug. Nice to meet you!') Args: text: New text to add to the aggregation buffer. Returns: - A complete sentence if an end-of-sentence marker is found, + The first complete sentence if a sentence boundary is found, or None if more text is needed to complete a sentence. """ - result: Optional[str] = None - self._text += text + return self._extract_next_sentence() - eos_end_marker = match_endofsentence(self._text) - if eos_end_marker: - result = self._text[:eos_end_marker] - self._text = self._text[eos_end_marker:] + async def flush_next_sentence(self) -> Optional[str]: + """Retrieve the next complete sentence from the buffer without adding new text. + + This method extracts the next complete sentence from the internal buffer + without requiring new input text. It's useful for draining multiple + complete sentences that were received in a single chunk. - return result + Returns: + The next complete sentence if one exists in the buffer, or None if + the buffer is empty or contains only incomplete text. + """ + return self._extract_next_sentence() async def handle_interruption(self): """Handle interruptions by clearing the text buffer. diff --git a/tests/test_simple_text_aggregator.py b/tests/test_simple_text_aggregator.py index ff6dd18478..eb4980dc0b 100644 --- a/tests/test_simple_text_aggregator.py +++ b/tests/test_simple_text_aggregator.py @@ -19,11 +19,39 @@ async def test_reset_aggregations(self): await self.aggregator.reset() assert self.aggregator.text == "" - async def test_simple_sentence(self): - assert await self.aggregator.aggregate("Hello ") == None - assert await self.aggregator.aggregate("Pipecat!") == "Hello Pipecat!" + async def test_word_by_word(self): + """Test word-by-word token aggregation (e.g., OpenAI).""" + assert await self.aggregator.aggregate("Hello") == None + assert await self.aggregator.aggregate("!") == "Hello!" + assert await self.aggregator.aggregate(" I") == None + assert await self.aggregator.aggregate(" am") == None + assert await self.aggregator.aggregate(" Doug.") == " I am Doug." assert self.aggregator.text == "" - async def test_multiple_sentences(self): - assert await self.aggregator.aggregate("Hello Pipecat! How are ") == "Hello Pipecat!" - assert await self.aggregator.aggregate("you?") == " How are you?" + async def test_chunks_with_partial_sentences(self): + """Test chunks with partial sentences.""" + assert await self.aggregator.aggregate("Hey!") == "Hey!" + assert await self.aggregator.aggregate(" Nice to meet you! So") == " Nice to meet you!" + assert self.aggregator.text == " So" + assert await self.aggregator.aggregate(" what") == None + assert await self.aggregator.aggregate("'d you like?") == " So what'd you like?" + + async def test_multi_sentence_chunk(self): + """Test chunks with multiple complete sentences.""" + result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!") + assert result == "Hello!" + # Drain remaining sentences + assert await self.aggregator.flush_next_sentence() == " I am Doug." + assert await self.aggregator.flush_next_sentence() == " Nice to meet you!" + assert await self.aggregator.flush_next_sentence() == None + assert self.aggregator.text == "" + + async def test_flush_next_sentence_with_incomplete(self): + """Test flush_next_sentence with incomplete sentence in buffer.""" + assert await self.aggregator.aggregate("Hello! I am") == "Hello!" + assert await self.aggregator.flush_next_sentence() == None + assert self.aggregator.text == " I am" + + async def test_flush_next_sentence_empty_buffer(self): + """Test flush_next_sentence with empty buffer.""" + assert await self.aggregator.flush_next_sentence() == None