diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index 1acb56736..e8741091a 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -63,7 +63,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider }) this._seedRouter = address - this._routingTables = {} this._rediscovery = new Rediscovery(routingContext, address.toString()) this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( this._connectionPool @@ -72,9 +71,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._dnsResolver = new HostNameResolver() this._log = log this._useSeedRouter = true - this._routingTablePurgeDelay = routingTablePurgeDelay - ? int(routingTablePurgeDelay) - : DEFAULT_ROUTING_TABLE_PURGE_DELAY + this._routingTableRegistry = new RoutingTableRegistry( + routingTablePurgeDelay + ? int(routingTablePurgeDelay) + : DEFAULT_ROUTING_TABLE_PURGE_DELAY + ) } _createConnectionErrorHandler () { @@ -87,7 +88,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.warn( `Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'` ) - this.forget(address, database || '') + this.forget(address, database || DEFAULT_DB_NAME) return error } @@ -95,7 +96,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.warn( `Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'` ) - this.forgetWriter(address, database || '') + this.forgetWriter(address, database || DEFAULT_DB_NAME) return newError( 'No longer possible to write to server at ' + address, SESSION_EXPIRED @@ -206,13 +207,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } forget (address, database) { - if (database || database === '') { - this._routingTables[database].forget(address) - } else { - Object.values(this._routingTables).forEach(routingTable => - routingTable.forget(address) - ) - } + this._routingTableRegistry.apply(database, { + applyWhenExists: routingTable => routingTable.forget(address) + }) // We're firing and forgetting this operation explicitly and listening for any // errors to avoid unhandled promise rejection @@ -220,13 +217,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } forgetWriter (address, database) { - if (database || database === '') { - this._routingTables[database].forgetWriter(address) - } else { - Object.values(this._routingTables).forEach(routingTable => - routingTable.forgetWriter(address) - ) - } + this._routingTableRegistry.apply(database, { + applyWhenExists: routingTable => routingTable.forgetWriter(address) + }) } _acquireConnectionToServer (address, serverName, routingTable) { @@ -234,8 +227,10 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } _freshRoutingTable ({ accessMode, database, bookmark } = {}) { - const currentRoutingTable = - this._routingTables[database] || new RoutingTable({ database }) + const currentRoutingTable = this._routingTableRegistry.get( + database, + () => new RoutingTable({ database }) + ) if (!currentRoutingTable.isStaleFor(accessMode)) { return currentRoutingTable @@ -481,16 +476,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider async _updateRoutingTable (newRoutingTable) { // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) - - // filter out expired to purge (expired for a pre-configured amount of time) routing table entries - Object.values(this._routingTables).forEach(value => { - if (value.isExpiredFor(this._routingTablePurgeDelay)) { - delete this._routingTables[value.database] - } - }) - - // make this driver instance aware of the new table - this._routingTables[newRoutingTable.database] = newRoutingTable + this._routingTableRegistry.removeExpired() + this._routingTableRegistry.register( + newRoutingTable.database, + newRoutingTable + ) this._log.info(`Updated routing table ${newRoutingTable}`) } @@ -501,3 +491,96 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } } } + +/** + * Responsible for keeping track of the existing routing tables + */ +class RoutingTableRegistry { + /** + * Constructor + * @param {int} routingTablePurgeDelay The routing table purge delay + */ + constructor (routingTablePurgeDelay) { + this._tables = new Map() + this._routingTablePurgeDelay = routingTablePurgeDelay + } + + /** + * Put a routing table in the registry + * + * @param {string} database The database name + * @param {RoutingTable} table The routing table + * @returns {RoutingTableRegistry} this + */ + register (database, table) { + this._tables.set(database, table) + return this + } + + /** + * Apply function in the routing table for an specific database. If the database name is not defined, the function will + * be applied for each element + * + * @param {string} database The database name + * @param {object} callbacks The actions + * @param {function (RoutingTable)} callbacks.applyWhenExists Call when the db exists or when the database property is not informed + * @param {function ()} callbacks.applyWhenDontExists Call when the database doesn't have the routing table registred + * @returns {RoutingTableRegistry} this + */ + apply (database, { applyWhenExists, applyWhenDontExists = () => {} } = {}) { + if (this._tables.has(database)) { + applyWhenExists(this._tables.get(database)) + } else if (typeof database === 'string' || database === null) { + applyWhenDontExists() + } else { + this._forEach(applyWhenExists) + } + return this + } + + /** + * Retrieves a routing table from a given database name + * @param {string} database The database name + * @param {function()|RoutingTable} defaultSupplier The routing table supplier, if it's not a function or not exists, it will return itself as default value + * @returns {RoutingTable} The routing table for the respective database + */ + get (database, defaultSupplier) { + if (this._tables.has(database)) { + return this._tables.get(database) + } + return typeof defaultSupplier === 'function' + ? defaultSupplier() + : defaultSupplier + } + + /** + * Remove the routing table which is already expired + * @returns {RoutingTableRegistry} this + */ + removeExpired () { + return this._removeIf(value => + value.isExpiredFor(this._routingTablePurgeDelay) + ) + } + + _forEach (apply) { + for (const [, value] of this._tables) { + apply(value) + } + return this + } + + _remove (key) { + this._tables.delete(key) + return this + } + + _removeIf (predicate) { + for (const [key, value] of this._tables) { + if (predicate(value)) { + this._remove(key) + } + } + return this + } +} diff --git a/test/internal/connection-provider-routing.test.js b/test/internal/connection-provider-routing.test.js index 390bbf592..78dbdd532 100644 --- a/test/internal/connection-provider-routing.test.js +++ b/test/internal/connection-provider-routing.test.js @@ -1677,6 +1677,99 @@ describe('#unit RoutingConnectionProvider', () => { ) }) + it('should forget write server from the default database routing table on availability error', async () => { + const pool = newPool() + const connectionProvider = newRoutingConnectionProvider( + [ + newRoutingTable( + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ), + newRoutingTable( + null, + [serverA, serverB, serverC], + [serverA, serverB], + [serverA, serverC] + ) + ], + pool + ) + + const conn1 = await connectionProvider.acquireConnection({ + accessMode: WRITE, + database: null + }) + + // when + conn1._errorHandler.handleAndTransformError( + newError('connection error', SERVICE_UNAVAILABLE), + conn1.address + ) + + expectRoutingTable( + connectionProvider, + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ) + expectRoutingTable( + connectionProvider, + null, + [serverA, serverB, serverC], + [serverB], + [serverC] + ) + }) + + it('should forget write server from the default database routing table on availability error when db not informed', async () => { + const pool = newPool() + const connectionProvider = newRoutingConnectionProvider( + [ + newRoutingTable( + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ), + newRoutingTable( + null, + [serverA, serverB, serverC], + [serverA, serverB], + [serverA, serverC] + ) + ], + pool + ) + + const conn1 = await connectionProvider.acquireConnection({ + accessMode: WRITE + }) + + // when + conn1._errorHandler.handleAndTransformError( + newError('connection error', SERVICE_UNAVAILABLE), + conn1.address + ) + + expectRoutingTable( + connectionProvider, + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ) + expectRoutingTable( + connectionProvider, + null, + [serverA, serverB, serverC], + [serverB], + [serverC] + ) + }) + it('should forget write server from correct routing table on write error', async () => { const pool = newPool() const connectionProvider = newRoutingConnectionProvider( @@ -1847,7 +1940,7 @@ function newRoutingConnectionProviderWithSeedRouter ( }) connectionProvider._connectionPool = pool routingTables.forEach(r => { - connectionProvider._routingTables[r.database] = r + connectionProvider._routingTableRegistry.register(r.database, r) }) connectionProvider._rediscovery = new FakeRediscovery(routerToRoutingTable) connectionProvider._hostNameResolver = new FakeDnsResolver(seedRouterResolved) @@ -1903,14 +1996,15 @@ function expectRoutingTable ( readers, writers ) { - expect(connectionProvider._routingTables[database].database).toEqual(database) - expect(connectionProvider._routingTables[database].routers).toEqual(routers) - expect(connectionProvider._routingTables[database].readers).toEqual(readers) - expect(connectionProvider._routingTables[database].writers).toEqual(writers) + const routingTable = connectionProvider._routingTableRegistry.get(database) + expect(routingTable.database).toEqual(database) + expect(routingTable.routers).toEqual(routers) + expect(routingTable.readers).toEqual(readers) + expect(routingTable.writers).toEqual(writers) } function expectNoRoutingTable (connectionProvider, database) { - expect(connectionProvider._routingTables[database]).toBeFalsy() + expect(connectionProvider._routingTableRegistry.get(database)).toBeFalsy() } function expectPoolToContain (pool, addresses) {