diff --git a/changelog/4009.added.md b/changelog/4009.added.md new file mode 100644 index 0000000000..9ebbec7dd9 --- /dev/null +++ b/changelog/4009.added.md @@ -0,0 +1 @@ +- Added `PerplexityLLMAdapter` that automatically transforms conversation messages to satisfy Perplexity's stricter API constraints (strict role alternation, no non-initial system messages, last message must be user/tool). Previously, certain conversation histories could cause Perplexity API errors that didn't occur with OpenAI (`PerplexityLLMService` subclasses `OpenAILLMService` since Perplexity uses an OpenAI-compatible API). diff --git a/src/pipecat/adapters/services/perplexity_adapter.py b/src/pipecat/adapters/services/perplexity_adapter.py new file mode 100644 index 0000000000..a8fbe3c189 --- /dev/null +++ b/src/pipecat/adapters/services/perplexity_adapter.py @@ -0,0 +1,152 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Perplexity LLM adapter for Pipecat. + +Perplexity's API uses an OpenAI-compatible interface but enforces stricter +constraints on conversation history structure: + +1. **Strict role alternation** — Messages must alternate between "user"/"tool" + and "assistant" roles. Consecutive messages with the same role (e.g. two + "user" messages in a row) are rejected with: + ``"messages must be an alternating sequence of user/tool and assistant messages"`` + +2. **No non-initial system messages** — "system" messages are only allowed at + the start of the conversation. A system message after a non-system message + causes: + ``"only the initial message can have the system role"`` + +3. **Last message must be user/tool** — The final message in the conversation + must have role "user" or "tool". A trailing "assistant" message causes: + ``"the last message must have the user or tool role"`` + +This adapter transforms the message list to satisfy all three constraints before +the messages are sent to Perplexity's API. +""" + +import copy +from typing import List + +from openai.types.chat import ChatCompletionMessageParam + +from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter, OpenAILLMInvocationParams +from pipecat.processors.aggregators.llm_context import LLMContext + + +class PerplexityLLMAdapter(OpenAILLMAdapter): + """Adapter that transforms messages to satisfy Perplexity's API constraints. + + Perplexity's API is stricter than OpenAI about message structure. This + adapter extends ``OpenAILLMAdapter`` and applies message transformations + to ensure compliance with Perplexity's constraints (role alternation, + no non-initial system messages, last message must be user/tool). + + The transformations are applied in ``get_llm_invocation_params`` after the + parent adapter extracts messages from the LLM context, and before + ``build_chat_completion_params`` prepends ``system_instruction``. + """ + + def get_llm_invocation_params(self, context: LLMContext) -> OpenAILLMInvocationParams: + """Get OpenAI-compatible invocation parameters with Perplexity message fixes applied. + + Args: + context: The LLM context containing messages, tools, etc. + + Returns: + Dictionary of parameters for Perplexity's ChatCompletion API, with + messages transformed to satisfy Perplexity's constraints. + """ + params = super().get_llm_invocation_params(context) + params["messages"] = self._transform_messages(list(params["messages"])) + return params + + def _transform_messages( + self, messages: List[ChatCompletionMessageParam] + ) -> List[ChatCompletionMessageParam]: + """Transform messages to satisfy Perplexity's API constraints. + + Applies three transformation steps in order: + + 1. **Convert non-initial system messages to user** — Any system message + after the initial system message block is converted to role "user", + since Perplexity rejects system messages after a non-system message. + + 2. **Merge consecutive same-role messages** — After the above + conversions, adjacent messages with the same role are merged using + list-of-dicts content format. This ensures strict role alternation + (e.g. a converted system→user message adjacent to an existing user + message gets merged). + + 3. **Remove trailing assistant messages** — If the last message is + "assistant", remove it. OpenAI appears to silently ignore trailing + assistant messages server-side, so removing them preserves equivalent + behavior while satisfying Perplexity's "last message must be + user/tool" constraint. + + Note: we intentionally do *not* convert a trailing system message to + "user". That would make the transformation unstable across calls — + Perplexity appears to have statefulness/caching within a conversation, + so a message that was sent as "user" in one call but becomes "system" + in the next (once more messages are appended) causes errors. If the + context consists entirely of system messages, the Perplexity API call + will fail, but that mistake will be caught right away. + + Args: + messages: List of message dicts with "role" and "content" keys. + + Returns: + Transformed list of message dicts satisfying Perplexity's constraints. + """ + if not messages: + return messages + + messages = copy.deepcopy(messages) + + # Step 1: Convert non-initial system messages to "user". + # Perplexity allows system messages at the start, but rejects them + # after any non-system message. + in_initial_system_block = True + for i in range(len(messages)): + if messages[i].get("role") == "system": + if not in_initial_system_block: + messages[i]["role"] = "user" + else: + in_initial_system_block = False + + # Step 2: Merge consecutive same-role messages. + # After system→user conversions above, we may have adjacent same-role + # messages that violate Perplexity's strict alternation requirement. + # Skip consecutive system messages at the start — Perplexity allows those. + i = 0 + while i < len(messages) - 1: + current = messages[i] + next_msg = messages[i + 1] + if current["role"] == next_msg["role"] == "system": + # Perplexity allows multiple initial system messages, don't merge + i += 1 + elif current["role"] == next_msg["role"]: + # Convert string content to list-of-dicts format for merging + if isinstance(current.get("content"), str): + current["content"] = [{"type": "text", "text": current["content"]}] + if isinstance(next_msg.get("content"), str): + next_msg["content"] = [{"type": "text", "text": next_msg["content"]}] + # Merge content from next message into current + if isinstance(current.get("content"), list) and isinstance( + next_msg.get("content"), list + ): + current["content"].extend(next_msg["content"]) + messages.pop(i + 1) + else: + i += 1 + + # Step 3: Remove trailing assistant messages. + # Perplexity requires the last message to be "user" or "tool". + # OpenAI appears to silently ignore trailing assistant messages + # server-side, so removing them preserves equivalent behavior. + while messages and messages[-1].get("role") == "assistant": + messages.pop() + + return messages diff --git a/src/pipecat/services/cerebras/llm.py b/src/pipecat/services/cerebras/llm.py index 7c31a68573..dfb62baf82 100644 --- a/src/pipecat/services/cerebras/llm.py +++ b/src/pipecat/services/cerebras/llm.py @@ -117,6 +117,10 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP # Prepend system instruction if set if self._settings.system_instruction: messages = params.get("messages", []) + if messages and messages[0].get("role") == "system": + logger.warning( + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." + ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} ] + messages diff --git a/src/pipecat/services/fireworks/llm.py b/src/pipecat/services/fireworks/llm.py index bf141fac19..5efa607939 100644 --- a/src/pipecat/services/fireworks/llm.py +++ b/src/pipecat/services/fireworks/llm.py @@ -118,6 +118,10 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP # Prepend system instruction if set if self._settings.system_instruction: messages = params.get("messages", []) + if messages and messages[0].get("role") == "system": + logger.warning( + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." + ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} ] + messages diff --git a/src/pipecat/services/mistral/llm.py b/src/pipecat/services/mistral/llm.py index 3ee1b26230..063dac3aa7 100644 --- a/src/pipecat/services/mistral/llm.py +++ b/src/pipecat/services/mistral/llm.py @@ -236,6 +236,10 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP # Prepend system instruction if set if self._settings.system_instruction: messages = params.get("messages", []) + if messages and messages[0].get("role") == "system": + logger.warning( + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." + ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} ] + messages diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index 41b26bd206..eb8ce3cc66 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -332,8 +332,7 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP messages = params.get("messages", []) if messages and messages[0].get("role") == "system": logger.warning( - f"{self}: Both system_instruction and a system message in context are set." - " Using system_instruction." + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} @@ -381,8 +380,7 @@ async def run_inference( messages = params.get("messages", []) if messages and messages[0].get("role") == "system": logger.warning( - f"{self}: Both system_instruction and a system message in context are set." - " Using system_instruction." + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." ) params["messages"] = [{"role": "system", "content": system_instruction}] + messages diff --git a/src/pipecat/services/perplexity/llm.py b/src/pipecat/services/perplexity/llm.py index 6c2ceba355..9ea323c5d5 100644 --- a/src/pipecat/services/perplexity/llm.py +++ b/src/pipecat/services/perplexity/llm.py @@ -14,7 +14,10 @@ from dataclasses import dataclass from typing import Optional +from loguru import logger + from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams +from pipecat.adapters.services.perplexity_adapter import PerplexityLLMAdapter from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext @@ -37,6 +40,8 @@ class PerplexityLLMService(OpenAILLMService): in token usage reporting between Perplexity (incremental) and OpenAI (final summary). """ + adapter_class = PerplexityLLMAdapter + Settings = PerplexityLLMSettings _settings: Settings @@ -119,6 +124,10 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP # Prepend system instruction if set if self._settings.system_instruction: messages = params.get("messages", []) + if messages and messages[0].get("role") == "system": + logger.warning( + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." + ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} ] + messages diff --git a/src/pipecat/services/sambanova/llm.py b/src/pipecat/services/sambanova/llm.py index 3c7d767371..710a22db21 100644 --- a/src/pipecat/services/sambanova/llm.py +++ b/src/pipecat/services/sambanova/llm.py @@ -134,6 +134,10 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP # Prepend system instruction if set if self._settings.system_instruction: messages = params.get("messages", []) + if messages and messages[0].get("role") == "system": + logger.warning( + f"{self}: Both system_instruction and an initial system message in context are set. This may be unintended." + ) params["messages"] = [ {"role": "system", "content": self._settings.system_instruction} ] + messages diff --git a/tests/test_get_llm_invocation_params.py b/tests/test_get_llm_invocation_params.py index c93275b67b..9cfeb89339 100644 --- a/tests/test_get_llm_invocation_params.py +++ b/tests/test_get_llm_invocation_params.py @@ -48,6 +48,7 @@ from pipecat.adapters.services.bedrock_adapter import AWSBedrockLLMAdapter from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter +from pipecat.adapters.services.perplexity_adapter import PerplexityLLMAdapter from pipecat.processors.aggregators.llm_context import ( LLMContext, LLMStandardMessage, @@ -992,5 +993,222 @@ def test_single_system_message_handling(self): self.assertEqual(len(params["messages"]), 0) +class TestPerplexityGetLLMInvocationParams(unittest.TestCase): + def setUp(self) -> None: + """Sets up a common adapter instance for all tests.""" + self.adapter = PerplexityLLMAdapter() + + def test_standard_messages_pass_through(self): + """Test that a valid [user, assistant, user] sequence passes through unchanged.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + {"role": "user", "content": "How are you?"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 3) + self.assertEqual(params["messages"][0]["role"], "user") + self.assertEqual(params["messages"][0]["content"], "Hello") + self.assertEqual(params["messages"][1]["role"], "assistant") + self.assertEqual(params["messages"][1]["content"], "Hi there!") + self.assertEqual(params["messages"][2]["role"], "user") + self.assertEqual(params["messages"][2]["content"], "How are you?") + + def test_initial_system_message_preserved(self): + """Test that a valid [system, user, assistant, user] sequence passes through unchanged.""" + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi!"}, + {"role": "user", "content": "Bye"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 4) + self.assertEqual(params["messages"][0]["role"], "system") + self.assertEqual(params["messages"][0]["content"], "You are a helpful assistant.") + self.assertEqual(params["messages"][1]["role"], "user") + self.assertEqual(params["messages"][2]["role"], "assistant") + self.assertEqual(params["messages"][3]["role"], "user") + + def test_consecutive_same_role_messages_merged(self): + """Test that consecutive user messages are merged into list-of-dicts content.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "First message"}, + {"role": "user", "content": "Second message"}, + {"role": "assistant", "content": "Response"}, + {"role": "user", "content": "Third message"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 3) + + # First message should be merged users + merged = params["messages"][0] + self.assertEqual(merged["role"], "user") + self.assertIsInstance(merged["content"], list) + self.assertEqual(len(merged["content"]), 2) + self.assertEqual(merged["content"][0]["type"], "text") + self.assertEqual(merged["content"][0]["text"], "First message") + self.assertEqual(merged["content"][1]["type"], "text") + self.assertEqual(merged["content"][1]["text"], "Second message") + + self.assertEqual(params["messages"][1]["role"], "assistant") + self.assertEqual(params["messages"][2]["role"], "user") + + def test_non_initial_system_converted_to_user(self): + """Test that non-initial system messages are converted to user and merged with adjacent user.""" + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi!"}, + {"role": "system", "content": "Be concise."}, + {"role": "user", "content": "Tell me about Python."}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + # system(initial), user, assistant, merged(system→user + user) + self.assertEqual(len(params["messages"]), 4) + self.assertEqual(params["messages"][0]["role"], "system") + self.assertEqual(params["messages"][1]["role"], "user") + self.assertEqual(params["messages"][2]["role"], "assistant") + + # The converted system→user and the following user should be merged + merged = params["messages"][3] + self.assertEqual(merged["role"], "user") + self.assertIsInstance(merged["content"], list) + self.assertEqual(len(merged["content"]), 2) + self.assertEqual(merged["content"][0]["text"], "Be concise.") + self.assertEqual(merged["content"][1]["text"], "Tell me about Python.") + + def test_multiple_system_messages_at_start_preserved(self): + """Test that multiple consecutive system messages at start pass through unchanged.""" + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "system", "content": "Always be polite."}, + {"role": "user", "content": "Hello"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 3) + self.assertEqual(params["messages"][0]["role"], "system") + self.assertEqual(params["messages"][0]["content"], "You are a helpful assistant.") + self.assertEqual(params["messages"][1]["role"], "system") + self.assertEqual(params["messages"][1]["content"], "Always be polite.") + self.assertEqual(params["messages"][2]["role"], "user") + self.assertEqual(params["messages"][2]["content"], "Hello") + + def test_trailing_assistant_removed(self): + """Test that a trailing assistant message is removed.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 1) + self.assertEqual(params["messages"][0]["role"], "user") + self.assertEqual(params["messages"][0]["content"], "Hello") + + def test_only_system_messages_preserved(self): + """Test that system-only contexts are left unchanged (no system→user conversion). + + We intentionally do not convert trailing system messages to "user" + because that would make the transformation unstable across calls — + Perplexity has statefulness within a conversation, so a message that + was "user" in one call but becomes "system" in the next causes errors. + """ + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are a helpful assistant."}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 1) + self.assertEqual(params["messages"][0]["role"], "system") + + def test_system_exposed_after_trailing_assistant_removed(self): + """Test that a system message exposed by trailing assistant removal stays system. + + It's important that initial system messages are never converted to + "user", because Perplexity has statefulness within a conversation — if + a message was sent as "system" in one call and then becomes "user" in a + later call (after more messages are appended), the API rejects it. + """ + messages: list[LLMStandardMessage] = [ + {"role": "system", "content": "You are helpful."}, + {"role": "assistant", "content": "Sure thing."}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + # Trailing assistant removed → [system], system stays as-is + self.assertEqual(len(params["messages"]), 1) + self.assertEqual(params["messages"][0]["role"], "system") + self.assertEqual(params["messages"][0]["content"], "You are helpful.") + + def test_consecutive_assistants_merged_then_trailing_removed(self): + """Test that consecutive assistant messages are merged, then trailing assistant is removed.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "First response"}, + {"role": "assistant", "content": "Second response"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + # After merging assistants we get [user, assistant(merged)], then trailing + # assistant is removed, leaving just [user] + self.assertEqual(len(params["messages"]), 1) + self.assertEqual(params["messages"][0]["role"], "user") + self.assertEqual(params["messages"][0]["content"], "Hello") + + def test_tool_messages_preserved(self): + """Test that tool messages pass through without modification.""" + messages: list[LLMStandardMessage] = [ + {"role": "user", "content": "What's the weather?"}, + { + "role": "assistant", + "content": "Let me check.", + "tool_calls": [{"id": "1", "function": {"name": "get_weather", "arguments": "{}"}}], + }, + {"role": "tool", "content": "Sunny, 72F", "tool_call_id": "1"}, + {"role": "user", "content": "Thanks!"}, + ] + + context = LLMContext(messages=messages) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(len(params["messages"]), 4) + self.assertEqual(params["messages"][0]["role"], "user") + self.assertEqual(params["messages"][1]["role"], "assistant") + self.assertEqual(params["messages"][2]["role"], "tool") + self.assertEqual(params["messages"][2]["content"], "Sunny, 72F") + self.assertEqual(params["messages"][3]["role"], "user") + + def test_empty_messages(self): + """Test that empty messages list returns empty.""" + context = LLMContext(messages=[]) + params = self.adapter.get_llm_invocation_params(context) + + self.assertEqual(params["messages"], []) + + if __name__ == "__main__": unittest.main()