@@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
284284// similar to how Writable.write() returns true if you should
285285// write() some more.
286286Readable . prototype . push = function ( chunk , encoding ) {
287- return readableAddChunk ( this , chunk , encoding , false ) ;
287+ debug ( 'push' , chunk ) ;
288+
289+ const state = this . _readableState ;
290+ return ( state [ kState ] & kObjectMode ) === 0 ?
291+ readableAddChunkPushByteMode ( this , state , chunk , encoding ) :
292+ readableAddChunkPushObjectMode ( this , state , chunk , encoding ) ;
288293} ;
289294
290295// Unshift should *always* be something directly out of read().
291296Readable . prototype . unshift = function ( chunk , encoding ) {
292- return readableAddChunk ( this , chunk , encoding , true ) ;
297+ debug ( 'unshift' , chunk ) ;
298+ const state = this . _readableState ;
299+ return ( state [ kState ] & kObjectMode ) === 0 ?
300+ readableAddChunkUnshiftByteMode ( this , state , chunk , encoding ) :
301+ readableAddChunkUnshiftObjectMode ( this , state , chunk ) ;
293302} ;
294303
295- function readableAddChunk ( stream , chunk , encoding , addToFront ) {
296- debug ( 'readableAddChunk' , chunk ) ;
297- const state = stream . _readableState ;
298304
299- let err ;
300- if ( ( state [ kState ] & kObjectMode ) === 0 ) {
301- if ( typeof chunk === 'string' ) {
302- encoding = encoding || state . defaultEncoding ;
303- if ( state . encoding !== encoding ) {
304- if ( addToFront && state . encoding ) {
305- // When unshifting, if state.encoding is set, we have to save
306- // the string in the BufferList with the state encoding.
307- chunk = Buffer . from ( chunk , encoding ) . toString ( state . encoding ) ;
308- } else {
309- chunk = Buffer . from ( chunk , encoding ) ;
310- encoding = '' ;
311- }
305+ function readableAddChunkUnshiftByteMode ( stream , state , chunk , encoding ) {
306+ if ( chunk === null ) {
307+ state [ kState ] &= ~ kReading ;
308+ onEofChunk ( stream , state ) ;
309+
310+ return false ;
311+ }
312+
313+ if ( typeof chunk === 'string' ) {
314+ encoding = encoding || state . defaultEncoding ;
315+ if ( state . encoding !== encoding ) {
316+ if ( state . encoding ) {
317+ // When unshifting, if state.encoding is set, we have to save
318+ // the string in the BufferList with the state encoding.
319+ chunk = Buffer . from ( chunk , encoding ) . toString ( state . encoding ) ;
320+ } else {
321+ chunk = Buffer . from ( chunk , encoding ) ;
312322 }
313- } else if ( chunk instanceof Buffer ) {
314- encoding = '' ;
315- } else if ( Stream . _isUint8Array ( chunk ) ) {
316- chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
317- encoding = '' ;
318- } else if ( chunk != null ) {
319- err = new ERR_INVALID_ARG_TYPE (
320- 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
321323 }
324+ } else if ( Stream . _isUint8Array ( chunk ) ) {
325+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
326+ } else if ( chunk !== undefined && ! ( chunk instanceof Buffer ) ) {
327+ errorOrDestroy ( stream , new ERR_INVALID_ARG_TYPE (
328+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ) ;
329+ return false ;
322330 }
323331
324- if ( err ) {
325- errorOrDestroy ( stream , err ) ;
326- } else if ( chunk === null ) {
332+
333+ if ( ! ( chunk && chunk . length > 0 ) ) {
334+ return canPushMore ( state ) ;
335+ }
336+
337+ return readableAddChunkUnshiftValue ( stream , state , chunk ) ;
338+ }
339+
340+ function readableAddChunkUnshiftObjectMode ( stream , state , chunk ) {
341+ if ( chunk === null ) {
327342 state [ kState ] &= ~ kReading ;
328343 onEofChunk ( stream , state ) ;
329- } else if ( ( ( state [ kState ] & kObjectMode ) !== 0 ) || ( chunk && chunk . length > 0 ) ) {
330- if ( addToFront ) {
331- if ( ( state [ kState ] & kEndEmitted ) !== 0 )
332- errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
333- else if ( state . destroyed || state . errored )
334- return false ;
335- else
336- addChunk ( stream , state , chunk , true ) ;
337- } else if ( state . ended ) {
338- errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
339- } else if ( state . destroyed || state . errored ) {
340- return false ;
341- } else {
342- state [ kState ] &= ~ kReading ;
343- if ( state . decoder && ! encoding ) {
344- chunk = state . decoder . write ( chunk ) ;
345- if ( state . objectMode || chunk . length !== 0 )
346- addChunk ( stream , state , chunk , false ) ;
347- else
348- maybeReadMore ( stream , state ) ;
349- } else {
350- addChunk ( stream , state , chunk , false ) ;
351- }
344+
345+ return false ;
346+ }
347+
348+ return readableAddChunkUnshiftValue ( stream , state , chunk ) ;
349+ }
350+
351+ function readableAddChunkUnshiftValue ( stream , state , chunk ) {
352+ if ( ( state [ kState ] & kEndEmitted ) !== 0 )
353+ errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
354+ else if ( state . destroyed || state . errored )
355+ return false ;
356+ else
357+ addChunk ( stream , state , chunk , true ) ;
358+
359+ return canPushMore ( state ) ;
360+ }
361+
362+ function readableAddChunkPushByteMode ( stream , state , chunk , encoding ) {
363+ if ( chunk === null ) {
364+ state [ kState ] &= ~ kReading ;
365+ onEofChunk ( stream , state ) ;
366+
367+ return false ;
368+ }
369+
370+ if ( typeof chunk === 'string' ) {
371+ encoding = encoding || state . defaultEncoding ;
372+ if ( state . encoding !== encoding ) {
373+ chunk = Buffer . from ( chunk , encoding ) ;
374+ encoding = '' ;
352375 }
353- } else if ( ! addToFront ) {
376+ } else if ( chunk instanceof Buffer ) {
377+ encoding = '' ;
378+ } else if ( Stream . _isUint8Array ( chunk ) ) {
379+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
380+ encoding = '' ;
381+ } else if ( chunk !== undefined ) {
382+ errorOrDestroy ( stream , new ERR_INVALID_ARG_TYPE (
383+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ) ;
384+ return false ;
385+ }
386+
387+ if ( ! chunk || chunk . length <= 0 ) {
354388 state [ kState ] &= ~ kReading ;
355389 maybeReadMore ( stream , state ) ;
390+
391+ return canPushMore ( state ) ;
392+ }
393+
394+ if ( state . ended ) {
395+ errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
396+
397+ return false ;
398+ }
399+
400+ if ( state . destroyed || state . errored ) {
401+ return false ;
402+ }
403+
404+ state [ kState ] &= ~ kReading ;
405+ if ( state . decoder && ! encoding ) {
406+ chunk = state . decoder . write ( chunk ) ;
407+ if ( chunk . length === 0 ) {
408+ maybeReadMore ( stream , state ) ;
409+
410+ return canPushMore ( state ) ;
411+ }
356412 }
357413
414+ addChunk ( stream , state , chunk , false ) ;
415+ return canPushMore ( state ) ;
416+ }
417+
418+ function readableAddChunkPushObjectMode ( stream , state , chunk , encoding ) {
419+ if ( chunk === null ) {
420+ state [ kState ] &= ~ kReading ;
421+ onEofChunk ( stream , state ) ;
422+
423+ return false ;
424+ }
425+
426+ if ( state . ended ) {
427+ errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
428+ return false ;
429+ }
430+
431+ if ( state . destroyed || state . errored ) {
432+ return false ;
433+ }
434+
435+ state [ kState ] &= ~ kReading ;
436+ if ( state . decoder && ! encoding ) {
437+ chunk = state . decoder . write ( chunk ) ;
438+ }
439+
440+ addChunk ( stream , state , chunk , false ) ;
441+ return canPushMore ( state ) ;
442+ }
443+
444+ function canPushMore ( state ) {
358445 // We can push more data if we are below the highWaterMark.
359446 // Also, if we have no data yet, we can stand some more bytes.
360447 // This is to work around cases where hwm=0, such as the repl.
0 commit comments