diff --git a/app/submodule/mpool/mpool_submodule.go b/app/submodule/mpool/mpool_submodule.go index ee6af4cb44..ac7f8add2e 100644 --- a/app/submodule/mpool/mpool_submodule.go +++ b/app/submodule/mpool/mpool_submodule.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strconv" + "sync" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -51,8 +52,7 @@ type messagepoolConfig interface { // MessagingSubmodule enhances the `Node` with internal message capabilities. type MessagePoolSubmodule struct { //nolint // Network Fields - MessageTopic *pubsub.Topic - MessageSub *pubsub.Subscription + MessageSub *pubsub.Subscription MPool *messagepool.MessagePool msgSigner *messagepool.MessageSigner @@ -182,15 +182,20 @@ func (mp *MessagePoolSubmodule) Start(ctx context.Context) error { return err } + msgTopic, err := mp.network.Pubsub.Join(topicName) + if err != nil { + return err + } + + var once sync.Once subscribe := func() { - var err error - if mp.MessageTopic, err = mp.network.Pubsub.Join(topicName); err != nil { - panic(err) - } - if mp.MessageSub, err = mp.MessageTopic.Subscribe(); err != nil { - panic(err) - } - go mp.handleIncomingMessage(ctx) + once.Do(func() { + var err error + if mp.MessageSub, err = msgTopic.Subscribe(); err != nil { + panic(err) + } + go mp.handleIncomingMessage(ctx) + }) } // wait until we are synced within 10 epochs