Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner
utils.

- Improved `SimpleTextAggregator` and `TTSService` to properly handle buffered
sentences at the end of LLM responses. Previously, when an LLM response ended,
any complete sentences remaining in the aggregator's buffer would be sent to
TTS as one large chunk. Now these sentences are flushed individually, providing
better interruption points. Added `flush_next_sentence()` method to
`SimpleTextAggregator` to extract buffered sentences without adding new text.

- Added Hindi support for Rime TTS services.

- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
Expand Down
21 changes: 15 additions & 6 deletions src/pipecat/services/tts_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,26 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
# pause to avoid audio overlapping.
await self._maybe_pause_frame_processing()

sentence = self._text_aggregator.text
# Flush any remaining complete sentences from the aggregator.
# This ensures all buffered sentences are sent to TTS individually.
sentence = await self._text_aggregator.flush_next_sentence()
includes_inter_frame_spaces = self._aggregated_text_includes_inter_frame_spaces
while sentence:
await self._push_tts_frames(
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
)
sentence = await self._text_aggregator.flush_next_sentence()

# Send any remaining incomplete text
remaining_text = self._text_aggregator.text
if remaining_text:
await self._push_tts_frames(
remaining_text, includes_inter_frame_spaces=includes_inter_frame_spaces
)

# Reset aggregator state
await self._text_aggregator.reset()
self._processing_text = False
self._aggregated_text_includes_inter_frame_spaces = False

await self._push_tts_frames(
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
Expand Down
51 changes: 39 additions & 12 deletions src/pipecat/utils/text/simple_text_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,57 @@ def text(self) -> str:
"""
return self._text

def _extract_next_sentence(self) -> Optional[str]:
    """Pop the first complete sentence off the internal buffer.

    Locates a sentence boundary with ``match_endofsentence``. When one is
    found, the text up to that boundary is removed from the buffer and
    returned; the remainder (if any) stays buffered.

    Returns:
        The first complete sentence, or None when the buffer is empty or
        holds only a partial sentence.
    """
    boundary = match_endofsentence(self._text)
    if not boundary:
        return None
    # Split the buffer at the boundary: emit the sentence, keep the rest.
    sentence, self._text = self._text[:boundary], self._text[boundary:]
    return sentence

async def aggregate(self, text: str) -> Optional[str]:
    """Aggregate text and return the first completed sentence.

    Adds the new text to the buffer and checks for sentence boundaries.
    When a sentence boundary is found, returns the first completed sentence
    and removes it from the buffer. Subsequent calls (even with empty
    strings) will return additional complete sentences if they exist in
    the buffer.

    This handles varying input patterns from different LLM providers:
    - Word-by-word tokens (e.g., 'Hello', '!', ' I', ' am', ' Doug.')
    - Chunks with one or more sentences (e.g., 'Hello! I am Doug. Nice to meet you!')

    Args:
        text: New text to add to the aggregation buffer.

    Returns:
        The first complete sentence if a sentence boundary is found,
        or None if more text is needed to complete a sentence.
    """
    self._text += text
    # Delegate boundary detection/extraction to the shared helper so the
    # logic stays in one place (also used by flush_next_sentence()).
    return self._extract_next_sentence()

eos_end_marker = match_endofsentence(self._text)
if eos_end_marker:
result = self._text[:eos_end_marker]
self._text = self._text[eos_end_marker:]
async def flush_next_sentence(self) -> Optional[str]:
    """Retrieve the next complete sentence from the buffer without adding new text.

    This method extracts the next complete sentence from the internal buffer
    without requiring new input text. It's useful for draining multiple
    complete sentences that were received in a single chunk.

    Returns:
        The next complete sentence if one exists in the buffer, or None if
        the buffer is empty or contains only incomplete text.
    """
    return self._extract_next_sentence()

async def handle_interruption(self):
"""Handle interruptions by clearing the text buffer.
Expand Down
40 changes: 34 additions & 6 deletions tests/test_simple_text_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,39 @@ async def test_reset_aggregations(self):
await self.aggregator.reset()
assert self.aggregator.text == ""

async def test_word_by_word(self):
    """Test word-by-word token aggregation (e.g., OpenAI)."""
    # No boundary yet: aggregator keeps buffering.
    assert await self.aggregator.aggregate("Hello") is None
    assert await self.aggregator.aggregate("!") == "Hello!"
    assert await self.aggregator.aggregate(" I") is None
    assert await self.aggregator.aggregate(" am") is None
    assert await self.aggregator.aggregate(" Doug.") == " I am Doug."
    # Everything was consumed; buffer must be empty.
    assert self.aggregator.text == ""

async def test_chunks_with_partial_sentences(self):
    """Test chunks with partial sentences."""
    assert await self.aggregator.aggregate("Hey!") == "Hey!"
    # Only the first complete sentence is returned; the partial " So" stays buffered.
    assert await self.aggregator.aggregate(" Nice to meet you! So") == " Nice to meet you!"
    assert self.aggregator.text == " So"
    assert await self.aggregator.aggregate(" what") is None
    assert await self.aggregator.aggregate("'d you like?") == " So what'd you like?"

async def test_multi_sentence_chunk(self):
    """Test chunks with multiple complete sentences."""
    result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
    assert result == "Hello!"
    # Drain remaining buffered sentences one at a time.
    assert await self.aggregator.flush_next_sentence() == " I am Doug."
    assert await self.aggregator.flush_next_sentence() == " Nice to meet you!"
    assert await self.aggregator.flush_next_sentence() is None
    assert self.aggregator.text == ""

async def test_flush_next_sentence_with_incomplete(self):
    """Test flush_next_sentence with incomplete sentence in buffer."""
    assert await self.aggregator.aggregate("Hello! I am") == "Hello!"
    # " I am" has no sentence boundary, so nothing can be flushed yet.
    assert await self.aggregator.flush_next_sentence() is None
    assert self.aggregator.text == " I am"

async def test_flush_next_sentence_empty_buffer(self):
    """Test flush_next_sentence with empty buffer."""
    # Flushing an empty buffer must be a harmless no-op returning None.
    assert await self.aggregator.flush_next_sentence() is None