Closed
23 commits
fe9aa33
Adding support for new bot-output RTVI Message:
mattieruth Oct 21, 2025
5c86355
test fixes
mattieruth Oct 21, 2025
69945c5
Various fixes:
mattieruth Oct 23, 2025
e6dc1a5
Introduce AggregatedLLMTextFrame to allow a separation of TTSTextFram…
mattieruth Oct 28, 2025
ccca6e8
Make the PatternPair action an Enum
mattieruth Oct 28, 2025
8a90dec
codepilot review fixes
mattieruth Oct 29, 2025
bc6a9ca
Add append_to_context boolean field to TextFrames
mattieruth Oct 30, 2025
29417ba
Move aggregation logic when skip_tts is on to the assistant aggregator
mattieruth Oct 30, 2025
0d2c528
Support customization over the way the assistant aggregator aggregate…
mattieruth Oct 30, 2025
5dfe20b
Update Changelog
mattieruth Oct 31, 2025
82b9c4f
various PR Review fixes:
mattieruth Nov 4, 2025
e9de9da
Update PatternPairAggregator patterns to replace pattern_id with type…
mattieruth Nov 4, 2025
ed808a9
Fix new test and str version of PatternMatch
mattieruth Nov 5, 2025
124f147
CHANGELOG improvements
mattieruth Nov 5, 2025
8ab0c92
Rename AggregatedLLMTextFrame to AggregatedTextFrame and made built-i…
mattieruth Nov 7, 2025
9a3902a
Introducing a new processor: LLMTextProcessor
mattieruth Nov 12, 2025
5ca04ad
CHANGELOG updates
mattieruth Nov 12, 2025
4c69877
PR Feedback
mattieruth Nov 13, 2025
3f269f9
Add backwards compatibility for add_pattern_pair
mattieruth Nov 13, 2025
71b87fd
add transformers to initialization args
mattieruth Nov 13, 2025
713b488
Final PR Feedback changes
mattieruth Nov 14, 2025
23e4e29
CHANGELOG fixes
mattieruth Nov 14, 2025
e8640d8
test fix now that we send an aggregated text frame for non word-by-wo…
mattieruth Nov 14, 2025
107 changes: 107 additions & 0 deletions CHANGELOG.md
Contributor:
It would be nice to rebase with the latest changes from main so that we can more easily analyze only what you are including here, just to make sure we’re not missing anything.

Contributor Author:
i rebased yesterday just before pushing and fixing the CHANGELOG. Is it already way out of date?

Contributor Author:
oh i see. there is stuff mixed in below that's not mine. i must have botched the rebase. the conflicts in this file were a bear.

Contributor:
Interesting, it is showing a couple of things that I believe are not related to this PR. For example:

### Added

- Added an asyncio event `finished_event` field to `InterruptionFrame`. When
  assigned, the asyncio event will be set when the frame reaches the end of the
  pipeline. You can use this field to know when an interruption made it all the
  way to the end of a pipeline. The field has been also added to
  `InterruptionTaskFrame`.

- Added support for loading external observers. You can now register custom
  pipeline observers by setting the `PIPECAT_OBSERVER_FILES` environment
  variable. This variable should contain a colon-separated list of Python files
  (e.g. `export PIPECAT_OBSERVER_FILES="observer1.py:observer2.py:..."`). Each
  file must define a function with the following signature:

But those things I believe are already more than one week old.

Contributor Author:
yeah. i think the rebase went bad after the release. i just re-rebased and cleaned it up. I've got to get this in because these rebases are rough.

Contributor Author:
✅ Done

@@ -16,6 +16,82 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
services that subclass `TTSService` can indicate whether the text in the
`TTSTextFrame`s they push already contain any necessary inter-frame spaces.

- Introduced a new `AggregatedTextFrame` type to represent a best effort of the
perceived LLM output, whether or not it is processed by the TTS. This new frame
type includes the field `aggregated_by` to represent the conceptual format by which
the given text is aggregated. `TTSTextFrame`s now inherit from `AggregatedTextFrame`.
With this inheritance, an observer can watch for `AggregatedTextFrame`s to accumulate
the perceived output and determine whether the text was spoken based on whether that
frame is also a `TTSTextFrame`. (See the bullet below on the new `bot-output` message,
which takes advantage of this.)
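
As a sketch of that observer pattern (using minimal stand-in dataclasses for illustration, not the real pipecat frame classes), accumulating output and flagging spoken text might look like:

```python
from dataclasses import dataclass


# Minimal stand-ins for illustration only -- not the real pipecat frame classes.
@dataclass
class AggregatedTextFrame:
    text: str
    aggregated_by: str


@dataclass
class TTSTextFrame(AggregatedTextFrame):
    pass


def accumulate_bot_output(frames):
    """Collect (text, spoken) pairs from a stream of frames."""
    output = []
    for frame in frames:
        if isinstance(frame, AggregatedTextFrame):
            # A TTSTextFrame is also an AggregatedTextFrame, so isinstance
            # tells us whether this particular text was actually spoken.
            output.append((frame.text, isinstance(frame, TTSTextFrame)))
    return output


frames = [
    TTSTextFrame("Hello there!", aggregated_by="sentence"),
    AggregatedTextFrame("4111-1111-1111-1111", aggregated_by="credit_card"),
]
print(accumulate_bot_output(frames))
# -> [('Hello there!', True), ('4111-1111-1111-1111', False)]
```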

- Introduced `LLMTextProcessor`: a new processor that allows customization of how
`LLMTextFrame`s should be aggregated and considered. Its purpose is to turn
`LLMTextFrame`s into `AggregatedTextFrame`s. By default, a TTSService will still
aggregate `LLMTextFrame`s by sentence for the service to consume. However, if you
wish to override how the LLM text is aggregated, you should no longer override the
TTS's internal aggregator, but instead insert this processor between your LLM and
TTS in the pipeline.

- New `bot-output` RTVI message to represent what the bot actually "says".
- The `RTVIObserver` now emits `bot-output` messages based on the new `AggregatedTextFrame`s
(`bot-tts-text` and `bot-llm-text` are still supported and generated, but `bot-transcript` is
now deprecated in favor of this new, more thorough message).
- The new `RTVIBotOutputMessage` includes the fields:
- `spoken`: A boolean indicating whether the text was spoken by TTS
- `aggregated_by`: A string representing how the text was aggregated ("sentence", "word",
Contributor (@kompfner, Nov 6, 2025):

Actually the literal "custom" or "<my custom aggregation>", like in Aggregation.type? I know I can read on to get the answer, but just making a note here.

Contributor:
Ah, got it. It's the latter. So maybe here use "my custom aggregation", like you do elsewhere to indicate that the string is the developer-provided aggregation type string

Contributor Author:
✅ Done

"my custom aggregation")
- Introduced new fields to `RTVIObserver` to support the new `bot-output` messaging:
- `bot_output_enabled`: Defaults to `True`. Set to `False` to disable `bot-output` messages.
- `skip_aggregator_types`: Defaults to `None`. Set to a list of strings that match
aggregation types that should not be included in `bot-output` messages. (Ex. `credit_card`)
- Introduced new methods, `add_text_transformer()` and `remove_text_transformer()`, to
`RTVIObserver` to support providing (and subsequently removing) callbacks for specific
aggregation types (or all aggregations with `*`) that can modify the text before it is sent
as a `bot-output` or `tts-text` message. (Think obscuring a credit card number or inserting
extra detail the client might want that the context doesn't need.)
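
A transformer callback along these lines could obscure a credit-card aggregation before it reaches the client. The masking logic here is purely illustrative; the real callback would be registered for the relevant aggregation type via `add_text_transformer()`:

```python
import re


def mask_credit_card(text: str) -> str:
    """Replace all but the last four digits of a card-like number.

    Illustrative only -- a sketch of the kind of transformer you might
    register on the RTVIObserver for a "credit_card" aggregation type.
    """
    digits = re.sub(r"\D", "", text)
    if len(digits) < 13:  # not a plausible card number; leave it alone
        return text
    return "*" * (len(digits) - 4) + digits[-4:]


print(mask_credit_card("4111 1111 1111 1111"))  # -> ************1111
print(mask_credit_card("hello"))  # unchanged -> hello
```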

- Updated the base aggregator type:
- Introduced a new `Aggregation` dataclass to represent both the aggregated `text` and
a string identifying the `type` of aggregation (ex. "sentence", "word", "my custom
aggregation")
- **BREAKING**: `BaseTextAggregator.text` now returns an `Aggregation` (instead of `str`).
To update: `aggregated_text = myAggregator.text` -> `aggregated_text = myAggregator.text.text`
- **BREAKING**: `BaseTextAggregator.aggregate()` now returns `Optional[Aggregation]`
(instead of `Optional[str]`). To update:
```
aggregation = myAggregator.aggregate(text)
if aggregation:
    print(f"successfully aggregated text: {aggregation.text}")  # instead of {aggregation}
```
- `SimpleTextAggregator`, `SkipTagsAggregator`, `PatternPairAggregator` updated to
produce/consume `Aggregation` objects.

- Augmented the `PatternPairAggregator`:
- Introduced a new, preferred `add_pattern` method that supports a new option for treating a
match as a separate aggregation returned from `aggregate()`. This replaces the now-deprecated
`add_pattern_pair` method; you provide a `MatchAction` in lieu of the `remove_match` field.
- `MatchAction` enum: `REMOVE`, `KEEP`, `AGGREGATE`, allowing customization for how
a match should be handled.
- `REMOVE`: The text along with its delimiters will be removed from the streaming text.
Sentence aggregation will continue on as if this text did not exist.
- `KEEP`: The delimiters will be removed, but the content between them will be kept.
Sentence aggregation will continue on with the internal text included.
- `AGGREGATE`: The delimiters will be removed and the content between will be treated
as a separate aggregation. Any text before the start of the pattern will be
returned early, whether or not a complete sentence was found. Then the pattern
will be returned. Then the aggregation will continue on sentence matching after
the closing delimiter is found. The content between the delimiters is not
aggregated by sentence. It is aggregated as one single block of text.
- `PatternMatch` now extends `Aggregation` and provides richer info to handlers.
- **BREAKING**: The `PatternMatch` type returned to handlers registered via `on_pattern_match`
has been updated to subclass from the new `Aggregation` type, which means that `content`
has been replaced with `text` and `pattern_id` has been replaced with `type`:
```
async def on_match_tag(match: PatternMatch):
    pattern = match.type  # instead of match.pattern_id
    text = match.text  # instead of match.content
```
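
To illustrate the `AGGREGATE` action described above, here is a simplified simulation of the splitting semantics. This is not the real `PatternPairAggregator` implementation (which works on streaming text, not a complete string), and the tag names are arbitrary:

```python
def simulate_aggregate_action(text, start="<voice>", end="</voice>", type_="voice"):
    """Split text around one start/end pattern pair, mimicking MatchAction.AGGREGATE.

    Returns (type, text) tuples: any text before the pattern is flushed early,
    the pattern content becomes one single aggregation, and the remainder
    continues as ordinary text.
    """
    i = text.find(start)
    j = text.find(end, i)
    if i == -1 or j == -1:
        return [("sentence", text)]
    out = []
    before = text[:i].strip()
    if before:  # returned early, even if it is not a complete sentence
        out.append(("sentence", before))
    # Content between the delimiters is one single block, not split by sentence.
    out.append((type_, text[i + len(start):j].strip()))
    after = text[j + len(end):].strip()
    if after:
        out.append(("sentence", after))
    return out


print(simulate_aggregate_action("Sure thing <voice>river</voice> How's this?"))
# -> [('sentence', 'Sure thing'), ('voice', 'river'), ('sentence', "How's this?")]
```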

### Changed

- Updated all STT and TTS services to use consistent error handling pattern with
@@ -33,11 +109,42 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Updated language mappings for the Google and Gemini TTS services to match
official documentation.

- `TextFrame` has a new field, `append_to_context`, used to indicate whether the encompassing
text should be added to the LLM context (by the LLM assistant aggregator). It
defaults to `True`.

- TTS flow respects aggregation metadata:
- `TTSService` accepts a new `skip_aggregator_types` parameter to avoid speaking certain
aggregation types (now determined/returned by the aggregator).
- TTS services push an `AggregatedTextFrame` in addition to `TTSTextFrame`s when either an
aggregation occurs that should not be spoken or the TTS service supports word-by-word
timestamping. In the latter case, the `TTSService` preliminarily generates an
`AggregatedTextFrame`, aggregated by sentence, to make the full sentence content available
as early as possible.
- Introduced new methods, `add_text_transformer()` and `remove_text_transformer()`:
these functions introduce the ability to provide (and subsequently remove) callbacks to the
TTS to transform text based on its aggregation type prior to sending the text to the
underlying TTS service. This makes it possible to do things like introduce TTS-specific
tags for spelling or emotion, or change the pronunciation of something on the fly.

### Deprecated

- The `api_key` parameter in `GeminiTTSService` is deprecated. Use
`credentials` or `credentials_path` instead for Google Cloud authentication.

- The RTVI `bot-transcription` event is deprecated in favor of the new `bot-output`
message, which is the canonical representation of bot output (spoken or not). The code
still emits a transcription message for backwards compatibility while the transition occurs.

- The TTS constructor field `text_aggregator` is deprecated in favor of the new
`LLMTextProcessor`. TTS services still have an internal aggregator to support default
behavior, but if you want to override the aggregation behavior, you should use the new
processor.

- Deprecated `add_pattern_pair` in the `PatternPairAggregator`, which takes a `pattern_id`
and `remove_match` field, in favor of the new `add_pattern` method, which takes a `type`
and an `action`.

### Fixed

- Fixed subtle issue of assistant context messages ending up with double spaces
16 changes: 10 additions & 6 deletions examples/foundational/35-pattern-pair-voice-switching.py
@@ -62,7 +62,11 @@
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.utils.text.pattern_pair_aggregator import PatternMatch, PatternPairAggregator
from pipecat.utils.text.pattern_pair_aggregator import (
MatchAction,
PatternMatch,
PatternPairAggregator,
)

load_dotenv(override=True)

@@ -106,16 +110,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
pattern_aggregator = PatternPairAggregator()

# Add pattern for voice switching
pattern_aggregator.add_pattern_pair(
pattern_id="voice_tag",
pattern_aggregator.add_pattern(
type="voice",
start_pattern="<voice>",
end_pattern="</voice>",
remove_match=True,
action=MatchAction.REMOVE, # Remove tags from final text
)

# Register handler for voice switching
async def on_voice_tag(match: PatternMatch):
voice_name = match.content.strip().lower()
voice_name = match.text.strip().lower()
if voice_name in VOICE_IDS:
# First flush any existing audio to finish the current context
await tts.flush_audio()
@@ -125,7 +129,7 @@ async def on_voice_tag(match: PatternMatch):
else:
logger.warning(f"Unknown voice: {voice_name}")

pattern_aggregator.on_pattern_match("voice_tag", on_voice_tag)
pattern_aggregator.on_pattern_match("voice", on_voice_tag)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

18 changes: 11 additions & 7 deletions src/pipecat/extensions/ivr/ivr_navigator.py
@@ -31,7 +31,11 @@
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import LLMService
from pipecat.utils.text.pattern_pair_aggregator import PatternMatch, PatternPairAggregator
from pipecat.utils.text.pattern_pair_aggregator import (
MatchAction,
PatternMatch,
PatternPairAggregator,
)


class IVRStatus(Enum):
@@ -114,15 +118,15 @@ def _get_conversation_history(self) -> List[dict]:
def _setup_xml_patterns(self):
"""Set up XML pattern detection and handlers."""
# Register DTMF pattern
self._aggregator.add_pattern_pair("dtmf", "<dtmf>", "</dtmf>", remove_match=True)
self._aggregator.add_pattern("dtmf", "<dtmf>", "</dtmf>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("dtmf", self._handle_dtmf_action)

# Register mode pattern
self._aggregator.add_pattern_pair("mode", "<mode>", "</mode>", remove_match=True)
self._aggregator.add_pattern("mode", "<mode>", "</mode>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("mode", self._handle_mode_action)

# Register IVR pattern
self._aggregator.add_pattern_pair("ivr", "<ivr>", "</ivr>", remove_match=True)
self._aggregator.add_pattern("ivr", "<ivr>", "</ivr>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("ivr", self._handle_ivr_action)

async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -159,7 +163,7 @@ async def _handle_dtmf_action(self, match: PatternMatch):
Args:
match: The pattern match containing DTMF content.
"""
value = match.content
value = match.text
logger.debug(f"DTMF detected: {value}")

try:
@@ -180,7 +184,7 @@ async def _handle_ivr_action(self, match: PatternMatch):
Args:
match: The pattern match containing IVR status content.
"""
status = match.content
status = match.text
logger.trace(f"IVR status detected: {status}")

# Convert string to enum, with validation
@@ -211,7 +215,7 @@ async def _handle_mode_action(self, match: PatternMatch):
Args:
match: The pattern match containing mode content.
"""
mode = match.content
mode = match.text
logger.debug(f"Mode detected: {mode}")
if mode == "conversation":
await self._handle_conversation()
30 changes: 29 additions & 1 deletion src/pipecat/frames/frames.py
@@ -12,6 +12,7 @@
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
@@ -337,11 +338,14 @@ class TextFrame(DataFrame):
# mandatory fields of theirs to have defaults to preserve
# non-default-before-default argument order)
includes_inter_frame_spaces: bool = field(init=False)
# Whether this text frame should be appended to the LLM context.
append_to_context: bool = field(init=False)

def __post_init__(self):
super().__post_init__()
self.skip_tts = False
self.includes_inter_frame_spaces = False
self.append_to_context = True

def __str__(self):
pts = format_pts(self.pts)
@@ -355,8 +359,32 @@ class LLMTextFrame(TextFrame):
pass


class AggregationType(str, Enum):
"""Built-in aggregation strings."""

SENTENCE = "sentence"
WORD = "word"

def __str__(self):
return self.value


@dataclass
class AggregatedTextFrame(TextFrame):
"""Text frame representing an aggregation of TextFrames.

This frame contains multiple TextFrames aggregated together for processing
or output along with a field to indicate how they are aggregated.

Parameters:
aggregated_by: Method used to aggregate the text frames.
"""

aggregated_by: AggregationType | str


@dataclass
class TTSTextFrame(TextFrame):
class TTSTextFrame(AggregatedTextFrame):
"""Text frame generated by Text-to-Speech services."""

pass
2 changes: 1 addition & 1 deletion src/pipecat/processors/aggregators/llm_response.py
@@ -1001,7 +1001,7 @@ async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
await self.push_aggregation()

async def _handle_text(self, frame: TextFrame):
if not self._started:
if not self._started or not frame.append_to_context:
return

if self._params.expect_stripped_words:
@@ -814,7 +814,7 @@ async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
await self.push_aggregation()

async def _handle_text(self, frame: TextFrame):
if not self._started:
if not self._started or not frame.append_to_context:
return

# Make sure we really have text (spaces count, too!)