Adding support for new bot-output RTVI Message: #2899
Conversation
Pull Request Overview
This PR adds support for the new bot-output RTVI message, which consolidates bot output communication by including metadata about whether text was spoken and how it was aggregated. The changes expand the text aggregation framework to return both aggregated text and its type, deprecate the bot-transcription event in favor of bot-output, and enhance TTSTextFrame to carry spoken status and aggregation type metadata.
Key changes:
- Enhanced `TTSTextFrame` with `spoken` and `aggregated_by` metadata fields
- Modified text aggregators to return `Aggregation` objects containing text and type information
- Introduced new `RTVIBotOutputMessage` to represent bot output with metadata about speech and aggregation
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/pipecat/frames/frames.py | Added aggregated_by and spoken fields to TTSTextFrame |
| src/pipecat/utils/text/base_text_aggregator.py | Introduced Aggregation dataclass and updated aggregator interface to return Aggregation objects |
| src/pipecat/utils/text/simple_text_aggregator.py | Updated to return Aggregation objects instead of strings |
| src/pipecat/utils/text/skip_tags_aggregator.py | Updated to return Aggregation objects instead of strings |
| src/pipecat/utils/text/pattern_pair_aggregator.py | Extended PatternMatch from Aggregation and updated to handle aggregation types |
| src/pipecat/services/tts_service.py | Modified to handle Aggregation objects and pass metadata to TTSTextFrame |
| src/pipecat/services/openai_realtime_beta/openai.py | Updated TTSTextFrame creation with new parameters |
| src/pipecat/services/openai/realtime/llm.py | Updated TTSTextFrame creation with new parameters |
| src/pipecat/services/google/gemini_live/llm.py | Updated TTSTextFrame creation with new parameters |
| src/pipecat/services/aws/nova_sonic/llm.py | Updated TTSTextFrame creation with new parameters |
| src/pipecat/processors/frameworks/rtvi.py | Introduced RTVIBotOutputMessage and updated LLM text handling logic |
| tests/test_transcript_processor.py | Updated test cases to include aggregated_by parameter in TTSTextFrame |
Comments suppressed due to low confidence (1)
src/pipecat/utils/text/pattern_pair_aggregator.py:1
- Corrected spelling of 'message' to 'messages'.
src/pipecat/frames/frames.py
Outdated
```python
class TTSTextFrame(TextFrame):
    """Text frame generated by Text-to-Speech services."""

    aggregated_by: Literal["sentence", "word"] | str
```
Copilot AI (Oct 21, 2025)
The aggregated_by field should have a default value to maintain backward compatibility with existing code that creates TTSTextFrame without this parameter. Consider adding a default like `aggregated_by: Literal["sentence", "word"] | str = "word"`.
```diff
-    aggregated_by: Literal["sentence", "word"] | str
+    aggregated_by: Literal["sentence", "word"] | str = "word"
```
good point. not sure what the default should be, though. would love any of the reviewers to weigh in here :)
I vote for "word". TTSTextFrames from the most common TTS services tend to carry a single word in their .text value.
per discussion in maintainers meeting, i think we decided it was ok NOT to have a default.
Yes, I think that's fine. We want to force the developer to specify what type of aggregation this frame is.
Instead of `Literal["sentence", "word"]`, should we have an enum?

```python
class AggregationType(Enum):
    SENTENCE = "sentence"
    WORD = "word"
```
```diff
         return [start_index, pattern_info["type"]]
-        return False
+        return None, None
```
Copilot AI (Oct 21, 2025)
The return type annotation indicates Optional[Tuple[int, str]], but this returns a tuple of two None values which doesn't match the expected single None for the Optional type. Change to return (None, None) or update the return type to Optional[Tuple[Optional[int], Optional[str]]].
```diff
-        return None, None
+        return None
```
uh. when I returned just None, my handling of the return value struggled, and changing to None, None fixed it. i'll have to up my python game i guess 🤔
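(Illustrative only, not the PR's actual code: with a uniform two-tuple, call sites can unpack the result unconditionally, which is likely why the bare None return was awkward to handle.)

```python
from typing import Optional, Tuple

def find_match(text: str) -> Tuple[Optional[int], Optional[str]]:
    """Return (start_index, match_type), or (None, None) when nothing matches."""
    marker = "<code>"  # hypothetical delimiter, for illustration
    if marker in text:
        return text.index(marker), "code"
    return None, None

# Callers can always unpack; returning a bare None here would make this raise.
start_index, match_type = find_match("no tags here")
if start_index is None:
    print("no match")
```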
src/pipecat/services/tts_service.py
Outdated
```python
await self.push_frame(TTSTextFrame(text, spoken=True, aggregated_by=aggregated_by))
await self.process_generator(self.run_tts(text))
```
Copilot AI (Oct 21, 2025)
The frame is pushed before TTS processing begins, but if TTS processing fails, the frame will have already been marked as spoken=True. Consider pushing this frame after successful TTS processing or handling potential failures appropriately.
```diff
-        await self.push_frame(TTSTextFrame(text, spoken=True, aggregated_by=aggregated_by))
-        await self.process_generator(self.run_tts(text))
+        try:
+            await self.process_generator(self.run_tts(text))
+        except Exception as e:
+            logger.error(f"TTS processing failed: {e}")
+            # Optionally, push an error frame here if desired:
+            # await self.push_frame(ErrorFrame(str(e)))
+        else:
+            await self.push_frame(TTSTextFrame(text, spoken=True, aggregated_by=aggregated_by))
```
good point. HOWEVER. timing-wise, the goal is to get this out before the tts starts spewing the words as they are spoken. For this reason, I believe we want to go ahead and push the frame first. If the run_tts fails, we have bigger problems than our "spoken=True" flag not being right.
Force-pushed a778db7 to 276fd1b

Force-pushed 9a11460 to 89c7277
Pull Request Overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 9 comments.
```python
        start_pattern: str,
        end_pattern: str,
        type: str,
        action: MatchAction = MatchAction.REMOVE,
```
Removing remove_match is a breaking change. Maybe you can keep it, mark it as deprecated, warn when using it, and map the value to MatchAction.REMOVE?
Is there a reason we can't go with this approach?
Ohh is it because it's not strictly a keyword argument, it's positional?
let me know what you think about this approach to backwards compatibility (introducing a new method and calling it just add_pattern)
NOTE: There are still breaking changes in this class because the handlers and the aggregate() method return a new type. I don't know any way around those.
Force-pushed 3d79718 to a0da0a5
Pull Request Overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 5 comments.
| """ | ||
|
|
||
| spoken: bool = True # Indicates if the text has been spoken by TTS | ||
| aggregated_by: Optional[Literal["word", "sentence"] | str] = None |
Copilot AI (Oct 31, 2025)
The union syntax Literal['word', 'sentence'] | str is redundant since str already includes all possible literal values. Consider simplifying to just Optional[str] = None.
```diff
-    aggregated_by: Optional[Literal["word", "sentence"] | str] = None
+    aggregated_by: Optional[str] = None
```
while it may be redundant, it makes it clear that there's a set of options that can definitely be expected.
Force-pushed a0da0a5 to 3b711c5
```python
        self._llm_text_aggregator: BaseTextAggregator = (
            self._params.llm_text_aggregator or SimpleTextAggregator()
        )
        self._skip_tts: Optional[bool] = None
```
I think you can drop the Optional here. Optional is more commonly used for __init__ or function args.
```diff
-        self._skip_tts: Optional[bool] = None
+        self._skip_tts: bool = None
```
```python
        self._started += 1
        if self._skip_tts is None:
            self._skip_tts = frame.skip_tts
        await self._maybe_push_llm_aggregation(frame)
```
Is this needed? The LLMFullResponseStartFrame doesn't contain text to aggregate. Perhaps the LLMFullResponseStartFrame is for initializing state for aggregation only?
You know what, you're right. I thought I needed it for the scenario where there is dangling text that might need to be sent from a previous llm output. But since we always send whatever is left on LLMFullResponseEndFrame, there shouldn't ever be dangling text. I'll leave in the initialization of self._skip_tts though (and add a comment).
actually, to confirm what i said above. If you get an interrupt, do you still get an LLMFullResponseEndFrame?
i guess even then, the next text frame will have the same effect of flipping and sending any dangling text... all that to say: you're right. we don't have to maybe push here :)
```python
    ):
        aggregate = None
        should_reset_aggregator = False
        if self._skip_tts and not frame.skip_tts:
```
Of these if/else conditions, you only want one to run at any given time, right? And the ordering would be to execute with the first one encountered? If so, this might require a slightly different logic to ensure that's the case. If not, then, it's fine as is.
i'm not sure i follow, but there is definitely only one path for any given frame here 🤔
| """Handle aggregated LLM text output frames.""" | ||
| isTTS = isinstance(frame, TTSTextFrame) | ||
| message = RTVIBotOutputMessage( | ||
| data=RTVIBotOutputMessageData( |
Do we need to add a bot_output_enabled flag to turn this RTVI event on and off?
ah, probably
actually... i don't know. this is more of a replacement for bot-transcription which didn't have a flag 🤔 -- what do you think?
oh actually, it looks like it was behind the bot_tts_enabled flag (weird). and everything else has a flag. i'll add it.
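For illustration, a minimal sketch of what the flag-guarded emission might look like; the params object, the data field names, and the send helper are assumptions based on this thread, not the PR's exact code:

```python
async def _handle_llm_text_frame(self, frame) -> None:
    """Emit a bot-output message unless the flag disables it (sketch)."""
    # Respect the new bot_output_enabled flag, mirroring the other RTVI flags.
    if not self._params.bot_output_enabled:
        return
    message = RTVIBotOutputMessage(
        data=RTVIBotOutputMessageData(
            text=frame.text,
            spoken=isinstance(frame, TTSTextFrame),
            aggregated_by=getattr(frame, "aggregated_by", None),
        )
    )
    await self.push_transport_message_urgent(message)
```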
```python
        # TODO: Remove all this logic when we fully deprecate bot-transcription messages.
        self._bot_transcription += frame.text
        if match_endofsentence(self._bot_transcription):
            await self._push_bot_transcription()
```
I think you can remove the corresponding _push_bot_transcription function now that it's part of _handle_llm_text_frame. Not sure if it's used elsewhere, but it doesn't seem to be.
good catch
done.
src/pipecat/services/tts_service.py
Outdated
```diff
         # Store if we were processing text or not so we can set it back.
         processing_text = self._processing_text
-        await self._push_tts_frames(frame.text)
+        await self._push_tts_frames(frame.text, should_speak=True, aggregated_by="word")
```
Is "word" the correct aggregated_by value here? TTSSpeakFrame is usually provided as a complete phrase, e.g. `TTSSpeakFrame("Hello there, how are you?")`.
ah! makes sense. not sure why i (or maybe copilot) put "word". Changing to "sentence"
src/pipecat/services/tts_service.py
Outdated
```python
        # before the audio so downstream processors know what text
        # is being spoken. Here, we assume this flag is used when the TTS
        # provider supports word timestamps and the TTSTextFrames will be
        # generated in the word_task_handler.
```
Just to clarify:
```diff
-        # generated in the word_task_handler.
+        # generated in the words_task_handler in the WordTTSService subclass.
```
src/pipecat/services/tts_service.py
Outdated
```python
        if text:
            if not self._push_text_frames:
                # If we are not pushing text frames, we send a TTSTextFrame
```
This is tricky. In this case, the TTSService is pushing AggregatedLLMTextFrame, which means that it's pushing both AggregatedLLMTextFrame and TTSTextFrame. But the AggregatedLLMTextFrame is ignored in the LLMAssistantAggregator, meaning that two TextFrame subclasses are pushed but only one results in assistant messages being added to the context.
And, the AggregatedLLMTextFrame is pushed by the TTSService so that the RTVIObserver can see the frame in order to emit the messages to the client.
This is pretty complicated; do I have it right? We should capture this somewhere in a comment, so that it's documented.
Now I'm questioning my understanding of what this is doing. Can you explain? Why are we pushing the AggregatedLLMTextFrame in this case`?
ok. i've updated the comment here. let me know if that's good enough or still confusing.
```python
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
```
Does llm_response.py need the same changes from llm_response_universal.py to be backwards compatible? Or do we have a way to ensure that only the newest aggregator can use these features?
not sure i follow...
i think this is the last comment I have not addressed. happy to chat about it so i can understand.
per an offline discussion... no. we are only going to support the feature of being able to generate bot-output when skip_tts is True in the new universal context.
src/pipecat/services/tts_service.py
Outdated
```diff
-        if self._push_text_frames:
-            # We send the original text after the audio. This way, if we are
+        if not should_speak:
```
I want to confirm my understanding. You can reach this case if you're using an aggregator that aggregates text that should not be spoken right? This is different than skip_tts being set in a TextFrame. A clarifying comment could be helpful for our future selves.
correct, this essentially maps to text that was aggregated into a type listed in the new _skip_aggregator_types list. but i think i can do better than a comment...
```python
        return Aggregation(self._text, "sentence")

    def add_pattern_pair(
        self, pattern_id: str, start_pattern: str, end_pattern: str, remove_match: bool = True
```
This is a breaking change. Instead, you should:
- Leave the `remove_match` arg in the `add_pattern_pair` function, marking it as deprecated
- Add a warning about the deprecation
- Use the `remove_match` value to set the REMOVE `MatchAction`

Is that doable?
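For concreteness, a minimal sketch of the shim being proposed; the `add_pattern()` target method comes from later in this thread, and `MatchAction.KEEP` is an assumption:

```python
import warnings

def add_pattern_pair(
    self,
    pattern_id: str,
    start_pattern: str,
    end_pattern: str,
    remove_match: bool = True,
):
    """Deprecated: maps the legacy boolean onto the new MatchAction enum."""
    warnings.warn(
        "add_pattern_pair() is deprecated; use add_pattern() with a MatchAction.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Assumption: KEEP is the non-removing counterpart of REMOVE.
    action = MatchAction.REMOVE if remove_match else MatchAction.KEEP
    return self.add_pattern(
        type=pattern_id,
        start_pattern=start_pattern,
        end_pattern=end_pattern,
        action=action,
    )
```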
yes. and no. the issue is that type doesn't have a default, so it has to be listed in the args first. And that in and of itself is breaking.
That leads to a follow-up question of whether "type" is required or if we can just use the existing "pattern_id", but i sure like the name "type" better for how it's used...
Per an offline chat, there is no great way to retain backwards compatibility without introducing an ugly API. So instead, I leaned into the breakage: not only did I remove remove_match, I also replaced pattern_id with the new type. In practice, these are essentially the same thing: a way to identify the pattern type. The key difference is thinking of it less as a unique id and more as identifying the conceptual way the pattern will aggregate text (which should still be unique).
TBH, type confuses me a bit, but pattern_id makes sense. In the docstring description of it, it's even described as "Identifier for this pattern pair".
Seems like if the rename doesn't make things obviously clearer, it'd be preferable to default to breaking fewer things?
markbackman left a comment:
Looks really good! Added a few comments and asked questions. This is a pretty complex addition, so we'll want to make sure there are good docstrings and comments throughout explaining how it works.
filipi87 left a comment:
I think we still have a couple of small nits, like maybe grouping the transformers from each TTS provider inside a class, but I know we first need to think about a good alternative for Rime in this case.
So I am leaving my approval here, because I know how hard it is to keep a PR like this up to date, and we can implement those improvements in a follow up PR if we decide they are worth it.
Amazing job! This will be a great improvement to Pipecat. 👏🚀
1. TTSTextFrames now include metadata about whether the text was spoken, along with a type string describing what the text represents, e.g. "sentence", "word", or a custom aggregation type.
2. Expanded how aggregators work so that the aggregate method returns the aggregated text along with the type of aggregation used to create it.
3. Deprecated the RTVI bot-transcription event in favor of...
4. Introduced support for a new bot-output event. This event is meant to be the one-stop shop for communicating what the bot actually "says". It is based on TTSTextFrames to communicate both sentence by sentence (or whatever aggregation is used) and word by word. In addition, it will include LLMTextFrames, aggregated by sentence, when TTS is turned off (i.e. skip_tts is true).

Resolves pipecat-ai/pipecat-client-web#158
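For context, a plausible wire shape for the new event, shown as a Python dict; the envelope fields are an assumption based on the usual RTVI message layout, not taken from this PR:

```python
bot_output_message = {
    "label": "rtvi-ai",
    "type": "bot-output",
    "data": {
        "text": "Hello there, how are you?",
        "spoken": True,               # was this text actually sent through TTS?
        "aggregated_by": "sentence",  # or "word", or a custom aggregation type
    },
}
```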
1. Fixed pattern_pair_aggregator to support various ways of handling pattern matches (remove, keep and just trigger a callback, or aggregate)
2. Fixed ivr_navigator's use of pattern_pair_aggregator
3. Test fixes; tests now pass
…e, indicating a spoken frame vs other aggregated, non-spoken frames
This allows any given TextFrame to be marked in a way such that it does not get added to the context. Specifically, this fixes a problem with the new AggregatedTextFrames where we need to send LLM text both in an aggregated form as well as word-by-word but avoid duplicating the text in the context.
…s LLMTextFrames when tts_skip is on
1. Added support for turning off bot-output messages with the bot_output_enabled flag
2. Cleaned up logic and comments around TTSService._push_tts_frames to hopefully make it easier to understand
3. Other minor cleanup
… to simplify the API

This new processor wraps an aggregator that can be overridden to customize how the LLM output gets categorized and handled in the pipeline. Along with this, we are deprecating the ability to override the default aggregator in the TTS, to encourage use of the LLMTextProcessor in cases where custom aggregation is needed.

This PR also:
- Introduces TTSService.transform_aggregation_type(): this provides the ability to register callbacks on the TTS that transform text based on its aggregation type before the text is sent to the underlying TTS service. This makes it possible to do things like introduce TTS-specific tags for spelling or emotion, or change the pronunciation of something on the fly. (A hedged sketch follows below.)
- Introduces to the RTVIObserver:
  - A new init field, skip_aggregator_types: a way to provide a list of aggregation types that should not be included in bot-output (or tts-text) messages.
  - transform_aggregation_type(): same as with TTSService, this allows you to provide a callback to transform text being sent as bot-output before it gets sent.
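A rough sketch of how such a transform callback might be registered; the exact signature of transform_aggregation_type() and the "spelled" type are assumptions based on the description above:

```python
def emphasize_spelling(text: str) -> str:
    # Wrap the aggregate in a (hypothetical) provider-specific spell tag.
    return f"<spell>{text}</spell>"

# Transform any aggregate of the custom "spelled" type before it hits the TTS.
tts.transform_aggregation_type("spelled", emphasize_spelling)
```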
Force-pushed 4254954 to 23e4e29
closing in favor of #3107

closing in favor of the cleaned up and rebased #3107
Resolves pipecat-ai/pipecat-client-web#158
To help with reviewing this PR, I've written up some of the use cases we were trying to solve with these changes and how you would do so after these changes go in:
Use-Cases:
Identify code blocks, keeping them in the context but not speaking them, and treating them as a unique type so that they can be formatted or handled accordingly.

This can be accomplished by providing a custom aggregator to the TTS service that looks for text inside a set of delimiters, along with updating the LLM's system prompt to specify that code should be demarcated with those same delimiters. The TTS service should be configured to skip any aggregate found with the given code type.
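A minimal sketch of that setup, assuming the add_pattern()/MatchAction API discussed in the review threads above; the service name, the delimiters, and the AGGREGATE member are illustrative assumptions:

```python
from pipecat.utils.text.pattern_pair_aggregator import MatchAction, PatternPairAggregator

aggregator = PatternPairAggregator()
aggregator.add_pattern(
    type="code",
    start_pattern="<code>",
    end_pattern="</code>",
    action=MatchAction.AGGREGATE,  # assumption: emit the block as its own aggregate
)

# Hand the aggregator to the TTS and tell it not to speak "code" aggregates.
tts = MyTTSService(
    text_aggregator=aggregator,
    skip_aggregator_types=["code"],  # name taken from this PR's discussion
)
```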
Identify speaker changes to change the TTS voice. Speaker tags should be removed, but the speaker should be left in the context and not spoken.

This is essentially the same as above, but you would add a callback for when the speaker aggregate is found:
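Something along these lines; on_pattern_match() exists on the current PatternPairAggregator, but the match attributes and the voice-switching mechanics here are assumptions:

```python
aggregator.add_pattern(
    type="speaker",
    start_pattern="<speaker>",
    end_pattern="</speaker>",
    action=MatchAction.REMOVE,  # strip the tags (and contents) from spoken text
)

async def on_speaker_change(match):
    # `match.content` is assumed to carry the name captured between the tags.
    # In a real pipeline you might push a TTSUpdateSettingsFrame here to
    # switch the active voice based on the speaker.
    print(f"speaker changed to: {match.content}")

aggregator.on_pattern_match("speaker", on_speaker_change)
```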
The pipeline can switch between voice and text modes, and no matter the mode, the context and the resulting bot-output are updated accordingly, flow seamlessly, and represent the full picture. The aggregation of the output should also be customizable in both modes so that, for example, if you have customized the TTS aggregation to treat a code block separately, code blocks can also be treated separately when the TTS is skipped.

This is accomplished the same way as above, but with the addition of the new ability to set a custom aggregator on the assistant context. The assistant context now looks for LLMTextFrames with skip_tts set to True and aggregates those separately. By default it uses a SimpleTextAggregator to segregate sentence by sentence. Bot code should look the same as above, but providing a copy of the same aggregator to the assistant context:
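A hedged sketch; the llm_text_aggregator parameter appears in this PR's diff above, while the LLMAssistantAggregatorParams usage details and helper names are assumptions:

```python
from pipecat.utils.text.pattern_pair_aggregator import MatchAction, PatternPairAggregator

def make_code_aggregator() -> PatternPairAggregator:
    # Rebuild the same pattern configuration used on the TTS side above.
    agg = PatternPairAggregator()
    agg.add_pattern(
        type="code",
        start_pattern="<code>",
        end_pattern="</code>",
        action=MatchAction.AGGREGATE,
    )
    return agg

# `llm` and `context` are assumed to be your existing pipeline objects.
# Give the assistant context its own copy (not the TTS's instance).
context_aggregator = llm.create_context_aggregator(
    context,
    assistant_params=LLMAssistantAggregatorParams(
        llm_text_aggregator=make_code_aggregator(),
    ),
)
```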
The pipeline does not have a TTS. The context and resulting bot-output should be generated from the raw LLM output, and the aggregation of that output should be customizable.

Maybe hacky, but you can solve this by simply setting the skip_tts flag on the LLM service to true. This will cause all LLMTextFrames to be marked with skip_tts=True and thus be aggregated by the assistant context's llm_text_aggregator. See above for how to customize that aggregator.