From 0de8bddae53945278b74254d3d4ac9f71a9d66b8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 25 Jun 2024 21:14:50 +0200 Subject: [PATCH 1/5] fix: interceptor back-pressure Refs: https://github.com/nodejs/undici/pull/3368 Refs: https://github.com/nodejs/undici/issues/3370 --- lib/dispatcher/agent.js | 4 +-- lib/dispatcher/client.js | 37 ++++++++++++++++++------- lib/dispatcher/dispatcher-base.js | 10 +++---- lib/dispatcher/dispatcher.js | 11 ++++++-- lib/dispatcher/pool-base.js | 21 ++++++++++++-- lib/dispatcher/proxy-agent.js | 5 ++-- lib/dispatcher/retry-agent.js | 9 +++--- lib/interceptor/dump.js | 4 +-- lib/interceptor/redirect-interceptor.js | 6 ++-- lib/interceptor/redirect.js | 4 +-- lib/interceptor/retry.js | 5 ++-- 11 files changed, 79 insertions(+), 37 deletions(-) diff --git a/lib/dispatcher/agent.js b/lib/dispatcher/agent.js index 98f1486cac0..448eb3b8769 100644 --- a/lib/dispatcher/agent.js +++ b/lib/dispatcher/agent.js @@ -79,7 +79,7 @@ class Agent extends DispatcherBase { return ret } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { let key if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { key = String(opts.origin) @@ -102,7 +102,7 @@ class Agent extends DispatcherBase { this[kClients].set(key, dispatcher) } - return dispatcher.dispatch(opts, handler) + return dispatcher.dispatch(opts, handler, onDrain) } async [kClose] () { diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index cb61206b1ed..48f109227e5 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js') let deprecatedInterceptorWarned = false const kClosedResolve = Symbol('kClosedResolve') +const kDrainQueue = Symbol('kDrainQueue') function getPipelining (client) { return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1 @@ -243,6 +244,13 @@ class Client extends DispatcherBase { this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server this[kHTTPContext] = null + this[kDrainQueue] = [] + this.on('drain', () => { + for (const callback of this[kDrainQueue].splice(0)) { + callback(null) + } + }) + // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. // | complete | running | pending | @@ -299,26 +307,31 @@ class Client extends DispatcherBase { this.once('connect', cb) } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { const origin = opts.origin || this[kUrl].origin const request = new Request(origin, opts, handler) - this[kQueue].push(request) - if (this[kResuming]) { - // Do nothing. - } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { - // Wait a tick in case stream/iterator is ended in the same tick. - this[kResuming] = 1 - queueMicrotask(() => resume(this)) + if (this[kBusy] && onDrain) { + this[kDrainQueue].push(onDrain) + return false } else { - this[kResume](true) + this[kQueue].push(request) + if (this[kResuming]) { + // Do nothing. + } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { + // Wait a tick in case stream/iterator is ended in the same tick. + this[kResuming] = 1 + queueMicrotask(() => resume(this)) + } else { + this[kResume](true) + } } if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { this[kNeedDrain] = 2 } - return this[kNeedDrain] < 2 + return onDrain ? true : this[kNeedDrain] < 2 } async [kClose] () { @@ -341,6 +354,10 @@ class Client extends DispatcherBase { util.errorRequest(this, request, err) } + for (const callback of this[kDrainQueue].splice(0)) { + callback(err) + } + const callback = () => { if (this[kClosedResolve]) { // TODO (fix): Should we error here with ClientDestroyedError? diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index bd860acdcf4..637f32c7a09 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -142,10 +142,10 @@ class DispatcherBase extends Dispatcher { }) } - [kInterceptedDispatch] (opts, handler) { + [kInterceptedDispatch] (opts, handler, onDrain) { if (!this[kInterceptors] || this[kInterceptors].length === 0) { this[kInterceptedDispatch] = this[kDispatch] - return this[kDispatch](opts, handler) + return this[kDispatch](opts, handler, onDrain) } let dispatch = this[kDispatch].bind(this) @@ -153,10 +153,10 @@ class DispatcherBase extends Dispatcher { dispatch = this[kInterceptors][i](dispatch) } this[kInterceptedDispatch] = dispatch - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { if (!handler || typeof handler !== 'object') { throw new InvalidArgumentError('handler must be an object') } @@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher { throw new ClientClosedError() } - return this[kInterceptedDispatch](opts, handler) + return this[kInterceptedDispatch](opts, handler, onDrain) } catch (err) { if (typeof handler.onError !== 'function') { throw new InvalidArgumentError('invalid onError method') diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index b1e0098ec4b..264a3fc4e66 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -49,8 +49,15 @@ class ComposedDispatcher extends Dispatcher { this.#dispatch = dispatch } - dispatch (...args) { - this.#dispatch(...args) + dispatch (opts, handler, onDrain) { + // Allocating a closure here every time is not great... + return this.#dispatch(opts, handler, onDrain ?? (err, ...args) => { + if (err) { + handler.onError(err) + } else { + this.emit('drain', ...args) + } + }) } close (...args) { diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index ff3108a4da2..0ee61a6a1d5 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -17,6 +17,7 @@ const kGetDispatcher = Symbol('get dispatcher') const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') +const kDrainQueue = Symbol('kDrainQueue') class PoolBase extends DispatcherBase { constructor () { @@ -69,6 +70,13 @@ class PoolBase extends DispatcherBase { } this[kStats] = new PoolStats(this) + + this[kDrainQueue] = [] + this.on('drain', () => { + for (const callback of this[kDrainQueue].splice(0)) { + callback(null) + } + }) } get [kBusy] () { @@ -122,6 +130,10 @@ class PoolBase extends DispatcherBase { } async [kDestroy] (err) { + for (const callback of this[kDrainQueue].splice(0)) { + callback(err) + } + while (true) { const item = this[kQueue].shift() if (!item) { @@ -133,10 +145,13 @@ class PoolBase extends DispatcherBase { return Promise.all(this[kClients].map(c => c.destroy(err))) } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { const dispatcher = this[kGetDispatcher]() - if (!dispatcher) { + if (!dispatcher && onDrain) { + this[kDrainQueue].push(onDrain) + return false + } else if (!dispatcher) { this[kNeedDrain] = true this[kQueue].push({ opts, handler }) this[kQueued]++ @@ -145,7 +160,7 @@ class PoolBase extends DispatcherBase { this[kNeedDrain] = !this[kGetDispatcher]() } - return !this[kNeedDrain] + return onDrain ? true : !this[kNeedDrain] } [kAddClient] (client) { diff --git a/lib/dispatcher/proxy-agent.js b/lib/dispatcher/proxy-agent.js index 226b67846da..a698a70a6b2 100644 --- a/lib/dispatcher/proxy-agent.js +++ b/lib/dispatcher/proxy-agent.js @@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase { }) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const headers = buildHeaders(opts.headers) throwIfProxyAuthIsSent(headers) @@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase { ...opts, headers }, - handler + handler, + onDrain ) } diff --git a/lib/dispatcher/retry-agent.js b/lib/dispatcher/retry-agent.js index 0c2120d6f26..1a5015b54bb 100644 --- a/lib/dispatcher/retry-agent.js +++ b/lib/dispatcher/retry-agent.js @@ -4,15 +4,16 @@ const Dispatcher = require('./dispatcher') const RetryHandler = require('../handler/retry-handler') class RetryAgent extends Dispatcher { - #agent = null - #options = null + #agent + #options + constructor (agent, options = {}) { super(options) this.#agent = agent this.#options = options } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const retry = new RetryHandler({ ...opts, retryOptions: this.#options @@ -20,7 +21,7 @@ class RetryAgent extends Dispatcher { dispatch: this.#agent.dispatch.bind(this.#agent), handler }) - return this.#agent.dispatch(opts, retry) + return this.#agent.dispatch(opts, retry, onDrain) } close () { diff --git a/lib/interceptor/dump.js b/lib/interceptor/dump.js index fc9cacb198d..44d221e2b43 100644 --- a/lib/interceptor/dump.js +++ b/lib/interceptor/dump.js @@ -106,7 +106,7 @@ function createDumpInterceptor ( } ) { return dispatch => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { dumpMaxSize = defaultMaxSize } = opts @@ -115,7 +115,7 @@ function createDumpInterceptor ( handler ) - return dispatch(opts, dumpHandler) + return dispatch(opts, dumpHandler, onDrain) } } } diff --git a/lib/interceptor/redirect-interceptor.js b/lib/interceptor/redirect-interceptor.js index 896ee8db939..7b26af0cc8b 100644 --- a/lib/interceptor/redirect-interceptor.js +++ b/lib/interceptor/redirect-interceptor.js @@ -4,16 +4,16 @@ const RedirectHandler = require('../handler/redirect-handler') function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) { return (dispatch) => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { maxRedirections = defaultMaxRedirections } = opts if (!maxRedirections) { - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler) opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. - return dispatch(opts, redirectHandler) + return dispatch(opts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/redirect.js b/lib/interceptor/redirect.js index d2e789d8efb..ffe943d7650 100644 --- a/lib/interceptor/redirect.js +++ b/lib/interceptor/redirect.js @@ -4,7 +4,7 @@ const RedirectHandler = require('../handler/redirect-handler') module.exports = opts => { const globalMaxRedirections = opts?.maxRedirections return dispatch => { - return function redirectInterceptor (opts, handler) { + return function redirectInterceptor (opts, handler, onDrain) { const { maxRedirections = globalMaxRedirections, ...baseOpts } = opts if (!maxRedirections) { @@ -18,7 +18,7 @@ module.exports = opts => { handler ) - return dispatch(baseOpts, redirectHandler) + return dispatch(baseOpts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/retry.js b/lib/interceptor/retry.js index 1c16fd845a9..4ec61b53d37 100644 --- a/lib/interceptor/retry.js +++ b/lib/interceptor/retry.js @@ -3,7 +3,7 @@ const RetryHandler = require('../handler/retry-handler') module.exports = globalOpts => { return dispatch => { - return function retryInterceptor (opts, handler) { + return function retryInterceptor (opts, handler, onDrain) { return dispatch( opts, new RetryHandler( @@ -12,7 +12,8 @@ module.exports = globalOpts => { handler, dispatch } - ) + ), + onDrain ) } } From a3221c3927fdb57deb8c53ab31fcab2ffeccb07b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 26 Jun 2024 08:02:37 +0200 Subject: [PATCH 2/5] fixup --- lib/dispatcher/client.js | 17 +++++++++++------ lib/dispatcher/pool-base.js | 6 ------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 48f109227e5..6e20bb488ad 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -245,11 +245,6 @@ class Client extends DispatcherBase { this[kHTTPContext] = null this[kDrainQueue] = [] - this.on('drain', () => { - for (const callback of this[kDrainQueue].splice(0)) { - callback(null) - } - }) // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -355,7 +350,7 @@ class Client extends DispatcherBase { } for (const callback of this[kDrainQueue].splice(0)) { - callback(err) + callback(err, this) } const callback = () => { @@ -534,6 +529,16 @@ async function connect (client) { function emitDrain (client) { client[kNeedDrain] = 0 + + while (client[kDrainQueue].length > 0) { + const callback = client[kDrainQueue].shift() + callback(null, client) + + if (client[kNeedDrain]) { + return + } + } + client.emit('drain', client[kUrl], [client]) } diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index 0ee61a6a1d5..cca32ce6578 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -70,13 +70,7 @@ class PoolBase extends DispatcherBase { } this[kStats] = new PoolStats(this) - this[kDrainQueue] = [] - this.on('drain', () => { - for (const callback of this[kDrainQueue].splice(0)) { - callback(null) - } - }) } get [kBusy] () { From 0054313bb1baa7247a4b22e709268a9e0c19e0b0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 26 Jun 2024 08:19:00 +0200 Subject: [PATCH 3/5] fixup --- lib/dispatcher/balanced-pool.js | 6 +-- lib/dispatcher/client.js | 9 +---- lib/dispatcher/dispatcher.js | 6 +-- lib/dispatcher/pool-base.js | 69 ++++++++++++++++++--------------- lib/dispatcher/pool.js | 12 ++---- 5 files changed, 49 insertions(+), 53 deletions(-) diff --git a/lib/dispatcher/balanced-pool.js b/lib/dispatcher/balanced-pool.js index 15a7e7b5879..424baad662a 100644 --- a/lib/dispatcher/balanced-pool.js +++ b/lib/dispatcher/balanced-pool.js @@ -130,7 +130,7 @@ class BalancedPool extends PoolBase { .map((p) => p[kUrl].origin) } - [kGetDispatcher] () { + * [kGetDispatcher] () { // We validate that pools is greater than 0, // otherwise we would have to wait until an upstream // is added, which might never happen. @@ -177,13 +177,13 @@ class BalancedPool extends PoolBase { } } if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { - return pool + yield pool } } this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] this[kIndex] = maxWeightIndex - return this[kClients][maxWeightIndex] + yield this[kClients][maxWeightIndex] } } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 6e20bb488ad..3d16250180c 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -530,13 +530,8 @@ async function connect (client) { function emitDrain (client) { client[kNeedDrain] = 0 - while (client[kDrainQueue].length > 0) { - const callback = client[kDrainQueue].shift() - callback(null, client) - - if (client[kNeedDrain]) { - return - } + for (const callback of client[kDrainQueue].splice(0)) { + callback(null, client, client[kUrl], [client]) } client.emit('drain', client[kUrl], [client]) diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index 264a3fc4e66..2a1ab9206df 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -51,13 +51,13 @@ class ComposedDispatcher extends Dispatcher { dispatch (opts, handler, onDrain) { // Allocating a closure here every time is not great... - return this.#dispatch(opts, handler, onDrain ?? (err, ...args) => { + return this.#dispatch(opts, handler, onDrain ?? ((err) => { if (err) { handler.onError(err) } else { - this.emit('drain', ...args) + this.#dispatch(opts, handler, onDrain) } - }) + })) } close (...args) { diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index cca32ce6578..467777fcb37 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -26,31 +26,45 @@ class PoolBase extends DispatcherBase { this[kQueue] = new FixedQueue() this[kClients] = [] this[kQueued] = 0 + this[kNeedDrain] = false const pool = this - this[kOnDrain] = function onDrain (origin, targets) { + this[kOnDrain] = function onDrain (err, dispatcher, ...args) { const queue = pool[kQueue] - let needDrain = false + if (queue.isEmpty()) { + return + } + + const { opts, handler } = queue.shift() + pool[kQueued]-- + + if (!dispatcher.dispatch(opts, handler, this[kOnDrain])) { + // XXX: unshift + queue.push({ opts, handler }) + pool[kQueued]++ + } - while (!needDrain) { - const item = queue.shift() - if (!item) { - break - } - pool[kQueued]-- - needDrain = !this.dispatch(item.opts, item.handler) + if (!queue.isEmpty()) { + return } - this[kNeedDrain] = needDrain + // XXX: notify one + for (const callback of pool[kDrainQueue].splice(0)) { + callback(err) + } + + if (!queue.isEmpty()) { + return + } - if (!this[kNeedDrain] && pool[kNeedDrain]) { + if (pool[kNeedDrain]) { pool[kNeedDrain] = false - pool.emit('drain', origin, [pool, ...targets]) + pool.emit('drain', ...args) } - if (pool[kClosedResolve] && queue.isEmpty()) { + if (pool[kClosedResolve]) { Promise .all(pool[kClients].map(c => c.close())) .then(pool[kClosedResolve]) @@ -74,7 +88,7 @@ class PoolBase extends DispatcherBase { } get [kBusy] () { - return this[kNeedDrain] + return this[kQueued] > 0 } get [kConnected] () { @@ -140,40 +154,31 @@ class PoolBase extends DispatcherBase { } [kDispatch] (opts, handler, onDrain) { - const dispatcher = this[kGetDispatcher]() + for (const dispatcher of this[kGetDispatcher]()) { + if (dispatcher.dispatch(opts, handler, this[kOnDrain])) { + return true + } + } - if (!dispatcher && onDrain) { + if (onDrain) { this[kDrainQueue].push(onDrain) - return false - } else if (!dispatcher) { - this[kNeedDrain] = true + } else { this[kQueue].push({ opts, handler }) this[kQueued]++ - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true - this[kNeedDrain] = !this[kGetDispatcher]() + this[kNeedDrain] = true } - return onDrain ? true : !this[kNeedDrain] + return false } [kAddClient] (client) { client - .on('drain', this[kOnDrain]) .on('connect', this[kOnConnect]) .on('disconnect', this[kOnDisconnect]) .on('connectionError', this[kOnConnectionError]) this[kClients].push(client) - if (this[kNeedDrain]) { - queueMicrotask(() => { - if (this[kNeedDrain]) { - this[kOnDrain](client[kUrl], [this, client]) - } - }) - } - return this } diff --git a/lib/dispatcher/pool.js b/lib/dispatcher/pool.js index 2d84cd96488..f1c0dbba77c 100644 --- a/lib/dispatcher/pool.js +++ b/lib/dispatcher/pool.js @@ -75,17 +75,13 @@ class Pool extends PoolBase { this[kFactory] = factory } - [kGetDispatcher] () { - for (const client of this[kClients]) { - if (!client[kNeedDrain]) { - return client - } - } + * [kGetDispatcher] () { + yield * this[kClients] - if (!this[kConnections] || this[kClients].length < this[kConnections]) { + while (!this[kConnections] || this[kClients].length < this[kConnections]) { const dispatcher = this[kFactory](this[kUrl], this[kOptions]) this[kAddClient](dispatcher) - return dispatcher + yield dispatcher } } } From d1b4e2906e2e5df4250d29e42bbd03205e50cf05 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 26 Jun 2024 08:22:21 +0200 Subject: [PATCH 4/5] fixup --- lib/dispatcher/pool-base.js | 1 + lib/dispatcher/pool.js | 1 - test/pool.js | 2094 +++++++++++++++++------------------ 3 files changed, 1048 insertions(+), 1048 deletions(-) diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index 467777fcb37..13ccacd3afb 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -173,6 +173,7 @@ class PoolBase extends DispatcherBase { [kAddClient] (client) { client + .on('drain', this[kOnDrain]) .on('connect', this[kOnConnect]) .on('disconnect', this[kOnDisconnect]) .on('connectionError', this[kOnConnectionError]) diff --git a/lib/dispatcher/pool.js b/lib/dispatcher/pool.js index f1c0dbba77c..659f8e486b6 100644 --- a/lib/dispatcher/pool.js +++ b/lib/dispatcher/pool.js @@ -3,7 +3,6 @@ const { PoolBase, kClients, - kNeedDrain, kAddClient, kGetDispatcher } = require('./pool-base') diff --git a/test/pool.js b/test/pool.js index b75cd530d43..f0e9cc2ca6d 100644 --- a/test/pool.js +++ b/test/pool.js @@ -24,325 +24,325 @@ const { errors } = require('..') -test('throws when connection is infinite', async (t) => { - t = tspl(t, { plan: 2 }) - - try { - new Pool(null, { connections: 0 / 0 }) // eslint-disable-line - } catch (e) { - t.ok(e instanceof errors.InvalidArgumentError) - t.strictEqual(e.message, 'invalid connections') - } -}) - -test('throws when connections is negative', async (t) => { - t = tspl(t, { plan: 2 }) - - try { - new Pool(null, { connections: -1 }) // eslint-disable-line no-new - } catch (e) { - t.ok(e instanceof errors.InvalidArgumentError) - t.strictEqual(e.message, 'invalid connections') - } -}) - -test('throws when connection is not number', async (t) => { - t = tspl(t, { plan: 2 }) - - try { - new Pool(null, { connections: true }) // eslint-disable-line no-new - } catch (e) { - t.ok(e instanceof errors.InvalidArgumentError) - t.strictEqual(e.message, 'invalid connections') - } -}) - -test('throws when factory is not a function', async (t) => { - t = tspl(t, { plan: 2 }) - - try { - new Pool(null, { factory: '' }) // eslint-disable-line no-new - } catch (e) { - t.ok(e instanceof errors.InvalidArgumentError) - t.strictEqual(e.message, 'factory must be a function.') - } -}) - -test('does not throw when connect is a function', async (t) => { - t = tspl(t, { plan: 1 }) - - t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} })) -}) - -test('connect/disconnect event(s)', async (t) => { - const clients = 2 - - t = tspl(t, { plan: clients * 6 }) - - const server = createServer((req, res) => { - res.writeHead(200, { - Connection: 'keep-alive', - 'Keep-Alive': 'timeout=1s' - }) - res.end('ok') - }) - after(() => server.close()) - - server.listen(0, () => { - const pool = new Pool(`http://localhost:${server.address().port}`, { - connections: clients, - keepAliveTimeoutThreshold: 100 - }) - after(() => pool.close()) - - pool.on('connect', (origin, [pool, client]) => { - t.strictEqual(client instanceof Client, true) - }) - pool.on('disconnect', (origin, [pool, client], error) => { - t.ok(client instanceof Client) - t.ok(error instanceof errors.InformationalError) - t.strictEqual(error.code, 'UND_ERR_INFO') - t.strictEqual(error.message, 'socket idle timeout') - }) - - for (let i = 0; i < clients; i++) { - pool.request({ - path: '/', - method: 'GET' - }, (err, { headers, body }) => { - t.ifError(err) - body.resume() - }) - } - }) - - await t.completed -}) - -test('basic get', async (t) => { - t = tspl(t, { plan: 14 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - t.strictEqual(client[kUrl].origin, `http://localhost:${server.address().port}`) - - client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { - t.ifError(err) - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - const bufs = [] - body.on('data', (buf) => { - bufs.push(buf) - }) - body.on('end', () => { - t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) - }) - }) - - t.strictEqual(client.destroyed, false) - t.strictEqual(client.closed, false) - client.close((err) => { - t.ifError(err) - t.strictEqual(client.destroyed, true) - client.destroy((err) => { - t.ifError(err) - client.close((err) => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) - }) - t.strictEqual(client.closed, true) - }) - - await t.completed -}) - -test('URL as arg', async (t) => { - t = tspl(t, { plan: 9 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - server.listen(0, async () => { - const url = new URL('http://localhost') - url.port = server.address().port - const client = new Pool(url) - after(() => client.destroy()) - - client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { - t.ifError(err) - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - const bufs = [] - body.on('data', (buf) => { - bufs.push(buf) - }) - body.on('end', () => { - t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) - }) - }) - - client.close((err) => { - t.ifError(err) - client.destroy((err) => { - t.ifError(err) - client.close((err) => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) - }) - }) - - await t.completed -}) - -test('basic get error async/await', async (t) => { - t = tspl(t, { plan: 2 }) - - const server = createServer((req, res) => { - res.destroy() - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - await client.request({ path: '/', method: 'GET' }) - .catch((err) => { - t.ok(err) - }) - - await client.destroy() - - await client.close().catch((err) => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) - - await t.completed -}) - -test('basic get with async/await', async (t) => { - t = tspl(t, { plan: 4 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - await promisify(server.listen.bind(server))(0) - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - - body.resume() - await promisify(finished)(body) - - await client.close() - await client.destroy() -}) - -test('stream get async/await', async (t) => { - t = tspl(t, { plan: 4 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - await promisify(server.listen.bind(server))(0) - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - return new PassThrough() - }) - - await t.completed -}) - -test('stream get error async/await', async (t) => { - t = tspl(t, { plan: 1 }) - - const server = createServer((req, res) => { - res.destroy() - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - await client.stream({ path: '/', method: 'GET' }, () => { - - }) - .catch((err) => { - t.ok(err) - }) - }) - - await t.completed -}) - -test('pipeline get', async (t) => { - t = tspl(t, { plan: 5 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.destroy()) - - const bufs = [] - client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - return body - }) - .end() - .on('data', (buf) => { - bufs.push(buf) - }) - .on('end', () => { - t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) - }) - }) - - await t.completed -}) +// test('throws when connection is infinite', async (t) => { +// t = tspl(t, { plan: 2 }) + +// try { +// new Pool(null, { connections: 0 / 0 }) // eslint-disable-line +// } catch (e) { +// t.ok(e instanceof errors.InvalidArgumentError) +// t.strictEqual(e.message, 'invalid connections') +// } +// }) + +// test('throws when connections is negative', async (t) => { +// t = tspl(t, { plan: 2 }) + +// try { +// new Pool(null, { connections: -1 }) // eslint-disable-line no-new +// } catch (e) { +// t.ok(e instanceof errors.InvalidArgumentError) +// t.strictEqual(e.message, 'invalid connections') +// } +// }) + +// test('throws when connection is not number', async (t) => { +// t = tspl(t, { plan: 2 }) + +// try { +// new Pool(null, { connections: true }) // eslint-disable-line no-new +// } catch (e) { +// t.ok(e instanceof errors.InvalidArgumentError) +// t.strictEqual(e.message, 'invalid connections') +// } +// }) + +// test('throws when factory is not a function', async (t) => { +// t = tspl(t, { plan: 2 }) + +// try { +// new Pool(null, { factory: '' }) // eslint-disable-line no-new +// } catch (e) { +// t.ok(e instanceof errors.InvalidArgumentError) +// t.strictEqual(e.message, 'factory must be a function.') +// } +// }) + +// test('does not throw when connect is a function', async (t) => { +// t = tspl(t, { plan: 1 }) + +// t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} })) +// }) + +// test('connect/disconnect event(s)', async (t) => { +// const clients = 2 + +// t = tspl(t, { plan: clients * 6 }) + +// const server = createServer((req, res) => { +// res.writeHead(200, { +// Connection: 'keep-alive', +// 'Keep-Alive': 'timeout=1s' +// }) +// res.end('ok') +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const pool = new Pool(`http://localhost:${server.address().port}`, { +// connections: clients, +// keepAliveTimeoutThreshold: 100 +// }) +// after(() => pool.close()) + +// pool.on('connect', (origin, [pool, client]) => { +// t.strictEqual(client instanceof Client, true) +// }) +// pool.on('disconnect', (origin, [pool, client], error) => { +// t.ok(client instanceof Client) +// t.ok(error instanceof errors.InformationalError) +// t.strictEqual(error.code, 'UND_ERR_INFO') +// t.strictEqual(error.message, 'socket idle timeout') +// }) + +// for (let i = 0; i < clients; i++) { +// pool.request({ +// path: '/', +// method: 'GET' +// }, (err, { headers, body }) => { +// t.ifError(err) +// body.resume() +// }) +// } +// }) + +// await t.completed +// }) + +// test('basic get', async (t) => { +// t = tspl(t, { plan: 14 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// t.strictEqual(client[kUrl].origin, `http://localhost:${server.address().port}`) + +// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { +// t.ifError(err) +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') +// const bufs = [] +// body.on('data', (buf) => { +// bufs.push(buf) +// }) +// body.on('end', () => { +// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) +// }) +// }) + +// t.strictEqual(client.destroyed, false) +// t.strictEqual(client.closed, false) +// client.close((err) => { +// t.ifError(err) +// t.strictEqual(client.destroyed, true) +// client.destroy((err) => { +// t.ifError(err) +// client.close((err) => { +// t.ok(err instanceof errors.ClientDestroyedError) +// }) +// }) +// }) +// t.strictEqual(client.closed, true) +// }) + +// await t.completed +// }) + +// test('URL as arg', async (t) => { +// t = tspl(t, { plan: 9 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const url = new URL('http://localhost') +// url.port = server.address().port +// const client = new Pool(url) +// after(() => client.destroy()) + +// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { +// t.ifError(err) +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') +// const bufs = [] +// body.on('data', (buf) => { +// bufs.push(buf) +// }) +// body.on('end', () => { +// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) +// }) +// }) + +// client.close((err) => { +// t.ifError(err) +// client.destroy((err) => { +// t.ifError(err) +// client.close((err) => { +// t.ok(err instanceof errors.ClientDestroyedError) +// }) +// }) +// }) +// }) + +// await t.completed +// }) + +// test('basic get error async/await', async (t) => { +// t = tspl(t, { plan: 2 }) + +// const server = createServer((req, res) => { +// res.destroy() +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// await client.request({ path: '/', method: 'GET' }) +// .catch((err) => { +// t.ok(err) +// }) + +// await client.destroy() + +// await client.close().catch((err) => { +// t.ok(err instanceof errors.ClientDestroyedError) +// }) +// }) + +// await t.completed +// }) + +// test('basic get with async/await', async (t) => { +// t = tspl(t, { plan: 4 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// await promisify(server.listen.bind(server))(0) +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') + +// body.resume() +// await promisify(finished)(body) + +// await client.close() +// await client.destroy() +// }) + +// test('stream get async/await', async (t) => { +// t = tspl(t, { plan: 4 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// await promisify(server.listen.bind(server))(0) +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') +// return new PassThrough() +// }) + +// await t.completed +// }) + +// test('stream get error async/await', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const server = createServer((req, res) => { +// res.destroy() +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// await client.stream({ path: '/', method: 'GET' }, () => { + +// }) +// .catch((err) => { +// t.ok(err) +// }) +// }) + +// await t.completed +// }) + +// test('pipeline get', async (t) => { +// t = tspl(t, { plan: 5 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.destroy()) + +// const bufs = [] +// client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') +// return body +// }) +// .end() +// .on('data', (buf) => { +// bufs.push(buf) +// }) +// .on('end', () => { +// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) +// }) +// }) + +// await t.completed +// }) test('backpressure algorithm', async (t) => { t = tspl(t, { plan: 12 }) @@ -422,731 +422,731 @@ test('backpressure algorithm', async (t) => { t.end() }) -test('busy', async (t) => { - t = tspl(t, { plan: 8 * 16 + 2 + 1 }) - - const server = createServer((req, res) => { - t.strictEqual('/', req.url) - t.strictEqual('GET', req.method) - res.setHeader('content-type', 'text/plain') - res.end('hello') - }) - after(() => server.close()) - - const connections = 2 - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections, - pipelining: 2 - }) - client.on('drain', () => { - t.ok(true, 'pass') - }) - client.on('connect', () => { - t.ok(true, 'pass') - }) - after(() => client.destroy()) - - for (let n = 1; n <= 8; ++n) { - client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { - t.ifError(err) - t.strictEqual(statusCode, 200) - t.strictEqual(headers['content-type'], 'text/plain') - const bufs = [] - body.on('data', (buf) => { - bufs.push(buf) - }) - body.on('end', () => { - t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) - }) - }) - t.strictEqual(client[kPending], n) - t.strictEqual(client[kBusy], n > 1) - t.strictEqual(client[kSize], n) - t.strictEqual(client[kRunning], 0) - - t.strictEqual(client.stats.connected, 0) - t.strictEqual(client.stats.free, 0) - t.strictEqual(client.stats.queued, Math.max(n - connections, 0)) - t.strictEqual(client.stats.pending, n) - t.strictEqual(client.stats.size, n) - t.strictEqual(client.stats.running, 0) - } - }) - - await t.completed -}) - -test('invalid pool dispatch options', async (t) => { - t = tspl(t, { plan: 2 }) - const pool = new Pool('http://notahost') - t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler') - t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler') -}) - -test('pool upgrade promise', async (t) => { - t = tspl(t, { plan: 2 }) - - const server = net.createServer((c) => { - c.on('data', (d) => { - c.write('HTTP/1.1 101\r\n') - c.write('hello: world\r\n') - c.write('connection: upgrade\r\n') - c.write('upgrade: websocket\r\n') - c.write('\r\n') - c.write('Body') - }) - - c.on('end', () => { - c.end() - }) - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.close()) - - const { headers, socket } = await client.upgrade({ - path: '/', - method: 'GET', - protocol: 'Websocket' - }) - - let recvData = '' - socket.on('data', (d) => { - recvData += d - }) - - socket.on('close', () => { - t.strictEqual(recvData.toString(), 'Body') - }) - - t.deepStrictEqual(headers, { - hello: 'world', - connection: 'upgrade', - upgrade: 'websocket' - }) - socket.end() - }) - - await t.completed -}) - -test('pool connect', async (t) => { - t = tspl(t, { plan: 1 }) - - const server = createServer((c) => { - t.fail() - }) - server.on('connect', (req, socket, firstBodyChunk) => { - socket.write('HTTP/1.1 200 Connection established\r\n\r\n') - - let data = firstBodyChunk.toString() - socket.on('data', (buf) => { - data += buf.toString() - }) - - socket.on('end', () => { - socket.end(data) - }) - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.close()) - - const { socket } = await client.connect({ - path: '/' - }) - - let recvData = '' - socket.on('data', (d) => { - recvData += d - }) - - socket.on('end', () => { - t.strictEqual(recvData.toString(), 'Body') - }) - - socket.write('Body') - socket.end() - }) - - await t.completed -}) - -test('pool dispatch', async (t) => { - t = tspl(t, { plan: 2 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.close()) - - let buf = '' - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - buf += chunk - }, - onComplete () { - t.strictEqual(buf, 'asd') - }, - onError () { - } - }) - }) - - await t.completed -}) - -test('pool pipeline args validation', async (t) => { - t = tspl(t, { plan: 2 }) - - const client = new Pool('http://localhost:5000') - - const ret = client.pipeline(null, () => {}) - ret.on('error', (err) => { - t.ok(/opts/.test(err.message)) - t.ok(err instanceof errors.InvalidArgumentError) - }) - - await t.completed -}) - -test('300 requests succeed', async (t) => { - t = tspl(t, { plan: 300 * 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1 - }) - after(() => client.destroy()) - - for (let n = 0; n < 300; ++n) { - client.request({ - path: '/', - method: 'GET' - }, (err, data) => { - t.ifError(err) - data.body.on('data', (chunk) => { - t.strictEqual(chunk.toString(), 'asd') - }).on('end', () => { - t.ok(true, 'pass') - }) - }) - } - }) - - await t.completed -}) - -test('pool connect error', async (t) => { - t = tspl(t, { plan: 1 }) - - const server = createServer((c) => { - t.fail() - }) - server.on('connect', (req, socket, firstBodyChunk) => { - socket.destroy() - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.close()) - - try { - await client.connect({ - path: '/' - }) - } catch (err) { - t.ok(err) - } - }) - - await t.completed -}) - -test('pool upgrade error', async (t) => { - t = tspl(t, { plan: 1 }) - - const server = net.createServer((c) => { - c.on('data', (d) => { - c.write('HTTP/1.1 101\r\n') - c.write('hello: world\r\n') - c.write('connection: upgrade\r\n') - c.write('\r\n') - c.write('Body') - }) - c.on('error', () => { - // Whether we get an error, end or close is undefined. - // Ignore error. - }) - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`) - after(() => client.close()) - - try { - await client.upgrade({ - path: '/', - method: 'GET', - protocol: 'Websocket' - }) - } catch (err) { - t.ok(err) - } - }) - - await t.completed -}) - -test('pool dispatch error', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - client.dispatch({ - path: '/', - method: 'GET', - headers: { - 'transfer-encoding': 'fail' - } - }, { - onConnect () { - t.fail() - }, - onHeaders (statusCode, headers) { - t.fail() - }, - onData (chunk) { - t.fail() - }, - onError (err) { - t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - } - }) - }) - - await t.completed -}) - -test('pool request abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.request({ - path: '/', - method: 'GET', - signal - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) - -test('pool stream abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.stream({ - path: '/', - method: 'GET', - signal - }, ({ body }) => body, (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) - -test('pool pipeline abort in queue', async (t) => { - t = tspl(t, { plan: 3 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onConnect () { - }, - onHeaders (statusCode, headers) { - t.strictEqual(statusCode, 200) - }, - onData (chunk) { - }, - onComplete () { - t.ok(true, 'pass') - }, - onError () { - } - }) - - const signal = new EventEmitter() - client.pipeline({ - path: '/', - method: 'GET', - signal - }, ({ body }) => body).end().on('error', (err) => { - t.strictEqual(err.code, 'UND_ERR_ABORTED') - }) - signal.emit('abort') - }) - - await t.completed -}) - -test('pool stream constructor error destroy body', async (t) => { - t = tspl(t, { plan: 4 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - { - const body = new Readable({ - read () { - } - }) - client.stream({ - path: '/', - method: 'GET', - body, - headers: { - 'transfer-encoding': 'fail' - } - }, () => { - t.fail() - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - t.strictEqual(body.destroyed, true) - }) - } - - { - const body = new Readable({ - read () { - } - }) - client.stream({ - path: '/', - method: 'CONNECT', - body - }, () => { - t.fail() - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - t.strictEqual(body.destroyed, true) - }) - } - }) - - await t.completed -}) - -test('pool request constructor error destroy body', async (t) => { - t = tspl(t, { plan: 4 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.close()) - - { - const body = new Readable({ - read () { - } - }) - client.request({ - path: '/', - method: 'GET', - body, - headers: { - 'transfer-encoding': 'fail' - } - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - t.strictEqual(body.destroyed, true) - }) - } - - { - const body = new Readable({ - read () { - } - }) - client.request({ - path: '/', - method: 'CONNECT', - body - }, (err) => { - t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - t.strictEqual(body.destroyed, true) - }) - } - }) - - await t.completed -}) - -test('pool close waits for all requests', async (t) => { - t = tspl(t, { plan: 5 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.destroy()) - - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.ifError(err) - }) - - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.ifError(err) - }) - - client.close(() => { - t.ok(true, 'pass') - }) - - client.close(() => { - t.ok(true, 'pass') - }) - - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.ok(err instanceof errors.ClientClosedError) - }) - }) - - await t.completed -}) - -test('pool destroyed', async (t) => { - t = tspl(t, { plan: 1 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.destroy()) - - client.destroy() - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) - - await t.completed -}) - -test('pool destroy fails queued requests', async (t) => { - t = tspl(t, { plan: 6 }) - - const server = createServer((req, res) => { - res.end('asd') - }) - after(() => server.close()) - - server.listen(0, async () => { - const client = new Pool(`http://localhost:${server.address().port}`, { - connections: 1, - pipelining: 1 - }) - after(() => client.destroy()) - - const _err = new Error() - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.strictEqual(err, _err) - }) - - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.strictEqual(err, _err) - }) - - t.strictEqual(client.destroyed, false) - client.destroy(_err, () => { - t.ok(true, 'pass') - }) - t.strictEqual(client.destroyed, true) - - client.request({ - path: '/', - method: 'GET' - }, (err) => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) - await t.completed -}) +// test('busy', async (t) => { +// t = tspl(t, { plan: 8 * 16 + 2 + 1 }) + +// const server = createServer((req, res) => { +// t.strictEqual('/', req.url) +// t.strictEqual('GET', req.method) +// res.setHeader('content-type', 'text/plain') +// res.end('hello') +// }) +// after(() => server.close()) + +// const connections = 2 + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections, +// pipelining: 2 +// }) +// client.on('drain', () => { +// t.ok(true, 'pass') +// }) +// client.on('connect', () => { +// t.ok(true, 'pass') +// }) +// after(() => client.destroy()) + +// for (let n = 1; n <= 8; ++n) { +// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { +// t.ifError(err) +// t.strictEqual(statusCode, 200) +// t.strictEqual(headers['content-type'], 'text/plain') +// const bufs = [] +// body.on('data', (buf) => { +// bufs.push(buf) +// }) +// body.on('end', () => { +// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) +// }) +// }) +// t.strictEqual(client[kPending], n) +// t.strictEqual(client[kBusy], n > 1) +// t.strictEqual(client[kSize], n) +// t.strictEqual(client[kRunning], 0) + +// t.strictEqual(client.stats.connected, 0) +// t.strictEqual(client.stats.free, 0) +// t.strictEqual(client.stats.queued, Math.max(n - connections, 0)) +// t.strictEqual(client.stats.pending, n) +// t.strictEqual(client.stats.size, n) +// t.strictEqual(client.stats.running, 0) +// } +// }) + +// await t.completed +// }) + +// test('invalid pool dispatch options', async (t) => { +// t = tspl(t, { plan: 2 }) +// const pool = new Pool('http://notahost') +// t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler') +// t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler') +// }) + +// test('pool upgrade promise', async (t) => { +// t = tspl(t, { plan: 2 }) + +// const server = net.createServer((c) => { +// c.on('data', (d) => { +// c.write('HTTP/1.1 101\r\n') +// c.write('hello: world\r\n') +// c.write('connection: upgrade\r\n') +// c.write('upgrade: websocket\r\n') +// c.write('\r\n') +// c.write('Body') +// }) + +// c.on('end', () => { +// c.end() +// }) +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.close()) + +// const { headers, socket } = await client.upgrade({ +// path: '/', +// method: 'GET', +// protocol: 'Websocket' +// }) + +// let recvData = '' +// socket.on('data', (d) => { +// recvData += d +// }) + +// socket.on('close', () => { +// t.strictEqual(recvData.toString(), 'Body') +// }) + +// t.deepStrictEqual(headers, { +// hello: 'world', +// connection: 'upgrade', +// upgrade: 'websocket' +// }) +// socket.end() +// }) + +// await t.completed +// }) + +// test('pool connect', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const server = createServer((c) => { +// t.fail() +// }) +// server.on('connect', (req, socket, firstBodyChunk) => { +// socket.write('HTTP/1.1 200 Connection established\r\n\r\n') + +// let data = firstBodyChunk.toString() +// socket.on('data', (buf) => { +// data += buf.toString() +// }) + +// socket.on('end', () => { +// socket.end(data) +// }) +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.close()) + +// const { socket } = await client.connect({ +// path: '/' +// }) + +// let recvData = '' +// socket.on('data', (d) => { +// recvData += d +// }) + +// socket.on('end', () => { +// t.strictEqual(recvData.toString(), 'Body') +// }) + +// socket.write('Body') +// socket.end() +// }) + +// await t.completed +// }) + +// test('pool dispatch', async (t) => { +// t = tspl(t, { plan: 2 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.close()) + +// let buf = '' +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// buf += chunk +// }, +// onComplete () { +// t.strictEqual(buf, 'asd') +// }, +// onError () { +// } +// }) +// }) + +// await t.completed +// }) + +// test('pool pipeline args validation', async (t) => { +// t = tspl(t, { plan: 2 }) + +// const client = new Pool('http://localhost:5000') + +// const ret = client.pipeline(null, () => {}) +// ret.on('error', (err) => { +// t.ok(/opts/.test(err.message)) +// t.ok(err instanceof errors.InvalidArgumentError) +// }) + +// await t.completed +// }) + +// test('300 requests succeed', async (t) => { +// t = tspl(t, { plan: 300 * 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1 +// }) +// after(() => client.destroy()) + +// for (let n = 0; n < 300; ++n) { +// client.request({ +// path: '/', +// method: 'GET' +// }, (err, data) => { +// t.ifError(err) +// data.body.on('data', (chunk) => { +// t.strictEqual(chunk.toString(), 'asd') +// }).on('end', () => { +// t.ok(true, 'pass') +// }) +// }) +// } +// }) + +// await t.completed +// }) + +// test('pool connect error', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const server = createServer((c) => { +// t.fail() +// }) +// server.on('connect', (req, socket, firstBodyChunk) => { +// socket.destroy() +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.close()) + +// try { +// await client.connect({ +// path: '/' +// }) +// } catch (err) { +// t.ok(err) +// } +// }) + +// await t.completed +// }) + +// test('pool upgrade error', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const server = net.createServer((c) => { +// c.on('data', (d) => { +// c.write('HTTP/1.1 101\r\n') +// c.write('hello: world\r\n') +// c.write('connection: upgrade\r\n') +// c.write('\r\n') +// c.write('Body') +// }) +// c.on('error', () => { +// // Whether we get an error, end or close is undefined. +// // Ignore error. +// }) +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`) +// after(() => client.close()) + +// try { +// await client.upgrade({ +// path: '/', +// method: 'GET', +// protocol: 'Websocket' +// }) +// } catch (err) { +// t.ok(err) +// } +// }) + +// await t.completed +// }) + +// test('pool dispatch error', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// client.dispatch({ +// path: '/', +// method: 'GET', +// headers: { +// 'transfer-encoding': 'fail' +// } +// }, { +// onConnect () { +// t.fail() +// }, +// onHeaders (statusCode, headers) { +// t.fail() +// }, +// onData (chunk) { +// t.fail() +// }, +// onError (err) { +// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') +// } +// }) +// }) + +// await t.completed +// }) + +// test('pool request abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.request({ +// path: '/', +// method: 'GET', +// signal +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) + +// test('pool stream abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.stream({ +// path: '/', +// method: 'GET', +// signal +// }, ({ body }) => body, (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) + +// test('pool pipeline abort in queue', async (t) => { +// t = tspl(t, { plan: 3 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// client.dispatch({ +// path: '/', +// method: 'GET' +// }, { +// onConnect () { +// }, +// onHeaders (statusCode, headers) { +// t.strictEqual(statusCode, 200) +// }, +// onData (chunk) { +// }, +// onComplete () { +// t.ok(true, 'pass') +// }, +// onError () { +// } +// }) + +// const signal = new EventEmitter() +// client.pipeline({ +// path: '/', +// method: 'GET', +// signal +// }, ({ body }) => body).end().on('error', (err) => { +// t.strictEqual(err.code, 'UND_ERR_ABORTED') +// }) +// signal.emit('abort') +// }) + +// await t.completed +// }) + +// test('pool stream constructor error destroy body', async (t) => { +// t = tspl(t, { plan: 4 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// { +// const body = new Readable({ +// read () { +// } +// }) +// client.stream({ +// path: '/', +// method: 'GET', +// body, +// headers: { +// 'transfer-encoding': 'fail' +// } +// }, () => { +// t.fail() +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') +// t.strictEqual(body.destroyed, true) +// }) +// } + +// { +// const body = new Readable({ +// read () { +// } +// }) +// client.stream({ +// path: '/', +// method: 'CONNECT', +// body +// }, () => { +// t.fail() +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') +// t.strictEqual(body.destroyed, true) +// }) +// } +// }) + +// await t.completed +// }) + +// test('pool request constructor error destroy body', async (t) => { +// t = tspl(t, { plan: 4 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.close()) + +// { +// const body = new Readable({ +// read () { +// } +// }) +// client.request({ +// path: '/', +// method: 'GET', +// body, +// headers: { +// 'transfer-encoding': 'fail' +// } +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') +// t.strictEqual(body.destroyed, true) +// }) +// } + +// { +// const body = new Readable({ +// read () { +// } +// }) +// client.request({ +// path: '/', +// method: 'CONNECT', +// body +// }, (err) => { +// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') +// t.strictEqual(body.destroyed, true) +// }) +// } +// }) + +// await t.completed +// }) + +// test('pool close waits for all requests', async (t) => { +// t = tspl(t, { plan: 5 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.destroy()) + +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.ifError(err) +// }) + +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.ifError(err) +// }) + +// client.close(() => { +// t.ok(true, 'pass') +// }) + +// client.close(() => { +// t.ok(true, 'pass') +// }) + +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.ok(err instanceof errors.ClientClosedError) +// }) +// }) + +// await t.completed +// }) + +// test('pool destroyed', async (t) => { +// t = tspl(t, { plan: 1 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.destroy()) + +// client.destroy() +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.ok(err instanceof errors.ClientDestroyedError) +// }) +// }) + +// await t.completed +// }) + +// test('pool destroy fails queued requests', async (t) => { +// t = tspl(t, { plan: 6 }) + +// const server = createServer((req, res) => { +// res.end('asd') +// }) +// after(() => server.close()) + +// server.listen(0, async () => { +// const client = new Pool(`http://localhost:${server.address().port}`, { +// connections: 1, +// pipelining: 1 +// }) +// after(() => client.destroy()) + +// const _err = new Error() +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.strictEqual(err, _err) +// }) + +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.strictEqual(err, _err) +// }) + +// t.strictEqual(client.destroyed, false) +// client.destroy(_err, () => { +// t.ok(true, 'pass') +// }) +// t.strictEqual(client.destroyed, true) + +// client.request({ +// path: '/', +// method: 'GET' +// }, (err) => { +// t.ok(err instanceof errors.ClientDestroyedError) +// }) +// }) +// await t.completed +// }) From 0653f406382a307d126776937f74a8795871dbdf Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 26 Jun 2024 08:35:11 +0200 Subject: [PATCH 5/5] fixup --- lib/dispatcher/balanced-pool.js | 57 +- lib/dispatcher/client.js | 2 +- lib/dispatcher/dispatcher.js | 11 +- lib/dispatcher/pool-base.js | 86 +- lib/dispatcher/pool.js | 26 +- test/pool.js | 2094 +++++++++++++++---------------- 6 files changed, 1175 insertions(+), 1101 deletions(-) diff --git a/lib/dispatcher/balanced-pool.js b/lib/dispatcher/balanced-pool.js index 424baad662a..fa72eebeee7 100644 --- a/lib/dispatcher/balanced-pool.js +++ b/lib/dispatcher/balanced-pool.js @@ -10,7 +10,8 @@ const { kNeedDrain, kAddClient, kRemoveClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } = require('./pool-base') const Pool = require('./pool') const { kUrl, kInterceptors } = require('../core/symbols') @@ -130,7 +131,7 @@ class BalancedPool extends PoolBase { .map((p) => p[kUrl].origin) } - * [kGetDispatcher] () { + [kGetDispatcher] () { // We validate that pools is greater than 0, // otherwise we would have to wait until an upstream // is added, which might never happen. @@ -177,13 +178,61 @@ class BalancedPool extends PoolBase { } } if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { - yield pool + return pool } } this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] this[kIndex] = maxWeightIndex - yield this[kClients][maxWeightIndex] + return this[kClients][maxWeightIndex] + } + + [kGetDispatchers] () { + // We validate that pools is greater than 0, + // otherwise we would have to wait until an upstream + // is added, which might never happen. + if (this[kClients].length === 0) { + throw new BalancedPoolMissingUpstreamError() + } + + let maxWeightIndex = 0 + { + let maxWeight = this[kClients][0][kWeight] + for (let n = 1; n < this[kClients].length; n++) { + if (this[kClients][n][kWeight] > maxWeight) { + maxWeight = this[kClients][n][kWeight] + maxWeightIndex = n + } + } + } + + let counter = 0 + while (counter++ < this[kClients].length) { + this[kIndex] = (this[kIndex] + 1) % this[kClients].length + const pool = this[kClients][this[kIndex]] + + // find pool index with the largest weight + if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) { + maxWeightIndex = this[kIndex] + } + + // decrease the current weight every `this[kClients].length`. + if (this[kIndex] === 0) { + // Set the current weight to the next lower weight. + this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor] + + if (this[kCurrentWeight] <= 0) { + this[kCurrentWeight] = this[kMaxWeightPerServer] + } + } + if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { + return pool + } + } + + this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] + this[kIndex] = maxWeightIndex + return this[kClients][maxWeightIndex] } } diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 3d16250180c..fb8207be645 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -531,7 +531,7 @@ function emitDrain (client) { client[kNeedDrain] = 0 for (const callback of client[kDrainQueue].splice(0)) { - callback(null, client, client[kUrl], [client]) + callback.call(client, client[kUrl], [client]) } client.emit('drain', client[kUrl], [client]) diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index 2a1ab9206df..c2625100479 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -30,7 +30,7 @@ class Dispatcher extends EventEmitter { dispatch = interceptor(dispatch) - if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) { + if (dispatch == null || typeof dispatch !== 'function') { throw new TypeError('invalid interceptor') } } @@ -51,12 +51,9 @@ class ComposedDispatcher extends Dispatcher { dispatch (opts, handler, onDrain) { // Allocating a closure here every time is not great... - return this.#dispatch(opts, handler, onDrain ?? ((err) => { - if (err) { - handler.onError(err) - } else { - this.#dispatch(opts, handler, onDrain) - } + return this.#dispatch(opts, handler, onDrain ?? (() => { + // XXX: Stop if error? + this.#dispatch(opts, handler, onDrain) })) } diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index 13ccacd3afb..493258038cc 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -7,6 +7,7 @@ const PoolStats = require('./pool-stats') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') +const kDrainQueue = Symbol('drainQueue') const kQueue = Symbol('queue') const kClosedResolve = Symbol('closed resolve') const kOnDrain = Symbol('onDrain') @@ -14,10 +15,10 @@ const kOnConnect = Symbol('onConnect') const kOnDisconnect = Symbol('onDisconnect') const kOnConnectionError = Symbol('onConnectionError') const kGetDispatcher = Symbol('get dispatcher') +const kGetDispatchers = Symbol('get dispatchers') const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') -const kDrainQueue = Symbol('kDrainQueue') class PoolBase extends DispatcherBase { constructor () { @@ -26,45 +27,41 @@ class PoolBase extends DispatcherBase { this[kQueue] = new FixedQueue() this[kClients] = [] this[kQueued] = 0 - this[kNeedDrain] = false + this[kDrainQueue] = [] const pool = this - this[kOnDrain] = function onDrain (err, dispatcher, ...args) { + this[kOnDrain] = function onDrain (origin, targets) { const queue = pool[kQueue] if (queue.isEmpty()) { return } - const { opts, handler } = queue.shift() - pool[kQueued]-- + let needDrain = false - if (!dispatcher.dispatch(opts, handler, this[kOnDrain])) { - // XXX: unshift - queue.push({ opts, handler }) - pool[kQueued]++ + while (!needDrain) { + const item = queue.shift() + if (!item) { + break + } + pool[kQueued]-- + needDrain = !this.dispatch(item.opts, item.handler) } - if (!queue.isEmpty()) { - return - } + this[kNeedDrain] = needDrain - // XXX: notify one for (const callback of pool[kDrainQueue].splice(0)) { - callback(err) - } - - if (!queue.isEmpty()) { - return + callback.call(this, origin, [pool, ...targets]) } - if (pool[kNeedDrain]) { + if (!this[kNeedDrain] && pool[kNeedDrain]) { + // Legacy... pool[kNeedDrain] = false - pool.emit('drain', ...args) + pool.emit('drain', origin, [pool, ...targets]) } - if (pool[kClosedResolve]) { + if (pool[kClosedResolve] && queue.isEmpty()) { Promise .all(pool[kClients].map(c => c.close())) .then(pool[kClosedResolve]) @@ -84,11 +81,10 @@ class PoolBase extends DispatcherBase { } this[kStats] = new PoolStats(this) - this[kDrainQueue] = [] } get [kBusy] () { - return this[kQueued] > 0 + return this[kNeedDrain] } get [kConnected] () { @@ -138,10 +134,6 @@ class PoolBase extends DispatcherBase { } async [kDestroy] (err) { - for (const callback of this[kDrainQueue].splice(0)) { - callback(err) - } - while (true) { const item = this[kQueue].shift() if (!item) { @@ -154,21 +146,30 @@ class PoolBase extends DispatcherBase { } [kDispatch] (opts, handler, onDrain) { - for (const dispatcher of this[kGetDispatcher]()) { - if (dispatcher.dispatch(opts, handler, this[kOnDrain])) { - return true + if (onDrain) { + for (const dispatcher of this[kGetDispatchers]()) { + if (dispatcher.dispatch(opts, handler, this[kOnDrain])) { + return true + } } - } - if (onDrain) { this[kDrainQueue].push(onDrain) + + return false } else { - this[kQueue].push({ opts, handler }) - this[kQueued]++ - this[kNeedDrain] = true - } + // Legacy... + const dispatcher = this[kGetDispatcher]() + if (!dispatcher) { + this[kNeedDrain] = true + this[kQueue].push({ opts, handler }) + this[kQueued]++ + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = !this[kGetDispatcher]() + } - return false + return !this[kNeedDrain] + } } [kAddClient] (client) { @@ -180,6 +181,14 @@ class PoolBase extends DispatcherBase { this[kClients].push(client) + if (this[kNeedDrain]) { + queueMicrotask(() => { + if (this[kNeedDrain]) { + this[kOnDrain](client[kUrl], [this, client]) + } + }) + } + return this } @@ -205,5 +214,6 @@ module.exports = { kNeedDrain, kAddClient, kRemoveClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } diff --git a/lib/dispatcher/pool.js b/lib/dispatcher/pool.js index 659f8e486b6..2aad43ef65c 100644 --- a/lib/dispatcher/pool.js +++ b/lib/dispatcher/pool.js @@ -3,8 +3,10 @@ const { PoolBase, kClients, + kNeedDrain, kAddClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } = require('./pool-base') const Client = require('./client') const { @@ -74,10 +76,26 @@ class Pool extends PoolBase { this[kFactory] = factory } - * [kGetDispatcher] () { - yield * this[kClients] + [kGetDispatcher] () { + for (const client of this[kClients]) { + if (!client[kNeedDrain]) { + return client + } + } + + if (!this[kConnections] || this[kClients].length < this[kConnections]) { + const dispatcher = this[kFactory](this[kUrl], this[kOptions]) + this[kAddClient](dispatcher) + return dispatcher + } + } + + * [kGetDispatchers]() { + for (const client of this[kClients]) { + yield client + } - while (!this[kConnections] || this[kClients].length < this[kConnections]) { + if (!this[kConnections] || this[kClients].length < this[kConnections]) { const dispatcher = this[kFactory](this[kUrl], this[kOptions]) this[kAddClient](dispatcher) yield dispatcher diff --git a/test/pool.js b/test/pool.js index f0e9cc2ca6d..b75cd530d43 100644 --- a/test/pool.js +++ b/test/pool.js @@ -24,325 +24,325 @@ const { errors } = require('..') -// test('throws when connection is infinite', async (t) => { -// t = tspl(t, { plan: 2 }) - -// try { -// new Pool(null, { connections: 0 / 0 }) // eslint-disable-line -// } catch (e) { -// t.ok(e instanceof errors.InvalidArgumentError) -// t.strictEqual(e.message, 'invalid connections') -// } -// }) - -// test('throws when connections is negative', async (t) => { -// t = tspl(t, { plan: 2 }) - -// try { -// new Pool(null, { connections: -1 }) // eslint-disable-line no-new -// } catch (e) { -// t.ok(e instanceof errors.InvalidArgumentError) -// t.strictEqual(e.message, 'invalid connections') -// } -// }) - -// test('throws when connection is not number', async (t) => { -// t = tspl(t, { plan: 2 }) - -// try { -// new Pool(null, { connections: true }) // eslint-disable-line no-new -// } catch (e) { -// t.ok(e instanceof errors.InvalidArgumentError) -// t.strictEqual(e.message, 'invalid connections') -// } -// }) - -// test('throws when factory is not a function', async (t) => { -// t = tspl(t, { plan: 2 }) - -// try { -// new Pool(null, { factory: '' }) // eslint-disable-line no-new -// } catch (e) { -// t.ok(e instanceof errors.InvalidArgumentError) -// t.strictEqual(e.message, 'factory must be a function.') -// } -// }) - -// test('does not throw when connect is a function', async (t) => { -// t = tspl(t, { plan: 1 }) - -// t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} })) -// }) - -// test('connect/disconnect event(s)', async (t) => { -// const clients = 2 - -// t = tspl(t, { plan: clients * 6 }) - -// const server = createServer((req, res) => { -// res.writeHead(200, { -// Connection: 'keep-alive', -// 'Keep-Alive': 'timeout=1s' -// }) -// res.end('ok') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const pool = new Pool(`http://localhost:${server.address().port}`, { -// connections: clients, -// keepAliveTimeoutThreshold: 100 -// }) -// after(() => pool.close()) - -// pool.on('connect', (origin, [pool, client]) => { -// t.strictEqual(client instanceof Client, true) -// }) -// pool.on('disconnect', (origin, [pool, client], error) => { -// t.ok(client instanceof Client) -// t.ok(error instanceof errors.InformationalError) -// t.strictEqual(error.code, 'UND_ERR_INFO') -// t.strictEqual(error.message, 'socket idle timeout') -// }) - -// for (let i = 0; i < clients; i++) { -// pool.request({ -// path: '/', -// method: 'GET' -// }, (err, { headers, body }) => { -// t.ifError(err) -// body.resume() -// }) -// } -// }) - -// await t.completed -// }) - -// test('basic get', async (t) => { -// t = tspl(t, { plan: 14 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// t.strictEqual(client[kUrl].origin, `http://localhost:${server.address().port}`) - -// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { -// t.ifError(err) -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') -// const bufs = [] -// body.on('data', (buf) => { -// bufs.push(buf) -// }) -// body.on('end', () => { -// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) -// }) -// }) - -// t.strictEqual(client.destroyed, false) -// t.strictEqual(client.closed, false) -// client.close((err) => { -// t.ifError(err) -// t.strictEqual(client.destroyed, true) -// client.destroy((err) => { -// t.ifError(err) -// client.close((err) => { -// t.ok(err instanceof errors.ClientDestroyedError) -// }) -// }) -// }) -// t.strictEqual(client.closed, true) -// }) - -// await t.completed -// }) - -// test('URL as arg', async (t) => { -// t = tspl(t, { plan: 9 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const url = new URL('http://localhost') -// url.port = server.address().port -// const client = new Pool(url) -// after(() => client.destroy()) - -// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { -// t.ifError(err) -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') -// const bufs = [] -// body.on('data', (buf) => { -// bufs.push(buf) -// }) -// body.on('end', () => { -// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) -// }) -// }) - -// client.close((err) => { -// t.ifError(err) -// client.destroy((err) => { -// t.ifError(err) -// client.close((err) => { -// t.ok(err instanceof errors.ClientDestroyedError) -// }) -// }) -// }) -// }) - -// await t.completed -// }) - -// test('basic get error async/await', async (t) => { -// t = tspl(t, { plan: 2 }) - -// const server = createServer((req, res) => { -// res.destroy() -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// await client.request({ path: '/', method: 'GET' }) -// .catch((err) => { -// t.ok(err) -// }) - -// await client.destroy() - -// await client.close().catch((err) => { -// t.ok(err instanceof errors.ClientDestroyedError) -// }) -// }) - -// await t.completed -// }) - -// test('basic get with async/await', async (t) => { -// t = tspl(t, { plan: 4 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// await promisify(server.listen.bind(server))(0) -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') - -// body.resume() -// await promisify(finished)(body) - -// await client.close() -// await client.destroy() -// }) - -// test('stream get async/await', async (t) => { -// t = tspl(t, { plan: 4 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// await promisify(server.listen.bind(server))(0) -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') -// return new PassThrough() -// }) - -// await t.completed -// }) - -// test('stream get error async/await', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const server = createServer((req, res) => { -// res.destroy() -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// await client.stream({ path: '/', method: 'GET' }, () => { - -// }) -// .catch((err) => { -// t.ok(err) -// }) -// }) - -// await t.completed -// }) - -// test('pipeline get', async (t) => { -// t = tspl(t, { plan: 5 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.destroy()) - -// const bufs = [] -// client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') -// return body -// }) -// .end() -// .on('data', (buf) => { -// bufs.push(buf) -// }) -// .on('end', () => { -// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) -// }) -// }) - -// await t.completed -// }) +test('throws when connection is infinite', async (t) => { + t = tspl(t, { plan: 2 }) + + try { + new Pool(null, { connections: 0 / 0 }) // eslint-disable-line + } catch (e) { + t.ok(e instanceof errors.InvalidArgumentError) + t.strictEqual(e.message, 'invalid connections') + } +}) + +test('throws when connections is negative', async (t) => { + t = tspl(t, { plan: 2 }) + + try { + new Pool(null, { connections: -1 }) // eslint-disable-line no-new + } catch (e) { + t.ok(e instanceof errors.InvalidArgumentError) + t.strictEqual(e.message, 'invalid connections') + } +}) + +test('throws when connection is not number', async (t) => { + t = tspl(t, { plan: 2 }) + + try { + new Pool(null, { connections: true }) // eslint-disable-line no-new + } catch (e) { + t.ok(e instanceof errors.InvalidArgumentError) + t.strictEqual(e.message, 'invalid connections') + } +}) + +test('throws when factory is not a function', async (t) => { + t = tspl(t, { plan: 2 }) + + try { + new Pool(null, { factory: '' }) // eslint-disable-line no-new + } catch (e) { + t.ok(e instanceof errors.InvalidArgumentError) + t.strictEqual(e.message, 'factory must be a function.') + } +}) + +test('does not throw when connect is a function', async (t) => { + t = tspl(t, { plan: 1 }) + + t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} })) +}) + +test('connect/disconnect event(s)', async (t) => { + const clients = 2 + + t = tspl(t, { plan: clients * 6 }) + + const server = createServer((req, res) => { + res.writeHead(200, { + Connection: 'keep-alive', + 'Keep-Alive': 'timeout=1s' + }) + res.end('ok') + }) + after(() => server.close()) + + server.listen(0, () => { + const pool = new Pool(`http://localhost:${server.address().port}`, { + connections: clients, + keepAliveTimeoutThreshold: 100 + }) + after(() => pool.close()) + + pool.on('connect', (origin, [pool, client]) => { + t.strictEqual(client instanceof Client, true) + }) + pool.on('disconnect', (origin, [pool, client], error) => { + t.ok(client instanceof Client) + t.ok(error instanceof errors.InformationalError) + t.strictEqual(error.code, 'UND_ERR_INFO') + t.strictEqual(error.message, 'socket idle timeout') + }) + + for (let i = 0; i < clients; i++) { + pool.request({ + path: '/', + method: 'GET' + }, (err, { headers, body }) => { + t.ifError(err) + body.resume() + }) + } + }) + + await t.completed +}) + +test('basic get', async (t) => { + t = tspl(t, { plan: 14 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + t.strictEqual(client[kUrl].origin, `http://localhost:${server.address().port}`) + + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.ifError(err) + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + + t.strictEqual(client.destroyed, false) + t.strictEqual(client.closed, false) + client.close((err) => { + t.ifError(err) + t.strictEqual(client.destroyed, true) + client.destroy((err) => { + t.ifError(err) + client.close((err) => { + t.ok(err instanceof errors.ClientDestroyedError) + }) + }) + }) + t.strictEqual(client.closed, true) + }) + + await t.completed +}) + +test('URL as arg', async (t) => { + t = tspl(t, { plan: 9 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + server.listen(0, async () => { + const url = new URL('http://localhost') + url.port = server.address().port + const client = new Pool(url) + after(() => client.destroy()) + + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.ifError(err) + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + + client.close((err) => { + t.ifError(err) + client.destroy((err) => { + t.ifError(err) + client.close((err) => { + t.ok(err instanceof errors.ClientDestroyedError) + }) + }) + }) + }) + + await t.completed +}) + +test('basic get error async/await', async (t) => { + t = tspl(t, { plan: 2 }) + + const server = createServer((req, res) => { + res.destroy() + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + await client.request({ path: '/', method: 'GET' }) + .catch((err) => { + t.ok(err) + }) + + await client.destroy() + + await client.close().catch((err) => { + t.ok(err instanceof errors.ClientDestroyedError) + }) + }) + + await t.completed +}) + +test('basic get with async/await', async (t) => { + t = tspl(t, { plan: 4 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + await promisify(server.listen.bind(server))(0) + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + + body.resume() + await promisify(finished)(body) + + await client.close() + await client.destroy() +}) + +test('stream get async/await', async (t) => { + t = tspl(t, { plan: 4 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + await promisify(server.listen.bind(server))(0) + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + return new PassThrough() + }) + + await t.completed +}) + +test('stream get error async/await', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((req, res) => { + res.destroy() + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + await client.stream({ path: '/', method: 'GET' }, () => { + + }) + .catch((err) => { + t.ok(err) + }) + }) + + await t.completed +}) + +test('pipeline get', async (t) => { + t = tspl(t, { plan: 5 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.destroy()) + + const bufs = [] + client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + return body + }) + .end() + .on('data', (buf) => { + bufs.push(buf) + }) + .on('end', () => { + t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + + await t.completed +}) test('backpressure algorithm', async (t) => { t = tspl(t, { plan: 12 }) @@ -422,731 +422,731 @@ test('backpressure algorithm', async (t) => { t.end() }) -// test('busy', async (t) => { -// t = tspl(t, { plan: 8 * 16 + 2 + 1 }) - -// const server = createServer((req, res) => { -// t.strictEqual('/', req.url) -// t.strictEqual('GET', req.method) -// res.setHeader('content-type', 'text/plain') -// res.end('hello') -// }) -// after(() => server.close()) - -// const connections = 2 - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections, -// pipelining: 2 -// }) -// client.on('drain', () => { -// t.ok(true, 'pass') -// }) -// client.on('connect', () => { -// t.ok(true, 'pass') -// }) -// after(() => client.destroy()) - -// for (let n = 1; n <= 8; ++n) { -// client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { -// t.ifError(err) -// t.strictEqual(statusCode, 200) -// t.strictEqual(headers['content-type'], 'text/plain') -// const bufs = [] -// body.on('data', (buf) => { -// bufs.push(buf) -// }) -// body.on('end', () => { -// t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) -// }) -// }) -// t.strictEqual(client[kPending], n) -// t.strictEqual(client[kBusy], n > 1) -// t.strictEqual(client[kSize], n) -// t.strictEqual(client[kRunning], 0) - -// t.strictEqual(client.stats.connected, 0) -// t.strictEqual(client.stats.free, 0) -// t.strictEqual(client.stats.queued, Math.max(n - connections, 0)) -// t.strictEqual(client.stats.pending, n) -// t.strictEqual(client.stats.size, n) -// t.strictEqual(client.stats.running, 0) -// } -// }) - -// await t.completed -// }) - -// test('invalid pool dispatch options', async (t) => { -// t = tspl(t, { plan: 2 }) -// const pool = new Pool('http://notahost') -// t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler') -// t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler') -// }) - -// test('pool upgrade promise', async (t) => { -// t = tspl(t, { plan: 2 }) - -// const server = net.createServer((c) => { -// c.on('data', (d) => { -// c.write('HTTP/1.1 101\r\n') -// c.write('hello: world\r\n') -// c.write('connection: upgrade\r\n') -// c.write('upgrade: websocket\r\n') -// c.write('\r\n') -// c.write('Body') -// }) - -// c.on('end', () => { -// c.end() -// }) -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.close()) - -// const { headers, socket } = await client.upgrade({ -// path: '/', -// method: 'GET', -// protocol: 'Websocket' -// }) - -// let recvData = '' -// socket.on('data', (d) => { -// recvData += d -// }) - -// socket.on('close', () => { -// t.strictEqual(recvData.toString(), 'Body') -// }) - -// t.deepStrictEqual(headers, { -// hello: 'world', -// connection: 'upgrade', -// upgrade: 'websocket' -// }) -// socket.end() -// }) - -// await t.completed -// }) - -// test('pool connect', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const server = createServer((c) => { -// t.fail() -// }) -// server.on('connect', (req, socket, firstBodyChunk) => { -// socket.write('HTTP/1.1 200 Connection established\r\n\r\n') - -// let data = firstBodyChunk.toString() -// socket.on('data', (buf) => { -// data += buf.toString() -// }) - -// socket.on('end', () => { -// socket.end(data) -// }) -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.close()) - -// const { socket } = await client.connect({ -// path: '/' -// }) - -// let recvData = '' -// socket.on('data', (d) => { -// recvData += d -// }) - -// socket.on('end', () => { -// t.strictEqual(recvData.toString(), 'Body') -// }) - -// socket.write('Body') -// socket.end() -// }) - -// await t.completed -// }) - -// test('pool dispatch', async (t) => { -// t = tspl(t, { plan: 2 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.close()) - -// let buf = '' -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// buf += chunk -// }, -// onComplete () { -// t.strictEqual(buf, 'asd') -// }, -// onError () { -// } -// }) -// }) - -// await t.completed -// }) - -// test('pool pipeline args validation', async (t) => { -// t = tspl(t, { plan: 2 }) - -// const client = new Pool('http://localhost:5000') - -// const ret = client.pipeline(null, () => {}) -// ret.on('error', (err) => { -// t.ok(/opts/.test(err.message)) -// t.ok(err instanceof errors.InvalidArgumentError) -// }) - -// await t.completed -// }) - -// test('300 requests succeed', async (t) => { -// t = tspl(t, { plan: 300 * 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1 -// }) -// after(() => client.destroy()) - -// for (let n = 0; n < 300; ++n) { -// client.request({ -// path: '/', -// method: 'GET' -// }, (err, data) => { -// t.ifError(err) -// data.body.on('data', (chunk) => { -// t.strictEqual(chunk.toString(), 'asd') -// }).on('end', () => { -// t.ok(true, 'pass') -// }) -// }) -// } -// }) - -// await t.completed -// }) - -// test('pool connect error', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const server = createServer((c) => { -// t.fail() -// }) -// server.on('connect', (req, socket, firstBodyChunk) => { -// socket.destroy() -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.close()) - -// try { -// await client.connect({ -// path: '/' -// }) -// } catch (err) { -// t.ok(err) -// } -// }) - -// await t.completed -// }) - -// test('pool upgrade error', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const server = net.createServer((c) => { -// c.on('data', (d) => { -// c.write('HTTP/1.1 101\r\n') -// c.write('hello: world\r\n') -// c.write('connection: upgrade\r\n') -// c.write('\r\n') -// c.write('Body') -// }) -// c.on('error', () => { -// // Whether we get an error, end or close is undefined. -// // Ignore error. -// }) -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`) -// after(() => client.close()) - -// try { -// await client.upgrade({ -// path: '/', -// method: 'GET', -// protocol: 'Websocket' -// }) -// } catch (err) { -// t.ok(err) -// } -// }) - -// await t.completed -// }) - -// test('pool dispatch error', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// client.dispatch({ -// path: '/', -// method: 'GET', -// headers: { -// 'transfer-encoding': 'fail' -// } -// }, { -// onConnect () { -// t.fail() -// }, -// onHeaders (statusCode, headers) { -// t.fail() -// }, -// onData (chunk) { -// t.fail() -// }, -// onError (err) { -// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') -// } -// }) -// }) - -// await t.completed -// }) - -// test('pool request abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.request({ -// path: '/', -// method: 'GET', -// signal -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool stream abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.stream({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body, (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool pipeline abort in queue', async (t) => { -// t = tspl(t, { plan: 3 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// client.dispatch({ -// path: '/', -// method: 'GET' -// }, { -// onConnect () { -// }, -// onHeaders (statusCode, headers) { -// t.strictEqual(statusCode, 200) -// }, -// onData (chunk) { -// }, -// onComplete () { -// t.ok(true, 'pass') -// }, -// onError () { -// } -// }) - -// const signal = new EventEmitter() -// client.pipeline({ -// path: '/', -// method: 'GET', -// signal -// }, ({ body }) => body).end().on('error', (err) => { -// t.strictEqual(err.code, 'UND_ERR_ABORTED') -// }) -// signal.emit('abort') -// }) - -// await t.completed -// }) - -// test('pool stream constructor error destroy body', async (t) => { -// t = tspl(t, { plan: 4 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// { -// const body = new Readable({ -// read () { -// } -// }) -// client.stream({ -// path: '/', -// method: 'GET', -// body, -// headers: { -// 'transfer-encoding': 'fail' -// } -// }, () => { -// t.fail() -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') -// t.strictEqual(body.destroyed, true) -// }) -// } - -// { -// const body = new Readable({ -// read () { -// } -// }) -// client.stream({ -// path: '/', -// method: 'CONNECT', -// body -// }, () => { -// t.fail() -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') -// t.strictEqual(body.destroyed, true) -// }) -// } -// }) - -// await t.completed -// }) - -// test('pool request constructor error destroy body', async (t) => { -// t = tspl(t, { plan: 4 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.close()) - -// { -// const body = new Readable({ -// read () { -// } -// }) -// client.request({ -// path: '/', -// method: 'GET', -// body, -// headers: { -// 'transfer-encoding': 'fail' -// } -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') -// t.strictEqual(body.destroyed, true) -// }) -// } - -// { -// const body = new Readable({ -// read () { -// } -// }) -// client.request({ -// path: '/', -// method: 'CONNECT', -// body -// }, (err) => { -// t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') -// t.strictEqual(body.destroyed, true) -// }) -// } -// }) - -// await t.completed -// }) - -// test('pool close waits for all requests', async (t) => { -// t = tspl(t, { plan: 5 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.destroy()) - -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.ifError(err) -// }) - -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.ifError(err) -// }) - -// client.close(() => { -// t.ok(true, 'pass') -// }) - -// client.close(() => { -// t.ok(true, 'pass') -// }) - -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.ok(err instanceof errors.ClientClosedError) -// }) -// }) - -// await t.completed -// }) - -// test('pool destroyed', async (t) => { -// t = tspl(t, { plan: 1 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.destroy()) - -// client.destroy() -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.ok(err instanceof errors.ClientDestroyedError) -// }) -// }) - -// await t.completed -// }) - -// test('pool destroy fails queued requests', async (t) => { -// t = tspl(t, { plan: 6 }) - -// const server = createServer((req, res) => { -// res.end('asd') -// }) -// after(() => server.close()) - -// server.listen(0, async () => { -// const client = new Pool(`http://localhost:${server.address().port}`, { -// connections: 1, -// pipelining: 1 -// }) -// after(() => client.destroy()) - -// const _err = new Error() -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.strictEqual(err, _err) -// }) - -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.strictEqual(err, _err) -// }) - -// t.strictEqual(client.destroyed, false) -// client.destroy(_err, () => { -// t.ok(true, 'pass') -// }) -// t.strictEqual(client.destroyed, true) - -// client.request({ -// path: '/', -// method: 'GET' -// }, (err) => { -// t.ok(err instanceof errors.ClientDestroyedError) -// }) -// }) -// await t.completed -// }) +test('busy', async (t) => { + t = tspl(t, { plan: 8 * 16 + 2 + 1 }) + + const server = createServer((req, res) => { + t.strictEqual('/', req.url) + t.strictEqual('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + after(() => server.close()) + + const connections = 2 + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections, + pipelining: 2 + }) + client.on('drain', () => { + t.ok(true, 'pass') + }) + client.on('connect', () => { + t.ok(true, 'pass') + }) + after(() => client.destroy()) + + for (let n = 1; n <= 8; ++n) { + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.ifError(err) + t.strictEqual(statusCode, 200) + t.strictEqual(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.strictEqual('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + t.strictEqual(client[kPending], n) + t.strictEqual(client[kBusy], n > 1) + t.strictEqual(client[kSize], n) + t.strictEqual(client[kRunning], 0) + + t.strictEqual(client.stats.connected, 0) + t.strictEqual(client.stats.free, 0) + t.strictEqual(client.stats.queued, Math.max(n - connections, 0)) + t.strictEqual(client.stats.pending, n) + t.strictEqual(client.stats.size, n) + t.strictEqual(client.stats.running, 0) + } + }) + + await t.completed +}) + +test('invalid pool dispatch options', async (t) => { + t = tspl(t, { plan: 2 }) + const pool = new Pool('http://notahost') + t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler') + t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler') +}) + +test('pool upgrade promise', async (t) => { + t = tspl(t, { plan: 2 }) + + const server = net.createServer((c) => { + c.on('data', (d) => { + c.write('HTTP/1.1 101\r\n') + c.write('hello: world\r\n') + c.write('connection: upgrade\r\n') + c.write('upgrade: websocket\r\n') + c.write('\r\n') + c.write('Body') + }) + + c.on('end', () => { + c.end() + }) + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.close()) + + const { headers, socket } = await client.upgrade({ + path: '/', + method: 'GET', + protocol: 'Websocket' + }) + + let recvData = '' + socket.on('data', (d) => { + recvData += d + }) + + socket.on('close', () => { + t.strictEqual(recvData.toString(), 'Body') + }) + + t.deepStrictEqual(headers, { + hello: 'world', + connection: 'upgrade', + upgrade: 'websocket' + }) + socket.end() + }) + + await t.completed +}) + +test('pool connect', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((c) => { + t.fail() + }) + server.on('connect', (req, socket, firstBodyChunk) => { + socket.write('HTTP/1.1 200 Connection established\r\n\r\n') + + let data = firstBodyChunk.toString() + socket.on('data', (buf) => { + data += buf.toString() + }) + + socket.on('end', () => { + socket.end(data) + }) + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.close()) + + const { socket } = await client.connect({ + path: '/' + }) + + let recvData = '' + socket.on('data', (d) => { + recvData += d + }) + + socket.on('end', () => { + t.strictEqual(recvData.toString(), 'Body') + }) + + socket.write('Body') + socket.end() + }) + + await t.completed +}) + +test('pool dispatch', async (t) => { + t = tspl(t, { plan: 2 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.close()) + + let buf = '' + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + buf += chunk + }, + onComplete () { + t.strictEqual(buf, 'asd') + }, + onError () { + } + }) + }) + + await t.completed +}) + +test('pool pipeline args validation', async (t) => { + t = tspl(t, { plan: 2 }) + + const client = new Pool('http://localhost:5000') + + const ret = client.pipeline(null, () => {}) + ret.on('error', (err) => { + t.ok(/opts/.test(err.message)) + t.ok(err instanceof errors.InvalidArgumentError) + }) + + await t.completed +}) + +test('300 requests succeed', async (t) => { + t = tspl(t, { plan: 300 * 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1 + }) + after(() => client.destroy()) + + for (let n = 0; n < 300; ++n) { + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.ifError(err) + data.body.on('data', (chunk) => { + t.strictEqual(chunk.toString(), 'asd') + }).on('end', () => { + t.ok(true, 'pass') + }) + }) + } + }) + + await t.completed +}) + +test('pool connect error', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((c) => { + t.fail() + }) + server.on('connect', (req, socket, firstBodyChunk) => { + socket.destroy() + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.close()) + + try { + await client.connect({ + path: '/' + }) + } catch (err) { + t.ok(err) + } + }) + + await t.completed +}) + +test('pool upgrade error', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = net.createServer((c) => { + c.on('data', (d) => { + c.write('HTTP/1.1 101\r\n') + c.write('hello: world\r\n') + c.write('connection: upgrade\r\n') + c.write('\r\n') + c.write('Body') + }) + c.on('error', () => { + // Whether we get an error, end or close is undefined. + // Ignore error. + }) + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + after(() => client.close()) + + try { + await client.upgrade({ + path: '/', + method: 'GET', + protocol: 'Websocket' + }) + } catch (err) { + t.ok(err) + } + }) + + await t.completed +}) + +test('pool dispatch error', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + client.dispatch({ + path: '/', + method: 'GET', + headers: { + 'transfer-encoding': 'fail' + } + }, { + onConnect () { + t.fail() + }, + onHeaders (statusCode, headers) { + t.fail() + }, + onData (chunk) { + t.fail() + }, + onError (err) { + t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') + } + }) + }) + + await t.completed +}) + +test('pool request abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool stream abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.stream({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body, (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool pipeline abort in queue', async (t) => { + t = tspl(t, { plan: 3 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.strictEqual(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.ok(true, 'pass') + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.pipeline({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body).end().on('error', (err) => { + t.strictEqual(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) + + await t.completed +}) + +test('pool stream constructor error destroy body', async (t) => { + t = tspl(t, { plan: 4 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + { + const body = new Readable({ + read () { + } + }) + client.stream({ + path: '/', + method: 'GET', + body, + headers: { + 'transfer-encoding': 'fail' + } + }, () => { + t.fail() + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') + t.strictEqual(body.destroyed, true) + }) + } + + { + const body = new Readable({ + read () { + } + }) + client.stream({ + path: '/', + method: 'CONNECT', + body + }, () => { + t.fail() + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') + t.strictEqual(body.destroyed, true) + }) + } + }) + + await t.completed +}) + +test('pool request constructor error destroy body', async (t) => { + t = tspl(t, { plan: 4 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.close()) + + { + const body = new Readable({ + read () { + } + }) + client.request({ + path: '/', + method: 'GET', + body, + headers: { + 'transfer-encoding': 'fail' + } + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') + t.strictEqual(body.destroyed, true) + }) + } + + { + const body = new Readable({ + read () { + } + }) + client.request({ + path: '/', + method: 'CONNECT', + body + }, (err) => { + t.strictEqual(err.code, 'UND_ERR_INVALID_ARG') + t.strictEqual(body.destroyed, true) + }) + } + }) + + await t.completed +}) + +test('pool close waits for all requests', async (t) => { + t = tspl(t, { plan: 5 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.destroy()) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.ifError(err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.ifError(err) + }) + + client.close(() => { + t.ok(true, 'pass') + }) + + client.close(() => { + t.ok(true, 'pass') + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.ok(err instanceof errors.ClientClosedError) + }) + }) + + await t.completed +}) + +test('pool destroyed', async (t) => { + t = tspl(t, { plan: 1 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.destroy()) + + client.destroy() + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.ok(err instanceof errors.ClientDestroyedError) + }) + }) + + await t.completed +}) + +test('pool destroy fails queued requests', async (t) => { + t = tspl(t, { plan: 6 }) + + const server = createServer((req, res) => { + res.end('asd') + }) + after(() => server.close()) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + after(() => client.destroy()) + + const _err = new Error() + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.strictEqual(err, _err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.strictEqual(err, _err) + }) + + t.strictEqual(client.destroyed, false) + client.destroy(_err, () => { + t.ok(true, 'pass') + }) + t.strictEqual(client.destroyed, true) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.ok(err instanceof errors.ClientDestroyedError) + }) + }) + await t.completed +})