diff --git a/demo/client.ts b/demo/client.ts index f1626c4ffe..4f179b48e8 100644 --- a/demo/client.ts +++ b/demo/client.ts @@ -215,7 +215,6 @@ function initOptions(term: TerminalType): void { 'handler', 'screenKeys', 'termName', - 'useFlowControl', // Complex option 'theme' ]; diff --git a/demo/server.js b/demo/server.js index 3b170813b9..07349e9709 100644 --- a/demo/server.js +++ b/demo/server.js @@ -9,13 +9,16 @@ var pty = require('node-pty'); */ const USE_BINARY_UTF8 = false; +// pty --> websocket buffering +const MAX_SEND_INTERVAL = 5; +const MAX_CHUNK_SIZE = 16384; + function startServer() { var app = express(); expressWs(app); - var terminals = {}, - logs = {}; + var terminals = {}; app.use('/xterm.css', express.static(__dirname + '/../css/xterm.css')); app.get('/logo.png', (req, res) => res.sendFile(__dirname + '/logo.png')); @@ -45,15 +48,14 @@ function startServer() { rows: rows || 24, cwd: process.env.PWD, env: process.env, - encoding: USE_BINARY_UTF8 ? null : 'utf8' + encoding: USE_BINARY_UTF8 ? null : 'utf8', + handleFlowControl: true, + flowControlPause: '\x1b^pause\x1b\\', + flowControlResume: '\x1b^resume\x1b\\' }); console.log('Created terminal with PID: ' + term.pid); terminals[term.pid] = term; - logs[term.pid] = ''; - term.on('data', function(data) { - logs[term.pid] += data; - }); res.send(term.pid.toString()); res.end(); }); @@ -72,17 +74,34 @@ function startServer() { app.ws('/terminals/:pid', function (ws, req) { var term = terminals[parseInt(req.params.pid)]; console.log('Connected to terminal ' + term.pid); - ws.send(logs[term.pid]); + + const _send = data => { + // handle only 'open' websocket state + if (ws.readyState === 1) { + setTimeout(() => ws.send(data), 200); + } + } + + const ACK_WATERMARK = 131072;//524288; + const FLOW_CONTROL_ACK = '\x1b^ack\x1b\\'; // PM ack ST + const MAX_ACK_DIFF = 7; + let ack_expected = 0; + let sent = 0; // string message buffering - function buffer(socket, timeout) { + function buffer(timeout, limit) { let s = ''; let sender = null; return (data) => { s += data; - if (!sender) { + if (s.length > limit) { + clearTimeout(sender); + _send(s); + s = ''; + sender = null; + } else if (!sender) { sender = setTimeout(() => { - socket.send(s); + _send(s); s = ''; sender = null; }, timeout); @@ -90,16 +109,22 @@ function startServer() { }; } // binary message buffering - function bufferUtf8(socket, timeout) { + function bufferUtf8(timeout, limit) { let buffer = []; let sender = null; let length = 0; return (data) => { buffer.push(data); length += data.length; - if (!sender) { + if (length > limit) { + clearTimeout(sender); + _send(Buffer.concat(buffer, length)); + buffer = []; + sender = null; + length = 0; + } else if (!sender) { sender = setTimeout(() => { - socket.send(Buffer.concat(buffer, length)); + _send(Buffer.concat(buffer, length)); buffer = []; sender = null; length = 0; @@ -107,16 +132,29 @@ function startServer() { } }; } - const send = USE_BINARY_UTF8 ? bufferUtf8(ws, 5) : buffer(ws, 5); - - term.on('data', function(data) { - try { - send(data); - } catch (ex) { - // The WebSocket is not open, ignore + const send = (USE_BINARY_UTF8 ? bufferUtf8 : buffer)(MAX_SEND_INTERVAL, MAX_CHUNK_SIZE); + + term.on('data', data => { + send(data); + sent += data.length; + if (sent > ACK_WATERMARK) { + ack_expected++; + sent -= ACK_WATERMARK; + if (ack_expected > MAX_ACK_DIFF) { + term.pause(); + } } }); + ws.on('message', function(msg) { + //console.log([msg, sent]); + if (msg === FLOW_CONTROL_ACK) { + ack_expected--; + if (ack_expected <= MAX_ACK_DIFF) { + term.resume(); + } + return; + } term.write(msg); }); ws.on('close', function () { @@ -124,7 +162,6 @@ function startServer() { console.log('Closed terminal ' + term.pid); // Clean things up delete terminals[term.pid]; - delete logs[term.pid]; }); }); diff --git a/fast_producer.c b/fast_producer.c new file mode 100644 index 0000000000..f90a15af55 --- /dev/null +++ b/fast_producer.c @@ -0,0 +1,29 @@ +#include +#include +#include + +static char MSG[10][10] = { + { [0 ... 8] = '0', '\n' }, + { [0 ... 8] = '1', '\n' }, + { [0 ... 8] = '2', '\n' }, + { [0 ... 8] = '3', '\n' }, + { [0 ... 8] = '4', '\n' }, + { [0 ... 8] = '5', '\n' }, + { [0 ... 8] = '6', '\n' }, + { [0 ... 8] = '7', '\n' }, + { [0 ... 8] = '8', '\n' }, + { [0 ... 8] = '9', '\n' }, +}; +static char ALL[60000]; + +int main(int argc, char **argv) { + int i; + // fill 60kB buffer + for (i=0; i<600; ++i) { + memcpy(ALL+i*100, MSG, 100); + } + // copy buffer data as fast as possible (~6GB/s on my machine) + while (1) { + write(1, ALL, 60000); + } +} diff --git a/src/Terminal.ts b/src/Terminal.ts index d609331650..75618d3e14 100644 --- a/src/Terminal.ts +++ b/src/Terminal.ts @@ -60,11 +60,34 @@ import { IBufferSet, IBuffer } from 'common/buffer/Types'; const document = (typeof window !== 'undefined') ? window.document : null; /** - * The amount of write requests to queue before sending an XOFF signal to the - * pty process. This number must be small in order for ^C and similar sequences - * to be responsive. + * Safety watermark to avoid memory exhaustion and browser engine crash on fast data input. + * Once hit the terminal will stop working. Enable flow control to avoid this limit + * and make sure that your backend correctly propagates this to the underlying pty. + * (see docs for further instructions) + * Since this limit is meant as a safety parachute to prevent browser crashs, + * it is set to a very high number. Typically xterm.js gets unresponsive with + * a much lower number (>500 kB). */ -const WRITE_BUFFER_PAUSE_THRESHOLD = 5; +const DISCARD_WATERMARK = 50000000; // ~50 MB +/** + * send ACK every ACK_WATERMARK-th byte + */ +const ACK_WATERMARK = 131072; // 524288; // 2^19 + +/** + * Flow control watermarks for the write buffer. + * low: send resume to pty + * high: send pause to pty + */ +const LOW_WATERMARK = 32768; +const HIGH_WATERMARK = 131072; + +/** + * Flow control PAUSE/RESUME/ACK messages. + */ +const FLOW_CONTROL_PAUSE = '\x1b^pause\x1b\\'; // PM pause ST +const FLOW_CONTROL_RESUME = '\x1b^resume\x1b\\'; // PM resume ST +const FLOW_CONTROL_ACK = '\x1b^ack\x1b\\'; // PM ack ST /** * The max number of ms to spend on writes before allowing the renderer to @@ -155,6 +178,14 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp public writeBufferUtf8: Uint8Array[]; private _writeInProgress: boolean; + /** + * Sum of length of pending chunks in all write buffers. + * Note: For the string chunks the actual memory usage is + * doubled (JSString char takes 2 bytes). + */ + private _writeBuffersPendingSize: number = 0; + private _ackWatermark: number = 0; + /** * Whether _xterm.js_ sent XOFF in order to catch up with the pty process. * This is a distinct state from writeStopped so that if the user requested @@ -1222,6 +1253,7 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp * @param data UintArray with UTF8 bytes to write to the terminal. */ public writeUtf8(data: Uint8Array): void { + console.log((this._writeBuffersPendingSize / 1000000).toFixed(2), data.length); // Ensure the terminal isn't disposed if (this._isDisposed) { return; @@ -1232,18 +1264,23 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp return; } - this.writeBufferUtf8.push(data); + // safety measure: dont allow the backend to crash + // the terminal by writing to much data to fast. + // If we hit this, the terminal cant keep up with data written + // and will start to degenerate. + if (this._writeBuffersPendingSize > DISCARD_WATERMARK) { + throw new Error('write data discarded, use flow control to avoid losing data'); + } - // Send XOFF to pause the pty process if the write buffer becomes too large so - // xterm.js can catch up before more data is sent. This is necessary in order - // to keep signals such as ^C responsive. - if (this.options.useFlowControl && !this._xoffSentToCatchUp && this.writeBufferUtf8.length >= WRITE_BUFFER_PAUSE_THRESHOLD) { - // XOFF - stop pty pipe - // XON will be triggered by emulator before processing data chunk - this.handler(C0.DC3); + // flow control: pause pty (like XOFF) + this._writeBuffersPendingSize += data.length; + if (this.options.useFlowControl && this._writeBuffersPendingSize > HIGH_WATERMARK) { + this.handler(FLOW_CONTROL_PAUSE); this._xoffSentToCatchUp = true; } + this.writeBufferUtf8.push(data); + if (!this._writeInProgress && this.writeBufferUtf8.length > 0) { // Kick off a write which will write all data in sequence recursively this._writeInProgress = true; @@ -1265,23 +1302,11 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp const data = this.writeBufferUtf8[bufferOffset]; bufferOffset++; - // If XOFF was sent in order to catch up with the pty process, resume it if - // we reached the end of the writeBuffer to allow more data to come in. - if (this._xoffSentToCatchUp && this.writeBufferUtf8.length === bufferOffset) { - this.handler(C0.DC1); - this._xoffSentToCatchUp = false; - } - this._refreshStart = this.buffer.y; this._refreshEnd = this.buffer.y; - // HACK: Set the parser state based on it's state at the time of return. - // This works around the bug #662 which saw the parser state reset in the - // middle of parsing escape sequence in two chunks. For some reason the - // state of the parser resets to 0 after exiting parser.parse. This change - // just sets the state back based on the correct return statement. - this._inputHandler.parseUtf8(data); + this._writeBuffersPendingSize -= data.length; this.updateRange(this.buffer.y); this.refresh(this._refreshStart, this._refreshEnd); @@ -1290,6 +1315,13 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp break; } } + + // flow control: resume pty (like XON) + if (this._xoffSentToCatchUp && this._writeBuffersPendingSize < LOW_WATERMARK) { + this.handler(FLOW_CONTROL_RESUME); + this._xoffSentToCatchUp = false; + } + if (this.writeBufferUtf8.length > bufferOffset) { // Allow renderer to catch up before processing the next batch // trim already processed chunks if we are above threshold @@ -1309,6 +1341,7 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp * @param data The text to write to the terminal. */ public write(data: string): void { + console.log((this._writeBuffersPendingSize / 1000000).toFixed(2), data.length); // Ensure the terminal isn't disposed if (this._isDisposed) { return; @@ -1319,18 +1352,23 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp return; } - this.writeBuffer.push(data); + // safety measure: dont allow the backend to crash + // the terminal by writing to much data to fast. + // If we hit this, the terminal cant keep up with data written + // and will start to degenerate. + if (this._writeBuffersPendingSize > DISCARD_WATERMARK) { + throw new Error('write data discarded, use flow control to avoid losing data'); + } - // Send XOFF to pause the pty process if the write buffer becomes too large so - // xterm.js can catch up before more data is sent. This is necessary in order - // to keep signals such as ^C responsive. - if (this.options.useFlowControl && !this._xoffSentToCatchUp && this.writeBuffer.length >= WRITE_BUFFER_PAUSE_THRESHOLD) { - // XOFF - stop pty pipe - // XON will be triggered by emulator before processing data chunk - this.handler(C0.DC3); + // flow control: pause pty (like XOFF) + this._writeBuffersPendingSize += data.length; + if (this.options.useFlowControl && this._writeBuffersPendingSize > HIGH_WATERMARK) { + this.handler(FLOW_CONTROL_PAUSE); this._xoffSentToCatchUp = true; } + this.writeBuffer.push(data); + if (!this._writeInProgress && this.writeBuffer.length > 0) { // Kick off a write which will write all data in sequence recursively this._writeInProgress = true; @@ -1352,23 +1390,12 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp const data = this.writeBuffer[bufferOffset]; bufferOffset++; - // If XOFF was sent in order to catch up with the pty process, resume it if - // we reached the end of the writeBuffer to allow more data to come in. - if (this._xoffSentToCatchUp && this.writeBuffer.length === bufferOffset) { - this.handler(C0.DC1); - this._xoffSentToCatchUp = false; - } - this._refreshStart = this.buffer.y; this._refreshEnd = this.buffer.y; - // HACK: Set the parser state based on it's state at the time of return. - // This works around the bug #662 which saw the parser state reset in the - // middle of parsing escape sequence in two chunks. For some reason the - // state of the parser resets to 0 after exiting parser.parse. This change - // just sets the state back based on the correct return statement. - this._inputHandler.parse(data); + this._writeBuffersPendingSize -= data.length; + this._ackWatermark += data.length; this.updateRange(this.buffer.y); this.refresh(this._refreshStart, this._refreshEnd); @@ -1377,6 +1404,17 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp break; } } + + // flow control: resume pty (like XON) + if (this._xoffSentToCatchUp && this._writeBuffersPendingSize < LOW_WATERMARK) { + this.handler(FLOW_CONTROL_RESUME); + this._xoffSentToCatchUp = false; + } + if (this._ackWatermark > ACK_WATERMARK) { + setTimeout(() => this.handler(FLOW_CONTROL_ACK), 200); + this._ackWatermark -= ACK_WATERMARK; + } + if (this.writeBuffer.length > bufferOffset) { // Allow renderer to catch up before processing the next batch // trim already processed chunks if we are above threshold diff --git a/yarn.lock b/yarn.lock index 2c2be0f3ed..4b00c7b692 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2776,7 +2776,12 @@ mute-stream@0.0.7: resolved "https://registry.yarnpkg.com/mute-stream/-/mute-stream-0.0.7.tgz#3075ce93bc21b8fab43e1bc4da7e8115ed1e7bab" integrity sha1-MHXOk7whuPq0PhvE2n6BFe0ee6s= -nan@2.10.0, nan@^2.9.2: +nan@^2.13.2: + version "2.14.0" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.14.0.tgz#7818f722027b2459a86f0295d434d1fc2336c52c" + integrity sha512-INOFj37C7k3AfaNTtX8RhsTw7qRy7eLET14cROi9+5HAVbbHuIWUHEauBv5qT4Av2tWasiTY1Jw6puUNqRJXQg== + +nan@^2.9.2: version "2.10.0" resolved "https://registry.yarnpkg.com/nan/-/nan-2.10.0.tgz#96d0cd610ebd58d4b4de9cc0c6828cda99c7548f" integrity sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA== @@ -2875,12 +2880,12 @@ node-pre-gyp@^0.10.0: semver "^5.3.0" tar "^4" -node-pty@0.7.6: - version "0.7.6" - resolved "https://registry.yarnpkg.com/node-pty/-/node-pty-0.7.6.tgz#bff6148c9c5836ca7e73c7aaaec067dcbdac2f7b" - integrity sha512-ECzKUB7KkAFZ0cjyjMXp5WLJ+7YIZ1xnNmiiegOI6WdDaKABUNV5NbB1Dw9MXD4KrZipWII0wQ7RGZ6StU/7jA== +node-pty@0.9.0-beta10: + version "0.9.0-beta10" + resolved "https://registry.yarnpkg.com/node-pty/-/node-pty-0.9.0-beta10.tgz#058850d6971b04fefaa5ffe00a8816cd892f1419" + integrity sha512-I+wvK1FCiaAkIhlW7zA7V2FkJSX2JjOto5R9DXLGQGWMIXo+n2f0vXu7YLMbGaR5eR6NIm6KP0UhsFKKCn/bwg== dependencies: - nan "2.10.0" + nan "^2.13.2" nopt@^4.0.1: version "4.0.1"