Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Agent extends DispatcherBase {
return ret
}

[kDispatch] (opts, handler) {
[kDispatch] (opts, handler, onDrain) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
Expand All @@ -102,7 +102,7 @@ class Agent extends DispatcherBase {
this[kClients].set(key, dispatcher)
}

return dispatcher.dispatch(opts, handler)
return dispatcher.dispatch(opts, handler, onDrain)
}

async [kClose] () {
Expand Down
51 changes: 50 additions & 1 deletion lib/dispatcher/balanced-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const {
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher
kGetDispatcher,
kGetDispatchers
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl, kInterceptors } = require('../core/symbols')
Expand Down Expand Up @@ -185,6 +186,54 @@ class BalancedPool extends PoolBase {
this[kIndex] = maxWeightIndex
return this[kClients][maxWeightIndex]
}

[kGetDispatchers] () {
// We validate that pools is greater than 0,
// otherwise we would have to wait until an upstream
// is added, which might never happen.
if (this[kClients].length === 0) {
throw new BalancedPoolMissingUpstreamError()
}

let maxWeightIndex = 0
{
let maxWeight = this[kClients][0][kWeight]
for (let n = 1; n < this[kClients].length; n++) {
if (this[kClients][n][kWeight] > maxWeight) {
maxWeight = this[kClients][n][kWeight]
maxWeightIndex = n
}
}
}

let counter = 0
while (counter++ < this[kClients].length) {
this[kIndex] = (this[kIndex] + 1) % this[kClients].length
const pool = this[kClients][this[kIndex]]

// find pool index with the largest weight
if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
maxWeightIndex = this[kIndex]
}

// decrease the current weight every `this[kClients].length`.
if (this[kIndex] === 0) {
// Set the current weight to the next lower weight.
this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

if (this[kCurrentWeight] <= 0) {
this[kCurrentWeight] = this[kMaxWeightPerServer]
}
}
if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
return pool
}
}

this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
this[kIndex] = maxWeightIndex
return this[kClients][maxWeightIndex]
}
}

module.exports = BalancedPool
37 changes: 27 additions & 10 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js')
let deprecatedInterceptorWarned = false

const kClosedResolve = Symbol('kClosedResolve')
const kDrainQueue = Symbol('kDrainQueue')

function getPipelining (client) {
return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
Expand Down Expand Up @@ -243,6 +244,8 @@ class Client extends DispatcherBase {
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
this[kHTTPContext] = null

this[kDrainQueue] = []

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
// | complete | running | pending |
Expand Down Expand Up @@ -299,26 +302,31 @@ class Client extends DispatcherBase {
this.once('connect', cb)
}

[kDispatch] (opts, handler) {
[kDispatch] (opts, handler, onDrain) {
const origin = opts.origin || this[kUrl].origin
const request = new Request(origin, opts, handler)

this[kQueue].push(request)
if (this[kResuming]) {
// Do nothing.
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
// Wait a tick in case stream/iterator is ended in the same tick.
this[kResuming] = 1
queueMicrotask(() => resume(this))
if (this[kBusy] && onDrain) {
this[kDrainQueue].push(onDrain)
return false
} else {
this[kResume](true)
this[kQueue].push(request)
if (this[kResuming]) {
// Do nothing.
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
// Wait a tick in case stream/iterator is ended in the same tick.
this[kResuming] = 1
queueMicrotask(() => resume(this))
} else {
this[kResume](true)
}
}

if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
this[kNeedDrain] = 2
}

return this[kNeedDrain] < 2
return onDrain ? true : this[kNeedDrain] < 2
}

async [kClose] () {
Expand All @@ -341,6 +349,10 @@ class Client extends DispatcherBase {
util.errorRequest(this, request, err)
}

for (const callback of this[kDrainQueue].splice(0)) {
callback(err, this)
}

const callback = () => {
if (this[kClosedResolve]) {
// TODO (fix): Should we error here with ClientDestroyedError?
Expand Down Expand Up @@ -517,6 +529,11 @@ async function connect (client) {

function emitDrain (client) {
client[kNeedDrain] = 0

for (const callback of client[kDrainQueue].splice(0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it differs from subscribing to dispatcher.on('drain')?

Sure, it is integrated as first-class-citizen, but it kind overlaps with the drain event isn't it?

If the retry interceptor wants to implement backpressure, it can handle the last value returned from dispatch, and decide what to do on the next attempt, e.g. waiting for the event or trying once more, isn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'drain' applies globally while onDrain applies to this specific request, e.g. you could have different origins that apply to different queues, so having a global drain is confusing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is why the current dispatch: boolean + drain 'event' API is slightly broken.

callback.call(client, client[kUrl], [client])
}

client.emit('drain', client[kUrl], [client])
}

Expand Down
10 changes: 5 additions & 5 deletions lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,21 @@ class DispatcherBase extends Dispatcher {
})
}

[kInterceptedDispatch] (opts, handler) {
[kInterceptedDispatch] (opts, handler, onDrain) {
if (!this[kInterceptors] || this[kInterceptors].length === 0) {
this[kInterceptedDispatch] = this[kDispatch]
return this[kDispatch](opts, handler)
return this[kDispatch](opts, handler, onDrain)
}

let dispatch = this[kDispatch].bind(this)
for (let i = this[kInterceptors].length - 1; i >= 0; i--) {
dispatch = this[kInterceptors][i](dispatch)
}
this[kInterceptedDispatch] = dispatch
return dispatch(opts, handler)
return dispatch(opts, handler, onDrain)
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
}
Expand All @@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kInterceptedDispatch](opts, handler)
return this[kInterceptedDispatch](opts, handler, onDrain)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
Expand Down
10 changes: 7 additions & 3 deletions lib/dispatcher/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Dispatcher extends EventEmitter {

dispatch = interceptor(dispatch)

if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) {
if (dispatch == null || typeof dispatch !== 'function') {
throw new TypeError('invalid interceptor')
}
}
Expand All @@ -49,8 +49,12 @@ class ComposedDispatcher extends Dispatcher {
this.#dispatch = dispatch
}

dispatch (...args) {
this.#dispatch(...args)
dispatch (opts, handler, onDrain) {
// Allocating a closure here every time is not great...
return this.#dispatch(opts, handler, onDrain ?? (() => {
// XXX: Stop if error?
this.#dispatch(opts, handler, onDrain)
}))
}

close (...args) {
Expand Down
49 changes: 37 additions & 12 deletions lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ const PoolStats = require('./pool-stats')

const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
const kDrainQueue = Symbol('drainQueue')
const kQueue = Symbol('queue')
const kClosedResolve = Symbol('closed resolve')
const kOnDrain = Symbol('onDrain')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
const kGetDispatcher = Symbol('get dispatcher')
const kGetDispatchers = Symbol('get dispatchers')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')
const kStats = Symbol('stats')
Expand All @@ -25,12 +27,17 @@ class PoolBase extends DispatcherBase {
this[kQueue] = new FixedQueue()
this[kClients] = []
this[kQueued] = 0
this[kDrainQueue] = []

const pool = this

this[kOnDrain] = function onDrain (origin, targets) {
const queue = pool[kQueue]

if (queue.isEmpty()) {
return
}

let needDrain = false

while (!needDrain) {
Expand All @@ -44,7 +51,12 @@ class PoolBase extends DispatcherBase {

this[kNeedDrain] = needDrain

for (const callback of pool[kDrainQueue].splice(0)) {
callback.call(this, origin, [pool, ...targets])
}

if (!this[kNeedDrain] && pool[kNeedDrain]) {
// Legacy...
pool[kNeedDrain] = false
pool.emit('drain', origin, [pool, ...targets])
}
Expand Down Expand Up @@ -133,19 +145,31 @@ class PoolBase extends DispatcherBase {
return Promise.all(this[kClients].map(c => c.destroy(err)))
}

[kDispatch] (opts, handler) {
const dispatcher = this[kGetDispatcher]()
[kDispatch] (opts, handler, onDrain) {
if (onDrain) {
for (const dispatcher of this[kGetDispatchers]()) {
if (dispatcher.dispatch(opts, handler, this[kOnDrain])) {
return true
}
}

if (!dispatcher) {
this[kNeedDrain] = true
this[kQueue].push({ opts, handler })
this[kQueued]++
} else if (!dispatcher.dispatch(opts, handler)) {
dispatcher[kNeedDrain] = true
this[kNeedDrain] = !this[kGetDispatcher]()
}
this[kDrainQueue].push(onDrain)

return !this[kNeedDrain]
return false
} else {
// Legacy...
const dispatcher = this[kGetDispatcher]()
if (!dispatcher) {
this[kNeedDrain] = true
this[kQueue].push({ opts, handler })
this[kQueued]++
} else if (!dispatcher.dispatch(opts, handler)) {
dispatcher[kNeedDrain] = true
this[kNeedDrain] = !this[kGetDispatcher]()
}

return !this[kNeedDrain]
}
}

[kAddClient] (client) {
Expand Down Expand Up @@ -190,5 +214,6 @@ module.exports = {
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher
kGetDispatcher,
kGetDispatchers
}
15 changes: 14 additions & 1 deletion lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const {
kClients,
kNeedDrain,
kAddClient,
kGetDispatcher
kGetDispatcher,
kGetDispatchers
} = require('./pool-base')
const Client = require('./client')
const {
Expand Down Expand Up @@ -88,6 +89,18 @@ class Pool extends PoolBase {
return dispatcher
}
}

* [kGetDispatchers]() {
for (const client of this[kClients]) {
yield client
}

if (!this[kConnections] || this[kClients].length < this[kConnections]) {
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
yield dispatcher
}
}
}

module.exports = Pool
5 changes: 3 additions & 2 deletions lib/dispatcher/proxy-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase {
})
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
const headers = buildHeaders(opts.headers)
throwIfProxyAuthIsSent(headers)

Expand All @@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase {
...opts,
headers
},
handler
handler,
onDrain
)
}

Expand Down
9 changes: 5 additions & 4 deletions lib/dispatcher/retry-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ const Dispatcher = require('./dispatcher')
const RetryHandler = require('../handler/retry-handler')

class RetryAgent extends Dispatcher {
#agent = null
#options = null
#agent
#options

constructor (agent, options = {}) {
super(options)
this.#agent = agent
this.#options = options
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
const retry = new RetryHandler({
...opts,
retryOptions: this.#options
}, {
dispatch: this.#agent.dispatch.bind(this.#agent),
handler
})
return this.#agent.dispatch(opts, retry)
return this.#agent.dispatch(opts, retry, onDrain)
}

close () {
Expand Down
Loading