@@ -416,6 +416,41 @@ function adapter(uri, opts) {
416416 Adapter . prototype . broadcast . call ( this , packet , opts ) ;
417417 } ;
418418
419+
420+ /**
421+ * Get the number of subscribers of a channel
422+ *
423+ * @param {String } channel
424+ */
425+
426+ function getNumSub ( channel ) {
427+ if ( pub . constructor . name != 'Cluster' ) {
428+ // RedisClient or Redis
429+ return new Promise ( function ( resolve , reject ) {
430+ pub . send_command ( 'pubsub' , [ 'numsub' , channel ] , function ( err , numsub ) {
431+ if ( err ) return reject ( err ) ;
432+ resolve ( parseInt ( numsub [ 1 ] , 10 ) ) ;
433+ } ) ;
434+ } )
435+ } else {
436+ // Cluster
437+ var nodes = pub . nodes ( ) ;
438+ return Promise . all (
439+ nodes . map ( function ( node ) {
440+ return node . send_command ( 'pubsub' , [ 'numsub' , channel ] ) ;
441+ } )
442+ ) . then ( function ( values ) {
443+ var numsub = 0 ;
444+ values . map ( function ( value ) {
445+ numsub += parseInt ( value [ 1 ] , 10 ) ;
446+ } )
447+ return numsub ;
448+ } ) . catch ( function ( err ) {
449+ throw err ;
450+ } ) ;
451+ }
452+ }
453+
419454 /**
420455 * Gets a list of clients by sid.
421456 *
@@ -435,14 +470,7 @@ function adapter(uri, opts) {
435470 var self = this ;
436471 var requestid = uid2 ( 6 ) ;
437472
438- pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
439- if ( err ) {
440- self . emit ( 'error' , err ) ;
441- if ( fn ) fn ( err ) ;
442- return ;
443- }
444-
445- numsub = parseInt ( numsub [ 1 ] , 10 ) ;
473+ getNumSub ( self . requestChannel ) . then ( numsub => {
446474 debug ( 'waiting for %d responses to "clients" request' , numsub ) ;
447475
448476 var request = JSON . stringify ( {
@@ -468,6 +496,9 @@ function adapter(uri, opts) {
468496 } ;
469497
470498 pub . publish ( self . requestChannel , request ) ;
499+ } ) . catch ( err => {
500+ self . emit ( 'error' , err ) ;
501+ if ( fn ) fn ( err ) ;
471502 } ) ;
472503 } ;
473504
@@ -524,14 +555,7 @@ function adapter(uri, opts) {
524555 var self = this ;
525556 var requestid = uid2 ( 6 ) ;
526557
527- pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
528- if ( err ) {
529- self . emit ( 'error' , err ) ;
530- if ( fn ) fn ( err ) ;
531- return ;
532- }
533-
534- numsub = parseInt ( numsub [ 1 ] , 10 ) ;
558+ getNumSub ( self . requestChannel ) . then ( numsub => {
535559 debug ( 'waiting for %d responses to "allRooms" request' , numsub ) ;
536560
537561 var request = JSON . stringify ( {
@@ -556,6 +580,9 @@ function adapter(uri, opts) {
556580 } ;
557581
558582 pub . publish ( self . requestChannel , request ) ;
583+ } ) . catch ( err => {
584+ self . emit ( 'error' , err ) ;
585+ if ( fn ) fn ( err ) ;
559586 } ) ;
560587 } ;
561588
@@ -700,14 +727,7 @@ function adapter(uri, opts) {
700727 var self = this ;
701728 var requestid = uid2 ( 6 ) ;
702729
703- pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
704- if ( err ) {
705- self . emit ( 'error' , err ) ;
706- if ( fn ) fn ( err ) ;
707- return ;
708- }
709-
710- numsub = parseInt ( numsub [ 1 ] , 10 ) ;
730+ getNumSub ( self . requestChannel ) . then ( numsub => {
711731 debug ( 'waiting for %d responses to "customRequest" request' , numsub ) ;
712732
713733 var request = JSON . stringify ( {
@@ -733,6 +753,9 @@ function adapter(uri, opts) {
733753 } ;
734754
735755 pub . publish ( self . requestChannel , request ) ;
756+ } ) . catch ( err => {
757+ self . emit ( 'error' , err ) ;
758+ if ( fn ) fn ( err ) ;
736759 } ) ;
737760 } ;
738761
0 commit comments