@@ -63,7 +63,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
6363 } )
6464
6565 this . _seedRouter = address
66- this . _routingTables = { }
6766 this . _rediscovery = new Rediscovery ( routingContext , address . toString ( ) )
6867 this . _loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy (
6968 this . _connectionPool
@@ -72,9 +71,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
7271 this . _dnsResolver = new HostNameResolver ( )
7372 this . _log = log
7473 this . _useSeedRouter = true
75- this . _routingTablePurgeDelay = routingTablePurgeDelay
76- ? int ( routingTablePurgeDelay )
77- : DEFAULT_ROUTING_TABLE_PURGE_DELAY
74+ this . _routingTableRegistry = new RoutingTableRegistry (
75+ routingTablePurgeDelay
76+ ? int ( routingTablePurgeDelay )
77+ : DEFAULT_ROUTING_TABLE_PURGE_DELAY
78+ )
7879 }
7980
8081 _createConnectionErrorHandler ( ) {
@@ -87,15 +88,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
8788 this . _log . warn (
8889 `Routing driver ${ this . _id } will forget ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
8990 )
90- this . forget ( address , database || '' )
91+ this . forget ( address , database || DEFAULT_DB_NAME )
9192 return error
9293 }
9394
9495 _handleWriteFailure ( error , address , database ) {
9596 this . _log . warn (
9697 `Routing driver ${ this . _id } will forget writer ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
9798 )
98- this . forgetWriter ( address , database || '' )
99+ this . forgetWriter ( address , database || DEFAULT_DB_NAME )
99100 return newError (
100101 'No longer possible to write to server at ' + address ,
101102 SESSION_EXPIRED
@@ -206,36 +207,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
206207 }
207208
208209 forget ( address , database ) {
209- if ( database || database === '' ) {
210- this . _routingTables [ database ] . forget ( address )
211- } else {
212- Object . values ( this . _routingTables ) . forEach ( routingTable =>
213- routingTable . forget ( address )
214- )
215- }
210+ this . _routingTableRegistry . apply ( database , {
211+ applyWhenExists : routingTable => routingTable . forget ( address )
212+ } )
216213
217214 // We're firing and forgetting this operation explicitly and listening for any
218215 // errors to avoid unhandled promise rejection
219216 this . _connectionPool . purge ( address ) . catch ( ( ) => { } )
220217 }
221218
222219 forgetWriter ( address , database ) {
223- if ( database || database === '' ) {
224- this . _routingTables [ database ] . forgetWriter ( address )
225- } else {
226- Object . values ( this . _routingTables ) . forEach ( routingTable =>
227- routingTable . forgetWriter ( address )
228- )
229- }
220+ this . _routingTableRegistry . apply ( database , {
221+ applyWhenExists : routingTable => routingTable . forgetWriter ( address )
222+ } )
230223 }
231224
232225 _acquireConnectionToServer ( address , serverName , routingTable ) {
233226 return this . _connectionPool . acquire ( address )
234227 }
235228
236229 _freshRoutingTable ( { accessMode, database, bookmark } = { } ) {
237- const currentRoutingTable =
238- this . _routingTables [ database ] || new RoutingTable ( { database } )
230+ const currentRoutingTable = this . _routingTableRegistry . get (
231+ database ,
232+ ( ) => new RoutingTable ( { database } )
233+ )
239234
240235 if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
241236 return currentRoutingTable
@@ -481,16 +476,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
481476 async _updateRoutingTable ( newRoutingTable ) {
482477 // close old connections to servers not present in the new routing table
483478 await this . _connectionPool . keepAll ( newRoutingTable . allServers ( ) )
484-
485- // filter out expired to purge (expired for a pre-configured amount of time) routing table entries
486- Object . values ( this . _routingTables ) . forEach ( value => {
487- if ( value . isExpiredFor ( this . _routingTablePurgeDelay ) ) {
488- delete this . _routingTables [ value . database ]
489- }
490- } )
491-
492- // make this driver instance aware of the new table
493- this . _routingTables [ newRoutingTable . database ] = newRoutingTable
479+ this . _routingTableRegistry . removeExpired ( )
480+ this . _routingTableRegistry . register (
481+ newRoutingTable . database ,
482+ newRoutingTable
483+ )
494484 this . _log . info ( `Updated routing table ${ newRoutingTable } ` )
495485 }
496486
@@ -501,3 +491,96 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
501491 }
502492 }
503493}
494+
495+ /**
496+ * Responsible for keeping track of the existing routing tables
497+ */
498+ class RoutingTableRegistry {
499+ /**
500+ * Constructor
501+ * @param {int } routingTablePurgeDelay The routing table purge delay
502+ */
503+ constructor ( routingTablePurgeDelay ) {
504+ this . _tables = new Map ( )
505+ this . _routingTablePurgeDelay = routingTablePurgeDelay
506+ }
507+
508+ /**
509+ * Put a routing table in the registry
510+ *
511+ * @param {string } database The database name
512+ * @param {RoutingTable } table The routing table
513+ * @returns {RoutingTableRegistry } this
514+ */
515+ register ( database , table ) {
516+ this . _tables . set ( database , table )
517+ return this
518+ }
519+
520+ /**
521+ * Apply function in the routing table for an specific database. If the database name is not defined, the function will
522+ * be applied for each element
523+ *
524+ * @param {string } database The database name
525+ * @param {object } callbacks The actions
526+ * @param {function (RoutingTable) } callbacks.applyWhenExists Call when the db exists or when the database property is not informed
527+ * @param {function () } callbacks.applyWhenDontExists Call when the database doesn't have the routing table registred
528+ * @returns {RoutingTableRegistry } this
529+ */
530+ apply ( database , { applyWhenExists, applyWhenDontExists = ( ) => { } } = { } ) {
531+ if ( this . _tables . has ( database ) ) {
532+ applyWhenExists ( this . _tables . get ( database ) )
533+ } else if ( typeof database === 'string' || database === null ) {
534+ applyWhenDontExists ( )
535+ } else {
536+ this . _forEach ( applyWhenExists )
537+ }
538+ return this
539+ }
540+
541+ /**
542+ * Retrieves a routing table from a given database name
543+ * @param {string } database The database name
544+ * @param {function()|RoutingTable } defaultSupplier The routing table supplier, if it's not a function or not exists, it will return itself as default value
545+ * @returns {RoutingTable } The routing table for the respective database
546+ */
547+ get ( database , defaultSupplier ) {
548+ if ( this . _tables . has ( database ) ) {
549+ return this . _tables . get ( database )
550+ }
551+ return typeof defaultSupplier === 'function'
552+ ? defaultSupplier ( )
553+ : defaultSupplier
554+ }
555+
556+ /**
557+ * Remove the routing table which is already expired
558+ * @returns {RoutingTableRegistry } this
559+ */
560+ removeExpired ( ) {
561+ return this . _removeIf ( value =>
562+ value . isExpiredFor ( this . _routingTablePurgeDelay )
563+ )
564+ }
565+
566+ _forEach ( apply ) {
567+ for ( const [ , value ] of this . _tables ) {
568+ apply ( value )
569+ }
570+ return this
571+ }
572+
573+ _remove ( key ) {
574+ this . _tables . delete ( key )
575+ return this
576+ }
577+
578+ _removeIf ( predicate ) {
579+ for ( const [ key , value ] of this . _tables ) {
580+ if ( predicate ( value ) ) {
581+ this . _remove ( key )
582+ }
583+ }
584+ return this
585+ }
586+ }
0 commit comments