Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,35 @@ class Agent extends DispatcherBase {
}

this[kOnConnect] = (origin, targets) => {
const result = this[kClients].get(origin)
if (result) {
result.count += 1
}
this.emit('connect', origin, [this, ...targets])
}

this[kOnDisconnect] = (origin, targets, err) => {
const result = this[kClients].get(origin)
if (result) {
result.count -= 1
if (result.count <= 0) {
this[kClients].delete(origin)
result.dispatcher.destroy()
}
}
this.emit('disconnect', origin, [this, ...targets], err)
}

this[kOnConnectionError] = (origin, targets, err) => {
// TODO: should this decrement result.count here?
this.emit('connectionError', origin, [this, ...targets], err)
}
}

get [kRunning] () {
let ret = 0
for (const client of this[kClients].values()) {
ret += client[kRunning]
for (const { dispatcher } of this[kClients].values()) {
ret += dispatcher[kRunning]
}
return ret
}
Expand All @@ -73,28 +86,25 @@ class Agent extends DispatcherBase {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

let dispatcher = this[kClients].get(key)

const result = this[kClients].get(key)
let dispatcher = result && result.dispatcher
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

// This introduces a tiny memory leak, as dispatchers are never removed from the map.
// TODO(mcollina): remove te timer when the client/pool do not have any more
// active connections.
this[kClients].set(key, dispatcher)
this[kClients].set(key, { count: 0, dispatcher })
}

return dispatcher.dispatch(opts, handler)
}

async [kClose] () {
const closePromises = []
for (const client of this[kClients].values()) {
closePromises.push(client.close())
for (const { dispatcher } of this[kClients].values()) {
closePromises.push(dispatcher.close())
}
this[kClients].clear()

Expand All @@ -103,8 +113,8 @@ class Agent extends DispatcherBase {

async [kDestroy] (err) {
const destroyPromises = []
for (const client of this[kClients].values()) {
destroyPromises.push(client.destroy(err))
for (const { dispatcher } of this[kClients].values()) {
destroyPromises.push(dispatcher.destroy(err))
}
this[kClients].clear()

Expand All @@ -113,9 +123,9 @@ class Agent extends DispatcherBase {

get stats () {
const allClientStats = {}
for (const client of this[kClients].values()) {
if (client.stats) {
allClientStats[client[kUrl].origin] = client.stats
for (const { dispatcher } of this[kClients].values()) {
if (dispatcher.stats) {
allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
}
}
return allClientStats
Expand Down
16 changes: 8 additions & 8 deletions lib/mock/mock-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class MockAgent extends Dispatcher {
}

[kMockAgentSet] (origin, dispatcher) {
this[kClients].set(origin, dispatcher)
this[kClients].set(origin, { count: 0, dispatcher })
}

[kFactory] (origin) {
Expand All @@ -171,9 +171,9 @@ class MockAgent extends Dispatcher {

[kMockAgentGet] (origin) {
// First check if we can immediately find it
const client = this[kClients].get(origin)
if (client) {
return client
const result = this[kClients].get(origin)
if (result?.dispatcher) {
return result.dispatcher
}

// If the origin is not a string create a dummy parent pool and return to user
Expand All @@ -184,11 +184,11 @@ class MockAgent extends Dispatcher {
}

// If we match, create a pool and assign the same dispatches
for (const [keyMatcher, nonExplicitDispatcher] of Array.from(this[kClients])) {
if (nonExplicitDispatcher && typeof keyMatcher !== 'string' && matchValue(keyMatcher, origin)) {
for (const [keyMatcher, result] of Array.from(this[kClients])) {
if (result && typeof keyMatcher !== 'string' && matchValue(keyMatcher, origin)) {
const dispatcher = this[kFactory](origin)
this[kMockAgentSet](origin, dispatcher)
dispatcher[kDispatches] = nonExplicitDispatcher[kDispatches]
dispatcher[kDispatches] = result.dispatcher[kDispatches]
return dispatcher
}
}
Expand All @@ -202,7 +202,7 @@ class MockAgent extends Dispatcher {
const mockAgentClients = this[kClients]

return Array.from(mockAgentClients.entries())
.flatMap(([origin, scope]) => scope[kDispatches].map(dispatch => ({ ...dispatch, origin })))
.flatMap(([origin, result]) => result.dispatcher[kDispatches].map(dispatch => ({ ...dispatch, origin })))
.filter(({ pending }) => pending)
}

Expand Down
Loading