-
Notifications
You must be signed in to change notification settings - Fork 180
Fix: Implement Persistent WebSocket Connection for ElevenLabs TTS #828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- Remove condition that skipped word count check for empty/undefined STT text - Apply minInterruptionWords filtering uniformly to all speech scenarios - Normalize undefined/null transcripts to empty string for consistent handling - Update onEndOfTurn to use same splitWords logic as onVADInferenceDone - Add comprehensive test suite with 23 test cases covering: * Empty and undefined transcript handling * Word count threshold logic * Punctuation and whitespace handling * Integration scenarios between both methods This ensures consistent interruption behavior regardless of transcript content, preventing unwanted interruptions from silence or very short utterances. All 23 tests pass successfully.
…nsive interruption detection tests
…nsive interruption detection tests - Refactored onVADInferenceDone() to normalize undefined/null text and apply word count check consistently - Refactored onEndOfTurn() to use splitWords() for consistent word splitting with onVADInferenceDone() - Added 23 comprehensive unit tests covering all scenarios: empty strings, undefined, short/long speech, thresholds - All tests passing (23/23) - Added SPDX headers to REFACTORING_SUMMARY.md for REUSE compliance
Removed tests for interruption threshold logic.
Removed tests for undefined and null handling normalization.
- Replace per-call WebSocket creation with single persistent connection - Implement WebSocketManager class for multi-stream API support - Add context-based routing for multiplexed concurrent requests - Implement async send/recv loops for efficient message handling - Add graceful connection draining and lifecycle management - Port Python _Connection class pattern to TypeScript This allows multiple synthesize() calls to reuse the same WebSocket connection, reducing latency and resource overhead.
|
simllll
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Devesh36, i was looking into this issue too.. your changes look good, except for two things I have seen so far:
- one quite bad "race" condition in the recvLoop.. because on each processing the message handler is removed and then added again, it happens that messages are lost in between. a simpler and easier approach is just to keep the listener and check the return value of the handler..if it's e.g. true (for done), then resolve and end... I can add it on your changes if you want to?
- EOS is just a flag, it should be smthg like a deferred promise.otherwise we have unnecssary delays for checking the flag, and we cannot really push errors through it.
besides this, it's not based on the auto_mode elevnlabs PR that landed in the main already (#820), this would need som ebit
more work to handle sentencetokenizer correctly as well. (e.g. flush on each sentence)
| const messageHandler = (data: RawData) => { | ||
| if (!resolved) { | ||
| resolved = true; | ||
| ws.removeListener('message', messageHandler); | ||
| ws.removeListener('close', closeHandler); | ||
| ws.removeListener('error', errorHandler); | ||
| resolve(data); | ||
| } | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we loose messages, while processing one, we do not have a listener.. after one "on" handler, we hsould keep it.. it even makes the coder easier to read in my opinion
instead of resolveing the promise here, just call a method with the logic..e.g.
// return true if done
const myHandler = (data: RawData): boolean => {
try {
const data = JSON.parse(msg.toString()) as Record<string, unknown>;
const contextId = (data.contextId || data.context_id) as string | undefined;
if (!contextId || !this.contextData.has(contextId)) {
return true;
}
const context = this.contextData.get(contextId)!;
if (data.error) {
this.logger.error({ contextId, error: data.error }, 'ElevenLabs error');
this.contextData.delete(contextId);
return true;
}
if (data.audio) {
const audioBuffer = Buffer.from(data.audio as string, 'base64');
const audioArray = new Int8Array(audioBuffer);
context.audioBuffer.push(audioArray);
}
if (data.isFinal) {
context.eos = true;
this.activeContexts.delete(contextId);
if (!this.isCurrent && this.activeContexts.size === 0) {
this.logger.debug('No active contexts, shutting down');
return true;
}
}
} catch (parseError) {
this.logger.warn({ parseError }, 'Failed to parse message');
}
}| const messageHandler = (data: RawData) => { | |
| if (!resolved) { | |
| resolved = true; | |
| ws.removeListener('message', messageHandler); | |
| ws.removeListener('close', closeHandler); | |
| ws.removeListener('error', errorHandler); | |
| resolve(data); | |
| } | |
| }; | |
| const messageHandler = (data: RawData) => { | |
| const done = myHandler(data); | |
| if (done && !resolved) { | |
| resolved = true; | |
| ws.removeListener('message', messageHandler); | |
| ws.removeListener('close', closeHandler); | |
| ws.removeListener('error', errorHandler); | |
| resolve(data); | |
| } | |
| }; |
|
|
||
| interface StreamContext { | ||
| contextId: string; | ||
| eos: boolean; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| eos: boolean; | |
| eos: { | |
| promise: Promise<void>; | |
| resolve: () => void; | |
| reject: (err: unknown) => void; | |
| }; |
| if (!this.contextData.has(contextId)) { | ||
| this.contextData.set(contextId, { | ||
| contextId, | ||
| eos: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| eos: false, | |
| eos: newDeferredPromise<void>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the newDeferredPromise helper is:
export const newDeferredPromise = <T>(): {
promise: Promise<any>;
resolve: (value: T | PromiseLike<T>) => void;
reject: (reason: any) => void;
} => {
let fResolve: undefined | ((value: T | PromiseLike<T>) => void);
let fReject: undefined | ((reason: any) => void);
const P = new Promise<T>((resolve, reject) => {
fResolve = resolve;
fReject = reject;
});
if (!fResolve || !fReject) {
throw new Error('Promise init failed');
}
return {
promise: P,
resolve: fResolve,
reject: fReject,
};
};| isContextEOS(contextId: string): boolean { | ||
| return this.contextData.get(contextId)?.eos ?? false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| isContextEOS(contextId: string): boolean { | |
| return this.contextData.get(contextId)?.eos ?? false; | |
| } | |
| getContextEOS(contextId: string): Promise<void> | undefined { | |
| return this.contextData.get(contextId)?.eos.promise; | |
| } |
| while (!this.#connection!.isContextEOS(this.#contextId)) { | ||
| await new Promise((resolve) => setTimeout(resolve, 10)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| while (!this.#connection!.isContextEOS(this.#contextId)) { | |
| await new Promise((resolve) => setTimeout(resolve, 10)); | |
| } | |
| await this.#connection!.getContextEOS(this.#contextId); |
| } | ||
|
|
||
| if (data.isFinal) { | ||
| context.eos = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| context.eos = true; | |
| context.eos.resolve(); |
| } catch (error) { | ||
| this.logger.error({ error }, 'Recv loop error'); | ||
| for (const context of this.contextData.values()) { | ||
| context.eos = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| context.eos = true; | |
| context.eos.reject(error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a small thing, because the " " confused me at the ws.send part... i moved it to the place where it actually is described (at least in the comment ;-)).
and one message is better than two anyways :p
ps.: ts types ppbl missing
| text: text + ' ', // must always end with a space | ||
| // ...(flushOnChunk && { flush: true }), | ||
| }); | ||
| this.#connection!.sendContent(this.#contextId, text, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| this.#connection!.sendContent(this.#contextId, text, false); | |
| this.#connection.sendContent(this.#contextId, text + " ", false); |
| const textPkt = { | ||
| text: msg.text + ' ', | ||
| context_id: msg.contextId, | ||
| }; | ||
|
|
||
| this.ws.send(JSON.stringify(textPkt)); | ||
|
|
||
| if (msg.flush) { | ||
| const flushPkt = { | ||
| text: '', | ||
| context_id: msg.contextId, | ||
| }; | ||
| this.ws.send(JSON.stringify(flushPkt)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| const textPkt = { | |
| text: msg.text + ' ', | |
| context_id: msg.contextId, | |
| }; | |
| this.ws.send(JSON.stringify(textPkt)); | |
| if (msg.flush) { | |
| const flushPkt = { | |
| text: '', | |
| context_id: msg.contextId, | |
| }; | |
| this.ws.send(JSON.stringify(flushPkt)); | |
| } | |
| const pkt = { | |
| text: msg.text, | |
| context_id: msg.contextId, | |
| }; | |
| if (msg.flush) { | |
| pkt.flush = true; | |
| } | |
| this.ws.send(JSON.stringify(pkt)); |
|
Yes @simllll i'm still working on the issue ;) |
(y) the main issue are the lost messages, this was kinda hard to debug for me, as the isFinal was sometimes there and sometimes not.. ;) happy to help. let me know if you need another review or similar support |
| } | ||
| }; | ||
|
|
||
| ws.on('message', messageHandler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding the "missing events": ws.once() could also solve it I guess
Description
Implements a persistent WebSocket connection for ElevenLabs TTS integration, replacing the previous pattern of creating a new WebSocket per synthesis request. This directly ports the efficient connection management from the Python implementation (
livekit-plugins-elevenlabs), enabling multiple concurrent TTS requests to share a single WebSocket connection via the multi-stream API.Changes Made
WebSocketManagerclass (~200 lines): Manages single persistent WebSocket per TTS instance with concurrent send/recv loops, context-based routing, and proper lifecycle managementTTSclass: AddedgetCurrentConnection()method to manage persistent connection creation and reuse across multiplestream()callsSynthesizeStreamclass: Integrates with connection manager, registers contexts, and routes messages through shared WebSockettts.tsgrew from 340 lines to 697 lines due to WebSocketManager implementation (non-functional bloat avoided through focused architecture)Pre-Review Checklist
Testing
plugins/elevenlabs/src/tts.test.ts)stream()calls verify connection reuse behaviorAdditional Notes
Architecture Pattern: Directly ported from Python's
_Connectionclass, ensuring consistency across implementations and proven reliability.Performance Impact: Reduced latency and resource usage:
Backward Compatibility: Fully compatible - consumers see no API changes; persistent connection is transparent.
Related Issue: Resolves #824
/fix #824
Note to reviewers: Please ensure the pre-review checklist is completed before starting your review.