Commit 9a58cd3

Support half-duplex mode for Openai Realtime API (#814)
1 parent b73db75 commit 9a58cd3

File tree: 8 files changed, +315 -62 lines changed

.changeset/sour-mugs-lay.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+---
+'@livekit/agents-plugin-google': patch
+'@livekit/agents-plugin-openai': patch
+'@livekit/agents': patch
+---
+
+Support openai half-duplex mode (audio in -> text out -> custom TTS model)
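
The mode named here splits the pipeline: the Realtime API consumes audio and emits text, and a standalone TTS plugin produces the spoken output. A minimal sketch of the wiring, under the plugin APIs used elsewhere in this commit (the full runnable version is examples/src/realtime_with_tts.ts below):

// Half-duplex wiring in brief: text-only Realtime model plus a separate TTS.
import { voice } from '@livekit/agents';
import * as cartesia from '@livekit/agents-plugin-cartesia';
import * as openai from '@livekit/agents-plugin-openai';

const session = new voice.AgentSession({
  llm: new openai.realtime.RealtimeModel({ modalities: ['text'] }),
  tts: new cartesia.TTS({ model: 'sonic-3' }),
});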

agents/src/llm/realtime.ts

Lines changed: 8 additions & 1 deletion
@@ -19,6 +19,7 @@ export interface MessageGeneration {
   messageId: string;
   textStream: ReadableStream<string>;
   audioStream: ReadableStream<AudioFrame>;
+  modalities?: Promise<('text' | 'audio')[]>;
 }

 export interface GenerationCreatedEvent {
@@ -40,6 +41,7 @@ export interface RealtimeCapabilities {
   turnDetection: boolean;
   userTranscription: boolean;
   autoToolReplyGeneration: boolean;
+  audioOutput: boolean;
 }

 export interface InputTranscriptionCompleted {
@@ -121,7 +123,12 @@ export abstract class RealtimeSession extends EventEmitter {
   /**
    * Truncate the message at the given audio end time
    */
-  abstract truncate(options: { messageId: string; audioEndMs: number }): Promise<void>;
+  abstract truncate(options: {
+    messageId: string;
+    audioEndMs: number;
+    modalities?: ('text' | 'audio')[];
+    audioTranscript?: string;
+  }): Promise<void>;

   async close(): Promise<void> {
     this._mainTask.cancel();
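
Downstream consumers can await the new optional modalities promise to learn, per generation, whether audio frames will ever arrive on audioStream. A sketch of the pattern (the import path and the synthesize hook are assumptions for illustration; agent_activity.ts below shows the real call sites):

// Hypothetical consumer of the extended MessageGeneration interface.
import type { llm } from '@livekit/agents';

async function handleGeneration(
  msg: llm.MessageGeneration,
  synthesize: (text: ReadableStream<string>) => void, // stand-in for a TTS node
): Promise<void> {
  const modalities = msg.modalities ? await msg.modalities : undefined;
  if (modalities && !modalities.includes('audio')) {
    // Text-only generation: drive a TTS model from the text stream.
    synthesize(msg.textStream);
  } else {
    // Audio (or unknown) modality: consume msg.audioStream as before.
  }
}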

agents/src/voice/agent_activity.ts

Lines changed: 67 additions & 16 deletions
@@ -235,6 +235,14 @@ export class AgentActivity implements RecognitionHooks {
       } catch (error) {
         this.logger.error(error, 'failed to update the tools');
       }
+
+      if (!this.llm.capabilities.audioOutput && !this.tts && this.agentSession.output.audio) {
+        this.logger.error(
+          'audio output is enabled but RealtimeModel has no audio modality ' +
+            'and no TTS is set. Either enable audio modality in the RealtimeModel ' +
+            'or set a TTS model.',
+        );
+      }
     } else if (this.llm instanceof LLM) {
       try {
         updateInstructions({
@@ -1633,7 +1641,7 @@ export class AgentActivity implements RecognitionHooks {

     const readMessages = async (
       abortController: AbortController,
-      outputs: Array<[string, _TextOut | null, _AudioOut | null]>,
+      outputs: Array<[string, _TextOut | null, _AudioOut | null, ('text' | 'audio')[] | undefined]>,
     ) => {
       replyAbortController.signal.addEventListener('abort', () => abortController.abort(), {
         once: true,
@@ -1648,7 +1656,25 @@ export class AgentActivity implements RecognitionHooks {
           );
           break;
         }
-        const trNodeResult = await this.agent.transcriptionNode(msg.textStream, modelSettings);
+
+        const msgModalities = msg.modalities ? await msg.modalities : undefined;
+        let ttsTextInput: ReadableStream<string> | null = null;
+        let trTextInput: ReadableStream<string>;
+
+        if (msgModalities && !msgModalities.includes('audio') && this.tts) {
+          if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
+            this.logger.warn(
+              'text response received from realtime API, falling back to use a TTS model.',
+            );
+          }
+          const [_ttsTextInput, _trTextInput] = msg.textStream.tee();
+          ttsTextInput = _ttsTextInput;
+          trTextInput = _trTextInput;
+        } else {
+          trTextInput = msg.textStream;
+        }
+
+        const trNodeResult = await this.agent.transcriptionNode(trTextInput, modelSettings);
         let textOut: _TextOut | null = null;
         if (trNodeResult) {
           const [textForwardTask, _textOut] = performTextForwarding(
@@ -1659,30 +1685,51 @@ export class AgentActivity implements RecognitionHooks {
           forwardTasks.push(textForwardTask);
           textOut = _textOut;
         }
+
         let audioOut: _AudioOut | null = null;
         if (audioOutput) {
-          const realtimeAudio = await this.agent.realtimeAudioOutputNode(
-            msg.audioStream,
-            modelSettings,
-          );
-          if (realtimeAudio) {
+          let realtimeAudioResult: ReadableStream<AudioFrame> | null = null;
+
+          if (ttsTextInput) {
+            const [ttsTask, ttsStream] = performTTSInference(
+              (...args) => this.agent.ttsNode(...args),
+              ttsTextInput,
+              modelSettings,
+              abortController,
+            );
+            tasks.push(ttsTask);
+            realtimeAudioResult = ttsStream;
+          } else if (msgModalities && msgModalities.includes('audio')) {
+            realtimeAudioResult = await this.agent.realtimeAudioOutputNode(
+              msg.audioStream,
+              modelSettings,
+            );
+          } else if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
+            this.logger.error(
+              'Text message received from Realtime API with audio modality. ' +
+                'This usually happens when text chat context is synced to the API. ' +
+                'Try to add a TTS model as fallback or use text modality with TTS instead.',
+            );
+          } else {
+            this.logger.warn(
+              'audio output is enabled but neither tts nor realtime audio is available',
+            );
+          }
+
+          if (realtimeAudioResult) {
            const [forwardTask, _audioOut] = performAudioForwarding(
-              realtimeAudio,
+              realtimeAudioResult,
              audioOutput,
              abortController,
            );
            forwardTasks.push(forwardTask);
            audioOut = _audioOut;
            audioOut.firstFrameFut.await.finally(onFirstFrame);
-          } else {
-            this.logger.warn(
-              'audio output is enabled but neither tts nor realtime audio is available',
-            );
          }
        } else if (textOut) {
          textOut.firstTextFut.await.finally(onFirstFrame);
        }
-        outputs.push([msg.messageId, textOut, audioOut]);
+        outputs.push([msg.messageId, textOut, audioOut, msgModalities]);
      }
      await waitFor(forwardTasks);
    } catch (error) {
@@ -1692,7 +1739,9 @@ export class AgentActivity implements RecognitionHooks {
       }
     };

-    const messageOutputs: Array<[string, _TextOut | null, _AudioOut | null]> = [];
+    const messageOutputs: Array<
+      [string, _TextOut | null, _AudioOut | null, ('text' | 'audio')[] | undefined]
+    > = [];
     const tasks = [
       Task.from(
         (controller) => readMessages(controller, messageOutputs),
@@ -1771,7 +1820,7 @@ export class AgentActivity implements RecognitionHooks {

     if (messageOutputs.length > 0) {
       // there should be only one message
-      const [msgId, textOut, audioOut] = messageOutputs[0]!;
+      const [msgId, textOut, audioOut, msgModalities] = messageOutputs[0]!;
       let forwardedText = textOut?.text || '';

       if (audioOutput) {
@@ -1796,6 +1845,8 @@ export class AgentActivity implements RecognitionHooks {
         this.realtimeSession.truncate({
           messageId: msgId,
           audioEndMs: Math.floor(playbackPosition),
+          modalities: msgModalities,
+          audioTranscript: forwardedText,
         });
       }

@@ -1826,7 +1877,7 @@ export class AgentActivity implements RecognitionHooks {

     if (messageOutputs.length > 0) {
       // there should be only one message
-      const [msgId, textOut, _] = messageOutputs[0]!;
+      const [msgId, textOut, _, __] = messageOutputs[0]!;
       const message = ChatMessage.create({
         role: 'assistant',
         content: textOut?.text || '',
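
The text-only branch hinges on ReadableStream.tee(), the standard Web Streams primitive: it splits one stream into two independently consumable branches, so the same token stream can feed the TTS node and the transcription forwarder at once. A self-contained illustration:

// tee() duplicates a stream; each branch buffers independently, so a slow
// TTS consumer does not stall the transcript branch.
const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('hello, ');
    controller.enqueue('world');
    controller.close();
  },
});
const [ttsBranch, transcriptBranch] = source.tee();

Note also that truncate() now receives the message's modalities and the forwarded transcript, presumably so a session whose audio comes from an external TTS can reconcile the server-side history with the text that was actually spoken rather than truncating by audio time alone.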

examples/src/realtime_with_tts.ts

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import {
+  type JobContext,
+  type JobProcess,
+  ServerOptions,
+  cli,
+  defineAgent,
+  llm,
+  log,
+  voice,
+} from '@livekit/agents';
+import * as cartesia from '@livekit/agents-plugin-cartesia';
+import * as openai from '@livekit/agents-plugin-openai';
+import * as silero from '@livekit/agents-plugin-silero';
+import { BackgroundVoiceCancellation } from '@livekit/noise-cancellation-node';
+import { fileURLToPath } from 'node:url';
+import { z } from 'zod';
+
+export default defineAgent({
+  prewarm: async (proc: JobProcess) => {
+    proc.userData.vad = await silero.VAD.load();
+  },
+  entry: async (ctx: JobContext) => {
+    const logger = log();
+
+    const getWeather = llm.tool({
+      description: 'Called when the user asks about the weather.',
+      parameters: z.object({
+        location: z.string().describe('The location to get the weather for'),
+      }),
+      execute: async ({ location }) => {
+        logger.info(`getting weather for ${location}`);
+        return `The weather in ${location} is sunny, and the temperature is 20 degrees Celsius.`;
+      },
+    });
+
+    const agent = new voice.Agent({
+      instructions: 'You are a helpful assistant. Always speak in English.',
+      tools: {
+        getWeather,
+      },
+    });
+
+    const session = new voice.AgentSession({
+      // Use RealtimeModel with text-only modality + separate TTS
+      llm: new openai.realtime.RealtimeModel({
+        modalities: ['text'],
+      }),
+      tts: new cartesia.TTS({
+        model: 'sonic-3',
+      }),
+      voiceOptions: {
+        maxToolSteps: 5,
+      },
+    });
+
+    await session.start({
+      agent,
+      room: ctx.room,
+      inputOptions: {
+        noiseCancellation: BackgroundVoiceCancellation(),
+      },
+      outputOptions: {
+        transcriptionEnabled: true,
+        audioEnabled: true, // You can also disable audio output to use text modality only
+      },
+    });
+
+    session.say('Hello, how can I help you today?');
+
+    session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => {
+      logger.debug('metrics_collected', ev);
+    });
+  },
+});
+
+cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) }));
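
Run-wise, the example follows the usual agents worker pattern: cli.runApp registers the file as an agent entrypoint, and the process is started through the agents CLI (for example with a dev subcommand). Credentials come from the environment; the OpenAI and Cartesia plugins conventionally read OPENAI_API_KEY and CARTESIA_API_KEY, and the worker also needs LiveKit server credentials. The exact variable names are assumptions here, not spelled out in the diff.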

plugins/cartesia/src/tts.ts

Lines changed: 3 additions & 2 deletions
@@ -249,7 +249,7 @@ export class SynthesizeStream extends tts.SynthesizeStream {
       ws.on('message', (data) => resolve(data));
       ws.on('close', (code, reason) => {
         if (!closing) {
-          this.#logger.error(`WebSocket closed with code ${code}: ${reason}`);
+          this.#logger.debug(`WebSocket closed with code ${code}: ${reason}`);
         }

         clearTTSChunkTimeout();
@@ -277,7 +277,8 @@ export class SynthesizeStream extends tts.SynthesizeStream {
         // can continue to process the stream without been blocked by the stuck node
         clearTTSChunkTimeout();
         timeout = setTimeout(() => {
-          this.#logger.error(
+          // cartesia chunk timeout quite often, so we make it a debug log
+          this.#logger.debug(
            `Cartesia WebSocket STT chunk stream timeout after ${this.#opts.chunkTimeout}ms`,
          );
          ws.close();
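
Both hunks only lower log severity; behavior is otherwise unchanged. Per the in-code comment, socket closes during teardown and chunk timeouts occur routinely on this streaming path, so surfacing them as errors was mostly noise.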

plugins/google/src/beta/realtime/realtime_api.ts

Lines changed: 2 additions & 1 deletion
@@ -290,6 +290,7 @@ export class RealtimeModel extends llm.RealtimeModel {
       turnDetection: serverTurnDetection,
       userTranscription: inputAudioTranscription !== null,
       autoToolReplyGeneration: true,
+      audioOutput: options.modalities?.includes(Modality.AUDIO) ?? true,
     });

     // Environment variable fallbacks
@@ -600,7 +601,7 @@ export class RealtimeSession extends llm.RealtimeSession {
     this.hasReceivedAudioInput = true;

     for (const f of this.resampleAudio(frame)) {
-      for (const nf of this.bstream.write(f.data.buffer)) {
+      for (const nf of this.bstream.write(f.data.buffer as ArrayBuffer)) {
         const realtimeInput: types.LiveClientRealtimeInput = {
           mediaChunks: [
             {
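
The as ArrayBuffer cast is type-level only: TypeScript types a typed array's .buffer property as ArrayBufferLike, which also admits SharedArrayBuffer, so the cast narrows it to the plain ArrayBuffer the byte-stream writer expects. No runtime behavior changes.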

plugins/openai/src/realtime/api_proto.ts

Lines changed: 5 additions & 3 deletions
@@ -190,7 +190,7 @@ export interface SessionResource {
   id: string;
   object: 'realtime.session';
   model: string;
-  modalities: ['text', 'audio'] | ['text']; // default: ["text", "audio"]
+  modalities: Modality[]; // default: ["text", "audio"]
   instructions: string;
   voice: Voice; // default: "alloy"
   input_audio_format: AudioFormat; // default: "pcm16"
@@ -267,7 +267,7 @@ export interface SessionUpdateEvent extends BaseClientEvent {
   type: 'session.update';
   session: Partial<{
     model: Model;
-    modalities: ['text', 'audio'] | ['text'];
+    modalities: Modality[];
     instructions: string;
     voice: Voice;
     input_audio_format: AudioFormat;
@@ -350,7 +350,7 @@ export interface ConversationItemDeleteEvent extends BaseClientEvent {
 export interface ResponseCreateEvent extends BaseClientEvent {
   type: 'response.create';
   response?: Partial<{
-    modalities: ['text', 'audio'] | ['text'];
+    modalities: Modality[];
     instructions: string;
     voice: Voice;
     output_audio_format: AudioFormat;
@@ -511,6 +511,7 @@ export interface ResponseContentPartDoneEvent extends BaseServerEvent {
 export interface ResponseTextDeltaEvent extends BaseServerEvent {
   type: 'response.text.delta';
   response_id: string;
+  item_id: string;
   output_index: number;
   content_index: number;
   delta: string;
@@ -519,6 +520,7 @@ export interface ResponseTextDeltaEvent extends BaseServerEvent {
 export interface ResponseTextDoneEvent extends BaseServerEvent {
   type: 'response.text.done';
   response_id: string;
+  item_id: string;
   output_index: number;
   content_index: number;
   text: string;
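
With modalities widened to Modality[], a client can request text-only responses up front. An in-module sketch of the corresponding wire event, using the types shown above (event_id omitted on the assumption that BaseClientEvent leaves it optional):

// Request text-only responses so audio can come from a separate TTS model.
const update: SessionUpdateEvent = {
  type: 'session.update',
  session: { modalities: ['text'] },
};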
