diff --git a/lib/dispatcher/agent.js b/lib/dispatcher/agent.js index 98f1486cac0..448eb3b8769 100644 --- a/lib/dispatcher/agent.js +++ b/lib/dispatcher/agent.js @@ -79,7 +79,7 @@ class Agent extends DispatcherBase { return ret } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { let key if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { key = String(opts.origin) @@ -102,7 +102,7 @@ class Agent extends DispatcherBase { this[kClients].set(key, dispatcher) } - return dispatcher.dispatch(opts, handler) + return dispatcher.dispatch(opts, handler, onDrain) } async [kClose] () { diff --git a/lib/dispatcher/balanced-pool.js b/lib/dispatcher/balanced-pool.js index 15a7e7b5879..fa72eebeee7 100644 --- a/lib/dispatcher/balanced-pool.js +++ b/lib/dispatcher/balanced-pool.js @@ -10,7 +10,8 @@ const { kNeedDrain, kAddClient, kRemoveClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } = require('./pool-base') const Pool = require('./pool') const { kUrl, kInterceptors } = require('../core/symbols') @@ -185,6 +186,54 @@ class BalancedPool extends PoolBase { this[kIndex] = maxWeightIndex return this[kClients][maxWeightIndex] } + + [kGetDispatchers] () { + // We validate that pools is greater than 0, + // otherwise we would have to wait until an upstream + // is added, which might never happen. + if (this[kClients].length === 0) { + throw new BalancedPoolMissingUpstreamError() + } + + let maxWeightIndex = 0 + { + let maxWeight = this[kClients][0][kWeight] + for (let n = 1; n < this[kClients].length; n++) { + if (this[kClients][n][kWeight] > maxWeight) { + maxWeight = this[kClients][n][kWeight] + maxWeightIndex = n + } + } + } + + let counter = 0 + while (counter++ < this[kClients].length) { + this[kIndex] = (this[kIndex] + 1) % this[kClients].length + const pool = this[kClients][this[kIndex]] + + // find pool index with the largest weight + if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) { + maxWeightIndex = this[kIndex] + } + + // decrease the current weight every `this[kClients].length`. + if (this[kIndex] === 0) { + // Set the current weight to the next lower weight. + this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor] + + if (this[kCurrentWeight] <= 0) { + this[kCurrentWeight] = this[kMaxWeightPerServer] + } + } + if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { + return pool + } + } + + this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] + this[kIndex] = maxWeightIndex + return this[kClients][maxWeightIndex] + } } module.exports = BalancedPool diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index cb61206b1ed..fb8207be645 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js') let deprecatedInterceptorWarned = false const kClosedResolve = Symbol('kClosedResolve') +const kDrainQueue = Symbol('kDrainQueue') function getPipelining (client) { return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1 @@ -243,6 +244,8 @@ class Client extends DispatcherBase { this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server this[kHTTPContext] = null + this[kDrainQueue] = [] + // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. // | complete | running | pending | @@ -299,26 +302,31 @@ class Client extends DispatcherBase { this.once('connect', cb) } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { const origin = opts.origin || this[kUrl].origin const request = new Request(origin, opts, handler) - this[kQueue].push(request) - if (this[kResuming]) { - // Do nothing. - } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { - // Wait a tick in case stream/iterator is ended in the same tick. - this[kResuming] = 1 - queueMicrotask(() => resume(this)) + if (this[kBusy] && onDrain) { + this[kDrainQueue].push(onDrain) + return false } else { - this[kResume](true) + this[kQueue].push(request) + if (this[kResuming]) { + // Do nothing. + } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { + // Wait a tick in case stream/iterator is ended in the same tick. + this[kResuming] = 1 + queueMicrotask(() => resume(this)) + } else { + this[kResume](true) + } } if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { this[kNeedDrain] = 2 } - return this[kNeedDrain] < 2 + return onDrain ? true : this[kNeedDrain] < 2 } async [kClose] () { @@ -341,6 +349,10 @@ class Client extends DispatcherBase { util.errorRequest(this, request, err) } + for (const callback of this[kDrainQueue].splice(0)) { + callback(err, this) + } + const callback = () => { if (this[kClosedResolve]) { // TODO (fix): Should we error here with ClientDestroyedError? @@ -517,6 +529,11 @@ async function connect (client) { function emitDrain (client) { client[kNeedDrain] = 0 + + for (const callback of client[kDrainQueue].splice(0)) { + callback.call(client, client[kUrl], [client]) + } + client.emit('drain', client[kUrl], [client]) } diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index bd860acdcf4..637f32c7a09 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -142,10 +142,10 @@ class DispatcherBase extends Dispatcher { }) } - [kInterceptedDispatch] (opts, handler) { + [kInterceptedDispatch] (opts, handler, onDrain) { if (!this[kInterceptors] || this[kInterceptors].length === 0) { this[kInterceptedDispatch] = this[kDispatch] - return this[kDispatch](opts, handler) + return this[kDispatch](opts, handler, onDrain) } let dispatch = this[kDispatch].bind(this) @@ -153,10 +153,10 @@ class DispatcherBase extends Dispatcher { dispatch = this[kInterceptors][i](dispatch) } this[kInterceptedDispatch] = dispatch - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { if (!handler || typeof handler !== 'object') { throw new InvalidArgumentError('handler must be an object') } @@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher { throw new ClientClosedError() } - return this[kInterceptedDispatch](opts, handler) + return this[kInterceptedDispatch](opts, handler, onDrain) } catch (err) { if (typeof handler.onError !== 'function') { throw new InvalidArgumentError('invalid onError method') diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index b1e0098ec4b..c2625100479 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -30,7 +30,7 @@ class Dispatcher extends EventEmitter { dispatch = interceptor(dispatch) - if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) { + if (dispatch == null || typeof dispatch !== 'function') { throw new TypeError('invalid interceptor') } } @@ -49,8 +49,12 @@ class ComposedDispatcher extends Dispatcher { this.#dispatch = dispatch } - dispatch (...args) { - this.#dispatch(...args) + dispatch (opts, handler, onDrain) { + // Allocating a closure here every time is not great... + return this.#dispatch(opts, handler, onDrain ?? (() => { + // XXX: Stop if error? + this.#dispatch(opts, handler, onDrain) + })) } close (...args) { diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index ff3108a4da2..493258038cc 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -7,6 +7,7 @@ const PoolStats = require('./pool-stats') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') +const kDrainQueue = Symbol('drainQueue') const kQueue = Symbol('queue') const kClosedResolve = Symbol('closed resolve') const kOnDrain = Symbol('onDrain') @@ -14,6 +15,7 @@ const kOnConnect = Symbol('onConnect') const kOnDisconnect = Symbol('onDisconnect') const kOnConnectionError = Symbol('onConnectionError') const kGetDispatcher = Symbol('get dispatcher') +const kGetDispatchers = Symbol('get dispatchers') const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') @@ -25,12 +27,17 @@ class PoolBase extends DispatcherBase { this[kQueue] = new FixedQueue() this[kClients] = [] this[kQueued] = 0 + this[kDrainQueue] = [] const pool = this this[kOnDrain] = function onDrain (origin, targets) { const queue = pool[kQueue] + if (queue.isEmpty()) { + return + } + let needDrain = false while (!needDrain) { @@ -44,7 +51,12 @@ class PoolBase extends DispatcherBase { this[kNeedDrain] = needDrain + for (const callback of pool[kDrainQueue].splice(0)) { + callback.call(this, origin, [pool, ...targets]) + } + if (!this[kNeedDrain] && pool[kNeedDrain]) { + // Legacy... pool[kNeedDrain] = false pool.emit('drain', origin, [pool, ...targets]) } @@ -133,19 +145,31 @@ class PoolBase extends DispatcherBase { return Promise.all(this[kClients].map(c => c.destroy(err))) } - [kDispatch] (opts, handler) { - const dispatcher = this[kGetDispatcher]() + [kDispatch] (opts, handler, onDrain) { + if (onDrain) { + for (const dispatcher of this[kGetDispatchers]()) { + if (dispatcher.dispatch(opts, handler, this[kOnDrain])) { + return true + } + } - if (!dispatcher) { - this[kNeedDrain] = true - this[kQueue].push({ opts, handler }) - this[kQueued]++ - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true - this[kNeedDrain] = !this[kGetDispatcher]() - } + this[kDrainQueue].push(onDrain) - return !this[kNeedDrain] + return false + } else { + // Legacy... + const dispatcher = this[kGetDispatcher]() + if (!dispatcher) { + this[kNeedDrain] = true + this[kQueue].push({ opts, handler }) + this[kQueued]++ + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = !this[kGetDispatcher]() + } + + return !this[kNeedDrain] + } } [kAddClient] (client) { @@ -190,5 +214,6 @@ module.exports = { kNeedDrain, kAddClient, kRemoveClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } diff --git a/lib/dispatcher/pool.js b/lib/dispatcher/pool.js index 2d84cd96488..2aad43ef65c 100644 --- a/lib/dispatcher/pool.js +++ b/lib/dispatcher/pool.js @@ -5,7 +5,8 @@ const { kClients, kNeedDrain, kAddClient, - kGetDispatcher + kGetDispatcher, + kGetDispatchers } = require('./pool-base') const Client = require('./client') const { @@ -88,6 +89,18 @@ class Pool extends PoolBase { return dispatcher } } + + * [kGetDispatchers]() { + for (const client of this[kClients]) { + yield client + } + + if (!this[kConnections] || this[kClients].length < this[kConnections]) { + const dispatcher = this[kFactory](this[kUrl], this[kOptions]) + this[kAddClient](dispatcher) + yield dispatcher + } + } } module.exports = Pool diff --git a/lib/dispatcher/proxy-agent.js b/lib/dispatcher/proxy-agent.js index 226b67846da..a698a70a6b2 100644 --- a/lib/dispatcher/proxy-agent.js +++ b/lib/dispatcher/proxy-agent.js @@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase { }) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const headers = buildHeaders(opts.headers) throwIfProxyAuthIsSent(headers) @@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase { ...opts, headers }, - handler + handler, + onDrain ) } diff --git a/lib/dispatcher/retry-agent.js b/lib/dispatcher/retry-agent.js index 0c2120d6f26..1a5015b54bb 100644 --- a/lib/dispatcher/retry-agent.js +++ b/lib/dispatcher/retry-agent.js @@ -4,15 +4,16 @@ const Dispatcher = require('./dispatcher') const RetryHandler = require('../handler/retry-handler') class RetryAgent extends Dispatcher { - #agent = null - #options = null + #agent + #options + constructor (agent, options = {}) { super(options) this.#agent = agent this.#options = options } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const retry = new RetryHandler({ ...opts, retryOptions: this.#options @@ -20,7 +21,7 @@ class RetryAgent extends Dispatcher { dispatch: this.#agent.dispatch.bind(this.#agent), handler }) - return this.#agent.dispatch(opts, retry) + return this.#agent.dispatch(opts, retry, onDrain) } close () { diff --git a/lib/interceptor/dump.js b/lib/interceptor/dump.js index fc9cacb198d..44d221e2b43 100644 --- a/lib/interceptor/dump.js +++ b/lib/interceptor/dump.js @@ -106,7 +106,7 @@ function createDumpInterceptor ( } ) { return dispatch => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { dumpMaxSize = defaultMaxSize } = opts @@ -115,7 +115,7 @@ function createDumpInterceptor ( handler ) - return dispatch(opts, dumpHandler) + return dispatch(opts, dumpHandler, onDrain) } } } diff --git a/lib/interceptor/redirect-interceptor.js b/lib/interceptor/redirect-interceptor.js index 896ee8db939..7b26af0cc8b 100644 --- a/lib/interceptor/redirect-interceptor.js +++ b/lib/interceptor/redirect-interceptor.js @@ -4,16 +4,16 @@ const RedirectHandler = require('../handler/redirect-handler') function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) { return (dispatch) => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { maxRedirections = defaultMaxRedirections } = opts if (!maxRedirections) { - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler) opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. - return dispatch(opts, redirectHandler) + return dispatch(opts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/redirect.js b/lib/interceptor/redirect.js index d2e789d8efb..ffe943d7650 100644 --- a/lib/interceptor/redirect.js +++ b/lib/interceptor/redirect.js @@ -4,7 +4,7 @@ const RedirectHandler = require('../handler/redirect-handler') module.exports = opts => { const globalMaxRedirections = opts?.maxRedirections return dispatch => { - return function redirectInterceptor (opts, handler) { + return function redirectInterceptor (opts, handler, onDrain) { const { maxRedirections = globalMaxRedirections, ...baseOpts } = opts if (!maxRedirections) { @@ -18,7 +18,7 @@ module.exports = opts => { handler ) - return dispatch(baseOpts, redirectHandler) + return dispatch(baseOpts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/retry.js b/lib/interceptor/retry.js index 1c16fd845a9..4ec61b53d37 100644 --- a/lib/interceptor/retry.js +++ b/lib/interceptor/retry.js @@ -3,7 +3,7 @@ const RetryHandler = require('../handler/retry-handler') module.exports = globalOpts => { return dispatch => { - return function retryInterceptor (opts, handler) { + return function retryInterceptor (opts, handler, onDrain) { return dispatch( opts, new RetryHandler( @@ -12,7 +12,8 @@ module.exports = globalOpts => { handler, dispatch } - ) + ), + onDrain ) } }