From 04d9a1bf958a56e536c70ea92e884dc77ff76d13 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 4 Dec 2014 12:00:23 -0800 Subject: [PATCH 1/2] stream: switch _writableState.buffer to queue In cases where many small writes are made to a stream lacking _writev, the array data structure backing the WriteReq buffer would greatly increase GC pressure. Specifically, in the fs.WriteStream case, the clearBuffer routine would only clear a single WriteReq from the buffer before exiting, but would cause the entire backing array to be GC'd. Switching to [].shift lessened pressure, but still the bulk of the time was spent in memcpy. This replaces that structure with a linked list-backed queue so that adding and removing from the queue is O(1). In the _writev case, collecting the buffer requires an O(N) loop over the buffer, but that was already being performed to collect callbacks, so slowdown should be neglible. --- lib/_stream_writable.js | 74 +++++++++++++++++++-------- lib/net.js | 2 +- test/simple/test-stream2-transform.js | 2 +- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 92984eb08eb2..fb5968ad6374 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -28,6 +28,7 @@ Writable.WritableState = WritableState; var util = require('util'); var Stream = require('stream'); +var debug = util.debuglog('stream'); util.inherits(Writable, Stream); @@ -35,6 +36,7 @@ function WriteReq(chunk, encoding, cb) { this.chunk = chunk; this.encoding = encoding; this.callback = cb; + this.next = null; } function WritableState(options, stream) { @@ -109,7 +111,9 @@ function WritableState(options, stream) { // the amount that is being written when _write is called. this.writelen = 0; - this.buffer = []; + this.bufferedRequest = null; + this.lastBufferedRequest = null; + this.bufferLength = 0; // number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -123,6 +127,22 @@ function WritableState(options, stream) { this.errorEmitted = false; } +WritableState.prototype.getBuffer = function writableStateGetBuffer() { + var current = this.bufferedRequest; + var out = []; + while (current) { + out.push(current); + current = current.next; + } + return out; +}; + +Object.defineProperty(WritableState.prototype, 'buffer', { + get: util.deprecate(function() { + return this.getBuffer(); + }, '_writableState.buffer is deprecated. Use _writableState.getBuffer() instead.') +}); + function Writable(options) { // Writable ctor is applied to Duplexes, though they're not // instanceof Writable, they're instanceof Readable. @@ -216,7 +236,7 @@ Writable.prototype.uncork = function() { !state.corked && !state.finished && !state.bufferProcessing && - state.buffer.length) + state.bufferedRequest) clearBuffer(this, state); } }; @@ -255,8 +275,16 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked) - state.buffer.push(new WriteReq(chunk, encoding, cb)); + if (state.writing || state.corked) { + var last = state.lastBufferedRequest; + ++state.bufferLength; + state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); + if (last) { + last.next = state.lastBufferedRequest; + } else { + state.bufferedRequest = state.lastBufferedRequest; + } + } else doWrite(stream, state, false, len, chunk, encoding, cb); @@ -313,7 +341,7 @@ function onwrite(stream, er) { if (!finished && !state.corked && !state.bufferProcessing && - state.buffer.length) { + state.bufferedRequest) { clearBuffer(stream, state); } @@ -349,17 +377,24 @@ function onwriteDrain(stream, state) { // if there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; + var bufferLength = state.bufferLength; + var entry = state.bufferedRequest; - if (stream._writev && state.buffer.length > 1) { + if (stream._writev && bufferLength > 1) { // Fast case, write everything using _writev() + var buffer = []; var cbs = []; - for (var c = 0; c < state.buffer.length; c++) - cbs.push(state.buffer[c].callback); + while (entry) { + cbs.push(entry.callback); + buffer.push(entry); + entry = entry.next; + } // count the one we are adding, as well. // TODO(isaacs) clean this up state.pendingcb++; - doWrite(stream, state, true, state.length, state.buffer, '', function(err) { + bufferLength = 0; + doWrite(stream, state, true, state.length, buffer, '', function(err) { for (var i = 0; i < cbs.length; i++) { state.pendingcb--; cbs[i](err); @@ -367,34 +402,31 @@ function clearBuffer(stream, state) { }); // Clear buffer - state.buffer = []; } else { // Slow case, write chunks one-by-one - for (var c = 0; c < state.buffer.length; c++) { - var entry = state.buffer[c]; + while (entry) { + --bufferLength; var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, cb); - + entry = entry.next; // if we didn't call the onwrite immediately, then // it means that we need to wait until it does. // also, that means that the chunk and cb are currently // being processed, so move the buffer counter past them. if (state.writing) { - c++; break; } } - - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; } - + state.bufferedRequest = entry; + if (!bufferLength) { + state.lastBufferedRequest = null; + } + state.bufferLength = bufferLength; state.bufferProcessing = false; } @@ -435,7 +467,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { function needFinish(stream, state) { return (state.ending && state.length === 0 && - state.buffer.length === 0 && + state.bufferedRequest === null && !state.finished && !state.writing); } diff --git a/lib/net.js b/lib/net.js index 34de98bc3c01..d5c0eda2c7a0 100644 --- a/lib/net.js +++ b/lib/net.js @@ -724,7 +724,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() { data = this._pendingData, encoding = this._pendingEncoding; - state.buffer.forEach(function(el) { + state.getBuffer().forEach(function(el) { if (util.isBuffer(el.chunk)) bytes += el.chunk.length; else diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 9c9ddd8efc31..6064565be0a1 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -81,7 +81,7 @@ test('writable side consumption', function(t) { t.equal(tx._readableState.length, 10); t.equal(transformed, 10); t.equal(tx._transformState.writechunk.length, 5); - t.same(tx._writableState.buffer.map(function(c) { + t.same(tx._writableState.getBuffer().map(function(c) { return c.chunk.length; }), [6, 7, 8, 9, 10]); From d8a2be545dc8be7f259bf21fdbae7d45b6587430 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Fri, 5 Dec 2014 12:47:47 -0800 Subject: [PATCH 2/2] eliminate bufferLength; style fix on deprecation --- lib/_stream_writable.js | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index fb5968ad6374..39eee61460fe 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -113,7 +113,6 @@ function WritableState(options, stream) { this.bufferedRequest = null; this.lastBufferedRequest = null; - this.bufferLength = 0; // number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -140,7 +139,8 @@ WritableState.prototype.getBuffer = function writableStateGetBuffer() { Object.defineProperty(WritableState.prototype, 'buffer', { get: util.deprecate(function() { return this.getBuffer(); - }, '_writableState.buffer is deprecated. Use _writableState.getBuffer() instead.') + }, '_writableState.buffer is deprecated. Use ' + + '_writableState.getBuffer() instead.') }); function Writable(options) { @@ -277,7 +277,6 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { if (state.writing || state.corked) { var last = state.lastBufferedRequest; - ++state.bufferLength; state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); if (last) { last.next = state.lastBufferedRequest; @@ -377,10 +376,9 @@ function onwriteDrain(stream, state) { // if there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; - var bufferLength = state.bufferLength; var entry = state.bufferedRequest; - if (stream._writev && bufferLength > 1) { + if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() var buffer = []; var cbs = []; @@ -393,7 +391,7 @@ function clearBuffer(stream, state) { // count the one we are adding, as well. // TODO(isaacs) clean this up state.pendingcb++; - bufferLength = 0; + state.lastBufferedRequest = null; doWrite(stream, state, true, state.length, buffer, '', function(err) { for (var i = 0; i < cbs.length; i++) { state.pendingcb--; @@ -405,7 +403,6 @@ function clearBuffer(stream, state) { } else { // Slow case, write chunks one-by-one while (entry) { - --bufferLength; var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; @@ -421,12 +418,11 @@ function clearBuffer(stream, state) { break; } } + + if (entry === null) + state.lastBufferedRequest = null; } state.bufferedRequest = entry; - if (!bufferLength) { - state.lastBufferedRequest = null; - } - state.bufferLength = bufferLength; state.bufferProcessing = false; }