Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).

Additionally this module inclues a utility function [pump][].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/inclues/includes/


### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -89,7 +91,7 @@ total size of the internal write buffer is below the threshold set by
the size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
will be returned.

A key goal of the `stream` API, particularly the [`stream.pipe()`] method,
A key goal of the `stream` API, particularly the [`stream.pump()`] function,
is to limit the buffering of data to acceptable levels such that sources and
destinations of differing speeds will not overwhelm the available memory.

Expand Down Expand Up @@ -1244,6 +1246,14 @@ implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
The default implementation of `_destroy` for `Transform` also emit `'close'`.

#### Class Method: stream.pump(...streams[, callback])

* two or more streams to pipe between
* optional callback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters should be formatted according to the style used in the rest of the documentation.

Also, it would be a good idea to either add the function signature here in some way or at the very least describe in the description below what parameters the callback should have.


A class method to pipe between streams forwarding errors and properly cleaning
up.

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2334,14 +2344,15 @@ contain multi-byte characters.
[TCP sockets]: net.html#net_class_net_socket
[Transform]: #stream_class_stream_transform
[Writable]: #stream_class_stream_writable
[async-iterator]: https://github.com/tc39/proposal-async-iteration
[child process stdin]: child_process.html#child_process_subprocess_stdin
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
[crypto]: crypto.html
[fs read streams]: fs.html#fs_class_fs_readstream
[fs write streams]: fs.html#fs_class_fs_writestream
[http-incoming-message]: http.html#http_class_http_incomingmessage
[zlib]: zlib.html
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[pump]: #stream_class_method_pump
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
Expand All @@ -2358,4 +2369,4 @@ contain multi-byte characters.
[readable-destroy]: #stream_readable_destroy_error
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[async-iterator]: https://github.com/tc39/proposal-async-iteration
[zlib]: zlib.html
6 changes: 3 additions & 3 deletions lib/_stream_pump.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ function destroyer(stream, reading, writing, _callback) {
return stream.abort();
// request.destroy just do .end - .abort is what we want

if (isFn(stream.destroy)) return stream.destroy();
if (isFn(stream.destroy)) return stream.destroy(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, this wont work on older streams, which is why it was not calling it with err before


callback(err || new Error('stream was destroyed'));
};
}

const call = (fn) => fn();

const callErr = (err) => (fn) => fn(err);
const pipe = (from, to) => from.pipe(to);

function pump(...streams) {
Expand All @@ -176,7 +176,7 @@ function pump(...streams) {
return destroyer(stream, reading, writing, (err) => {
if (!error) error = err;

if (err) destroys.forEach(call);
if (err) destroys.forEach(callErr(err));

if (reading) return;

Expand Down
36 changes: 20 additions & 16 deletions test/parallel/test-stream-pump.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,30 @@ test('call destroy if error', (t) => {
let wsDestroyCalled = false;
let rsDestroyCalled = false;
let callbackCalled = false;
const rs = new stream.Readable({
read(size) {
class RS extends stream.Readable {
_read(size) {
this.push(crypto.randomBytes(size));
}
});
rs.destroy = function() {
this.emit('close');
rsDestroyCalled = true;
check();
};
const ws = new stream.Writable({
write(chunk, enc, next) {
_destroy(err, cb) {
rsDestroyCalled = true;
check();
super._destroy(err, cb);
}
}
class WS extends stream.Writable {
_write(chunk, enc, next) {
setImmediate(next);
}
});
ws.destroy = function() {
this.emit('close');
wsDestroyCalled = true;
check();
};
_destroy(error, cb) {
t.equal(error && error.message, 'lets end this');
this.emit('close');
wsDestroyCalled = true;
check();
super._destroy(error, cb);
}
}
const rs = new RS();
const ws = new WS();
function throwError() {
const reverse = new stream.Transform();
let i = 0;
Expand Down