Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions whisper/whisperv6/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Peer struct {
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth

quit chan struct{}

wg sync.WaitGroup
}

// newPeer creates a new whisper peer object, but does not run the handshake itself.
Expand All @@ -64,13 +66,15 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
// start initiates the peer updater, periodically broadcasting the whisper packets
// into the network.
func (peer *Peer) start() {
peer.wg.Add(1)
go peer.update()
log.Trace("start", "peer", peer.ID())
}

// stop terminates the peer updater, stopping message forwarding to it.
func (peer *Peer) stop() {
close(peer.quit)
peer.wg.Wait()
log.Trace("stop", "peer", peer.ID())
}

Expand All @@ -81,7 +85,9 @@ func (peer *Peer) handshake() error {
errc := make(chan error, 1)
isLightNode := peer.host.LightClientMode()
isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted()
peer.wg.Add(1)
go func() {
defer peer.wg.Done()
pow := peer.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter()
Expand Down Expand Up @@ -144,6 +150,7 @@ func (peer *Peer) handshake() error {
// update executes periodic operations on the peer, including message transmission
// and expiration.
func (peer *Peer) update() {
defer peer.wg.Done()
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
defer expire.Stop()
Expand Down
11 changes: 11 additions & 0 deletions whisper/whisperv6/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Whisper struct {
stats Statistics // Statistics of whisper node

mailServer MailServer // MailServer interface

wg sync.WaitGroup
}

// New creates a Whisper client ready to communicate through the Ethereum P2P network.
Expand Down Expand Up @@ -243,8 +245,10 @@ func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
whisper.settings.Store(bloomFilterIdx, b)
whisper.notifyPeersAboutBloomFilterChange(b)

whisper.wg.Add(1)
go func() {
// allow some time before all the peers have processed the notification
defer whisper.wg.Done()
time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
whisper.settings.Store(bloomFilterToleranceIdx, b)
}()
Expand All @@ -261,7 +265,9 @@ func (whisper *Whisper) SetMinimumPoW(val float64) error {
whisper.settings.Store(minPowIdx, val)
whisper.notifyPeersAboutPowRequirementChange(val)

whisper.wg.Add(1)
go func() {
defer whisper.wg.Done()
// allow some time before all the peers have processed the notification
time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
whisper.settings.Store(minPowToleranceIdx, val)
Expand Down Expand Up @@ -626,10 +632,12 @@ func (whisper *Whisper) Send(envelope *Envelope) error {
// of the Whisper protocol.
func (whisper *Whisper) Start(*p2p.Server) error {
log.Info("started whisper v." + ProtocolVersionStr)
whisper.wg.Add(1)
go whisper.update()

numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
whisper.wg.Add(1)
go whisper.processQueue()
}

Expand All @@ -640,6 +648,7 @@ func (whisper *Whisper) Start(*p2p.Server) error {
// of the Whisper protocol.
func (whisper *Whisper) Stop() error {
close(whisper.quit)
whisper.wg.Wait()
log.Info("whisper stopped")
return nil
}
Expand Down Expand Up @@ -874,6 +883,7 @@ func (whisper *Whisper) checkOverflow() {

// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
func (whisper *Whisper) processQueue() {
defer whisper.wg.Done()
var e *Envelope
for {
select {
Expand All @@ -892,6 +902,7 @@ func (whisper *Whisper) processQueue() {
// update loops until the lifetime of the whisper node, updating its internal
// state by expiring stale messages from the pool.
func (whisper *Whisper) update() {
defer whisper.wg.Done()
// Start a ticker to check for expirations
expire := time.NewTicker(expirationCycle)

Expand Down