11'use strict'
22
33const assert = require ( 'node:assert' )
4+ const { Readable } = require ( 'node:stream' )
45const util = require ( '../core/util' )
56const CacheHandler = require ( '../handler/cache-handler' )
67const MemoryCacheStore = require ( '../cache/memory-cache-store' )
@@ -57,27 +58,25 @@ module.exports = (opts = {}) => {
5758 // Where body can be a Buffer, string, stream or blob?
5859 const result = store . get ( cacheKey )
5960 if ( ! result ) {
60- // Request isn't cached
6161 return dispatch ( opts , new CacheHandler ( globalOpts , cacheKey , handler ) )
6262 }
6363
6464 /**
65- * @param {import('node:stream').Readable | undefined } stream
65+ * @param {import('node:stream').Readable } stream
6666 * @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse } value
6767 */
6868 const respondWithCachedValue = ( stream , value ) => {
69- assert ( ! stream || ! stream . destroyed , 'stream should not be destroyed' )
70- assert ( ! stream || ! stream . readableDidRead , 'stream should not be readableDidRead' )
69+ assert ( ! stream . destroyed , 'stream should not be destroyed' )
70+ assert ( ! stream . readableDidRead , 'stream should not be readableDidRead' )
71+
7172 try {
7273 stream
73- ? .on ( 'error' , function ( err ) {
74+ . on ( 'error' , function ( err ) {
7475 if ( ! this . readableEnded ) {
7576 if ( typeof handler . onError === 'function' ) {
7677 handler . onError ( err )
7778 } else {
78- process . nextTick ( ( ) => {
79- throw err
80- } )
79+ throw err
8180 }
8281 }
8382 } )
@@ -89,10 +88,10 @@ module.exports = (opts = {}) => {
8988
9089 if ( typeof handler . onConnect === 'function' ) {
9190 handler . onConnect ( ( err ) => {
92- stream ? .destroy ( err )
91+ stream . destroy ( err )
9392 } )
9493
95- if ( stream ? .destroyed ) {
94+ if ( stream . destroyed ) {
9695 return
9796 }
9897 }
@@ -106,16 +105,12 @@ module.exports = (opts = {}) => {
106105 const rawHeaders = [ ...value . rawHeaders , AGE_HEADER , Buffer . from ( `${ age } ` ) ]
107106
108107 if ( handler . onHeaders ( value . statusCode , rawHeaders , ( ) => stream ?. resume ( ) , value . statusMessage ) === false ) {
109- stream ? .pause ( )
108+ stream . pause ( )
110109 }
111110 }
112111
113112 if ( opts . method === 'HEAD' ) {
114- if ( typeof handler . onComplete === 'function' ) {
115- handler . onComplete ( [ ] )
116- }
117-
118- stream ?. destroy ( )
113+ stream . destroy ( )
119114 } else {
120115 stream . on ( 'data' , function ( chunk ) {
121116 if ( typeof handler . onData === 'function' && ! handler . onData ( chunk ) ) {
@@ -124,15 +119,20 @@ module.exports = (opts = {}) => {
124119 } )
125120 }
126121 } catch ( err ) {
127- stream ? .destroy ( err )
122+ stream . destroy ( err )
128123 }
129124 }
130125
131126 /**
132127 * @param {import('../../types/cache-interceptor.d.ts').default.GetResult } result
133128 */
134129 const handleStream = ( result ) => {
135- const { response : value , body : stream } = result
130+ const { response : value , body } = result
131+
132+ // TODO (perf): Readable.from path can be optimized...
133+ const stream = util . isStream ( body )
134+ ? body
135+ : Readable . from ( body ?? [ ] )
136136
137137 if ( ! stream && opts . method !== 'HEAD' ) {
138138 throw new Error ( 'stream is undefined but method isn\'t HEAD' )
@@ -177,12 +177,17 @@ module.exports = (opts = {}) => {
177177 if ( typeof result . then === 'function' ) {
178178 result . then ( ( result ) => {
179179 if ( ! result ) {
180- // Request isn't cached
181- return dispatch ( opts , new CacheHandler ( globalOpts , cacheKey , handler ) )
180+ dispatch ( opts , new CacheHandler ( globalOpts , cacheKey , handler ) )
181+ } else {
182+ handleStream ( result )
182183 }
183-
184- handleStream ( result )
185- } ) . catch ( err => handler . onError ( err ) )
184+ } , err => {
185+ if ( typeof handler . onError === 'function' ) {
186+ handler . onError ( err )
187+ } else {
188+ throw err
189+ }
190+ } )
186191 } else {
187192 handleStream ( result )
188193 }
0 commit comments