Skip to content

Commit 632e466

Browse files
fjlenriquefynn
authored andcommitted
eth: improve shutdown synchronization (ethereum#20695)
* eth: improve shutdown synchronization Most goroutines started by eth.Ethereum didn't have any shutdown sync at all, which lead to weird error messages when quitting the client. This change improves the clean shutdown path by stopping all internal components in dependency order and waiting for them to actually be stopped before shutdown is considered done. In particular, we now stop everything related to peers before stopping 'resident' parts such as core.BlockChain. * eth: rewrite sync controller * eth: remove sync start debug message * eth: notify chainSyncer about new peers after handshake * eth: move downloader.Cancel call into chainSyncer * eth: make post-sync block broadcast synchronous * eth: add comments * core: change blockchain stop message * eth: change closeBloomHandler channel type
1 parent 2713760 commit 632e466

File tree

8 files changed

+213
-138
lines changed

8 files changed

+213
-138
lines changed

core/blockchain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,7 @@ func (bc *BlockChain) Stop() {
897897
log.Error("Dangling trie nodes after full cleanup")
898898
}
899899
}
900-
log.Info("Blockchain manager stopped")
900+
log.Info("Blockchain stopped")
901901
}
902902

903903
func (bc *BlockChain) procFutureBlocks() {

eth/backend.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,6 @@ type LesServer interface {
6767
type Ethereum struct {
6868
config *Config
6969

70-
// Channel for shutting down the service
71-
shutdownChan chan bool
72-
7370
// Handlers
7471
txPool *core.TxPool
7572
blockchain *core.BlockChain
@@ -84,8 +81,9 @@ type Ethereum struct {
8481
engine consensus.Engine
8582
accountManager *accounts.Manager
8683

87-
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
88-
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
84+
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
85+
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
86+
closeBloomHandler chan struct{}
8987

9088
APIBackend *EthAPIBackend
9189

@@ -145,17 +143,17 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
145143
log.Info("Initialised chain configuration", "config", chainConfig)
146144

147145
eth := &Ethereum{
148-
config: config,
149-
chainDb: chainDb,
150-
eventMux: ctx.EventMux,
151-
accountManager: ctx.AccountManager,
152-
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
153-
shutdownChan: make(chan bool),
154-
networkID: config.NetworkId,
155-
gasPrice: config.Miner.GasPrice,
156-
etherbase: config.Miner.Etherbase,
157-
bloomRequests: make(chan chan *bloombits.Retrieval),
158-
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
146+
config: config,
147+
chainDb: chainDb,
148+
eventMux: ctx.EventMux,
149+
accountManager: ctx.AccountManager,
150+
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
151+
closeBloomHandler: make(chan struct{}),
152+
networkID: config.NetworkId,
153+
gasPrice: config.Miner.GasPrice,
154+
etherbase: config.Miner.Etherbase,
155+
bloomRequests: make(chan chan *bloombits.Retrieval),
156+
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
159157
}
160158

161159
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
@@ -557,18 +555,20 @@ func (s *Ethereum) Start(srvr *p2p.Server) error {
557555
// Stop implements node.Service, terminating all internal goroutines used by the
558556
// Ethereum protocol.
559557
func (s *Ethereum) Stop() error {
560-
s.bloomIndexer.Close()
561-
s.blockchain.Stop()
562-
s.engine.Close()
558+
// Stop all the peer-related stuff first.
563559
s.protocolManager.Stop()
564560
if s.lesServer != nil {
565561
s.lesServer.Stop()
566562
}
563+
564+
// Then stop everything else.
565+
s.bloomIndexer.Close()
566+
close(s.closeBloomHandler)
567567
s.txPool.Stop()
568568
s.miner.Stop()
569-
s.eventMux.Stop()
570-
569+
s.blockchain.Stop()
570+
s.engine.Close()
571571
s.chainDb.Close()
572-
close(s.shutdownChan)
572+
s.eventMux.Stop()
573573
return nil
574574
}

eth/bloombits.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
5454
go func() {
5555
for {
5656
select {
57-
case <-eth.shutdownChan:
57+
case <-eth.closeBloomHandler:
5858
return
5959

6060
case request := <-eth.bloomRequests:

eth/handler.go

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,12 @@ type ProtocolManager struct {
8787
whitelist map[uint64]common.Hash
8888

8989
// channels for fetcher, syncer, txsyncLoop
90-
newPeerCh chan *peer
91-
txsyncCh chan *txsync
92-
quitSync chan struct{}
93-
noMorePeers chan struct{}
90+
txsyncCh chan *txsync
91+
quitSync chan struct{}
9492

95-
// wait group is used for graceful shutdowns during downloading
96-
// and processing
97-
wg sync.WaitGroup
93+
chainSync *chainSyncer
94+
wg sync.WaitGroup
95+
peerWG sync.WaitGroup
9896

9997
// Test fields or hooks
10098
broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
@@ -105,18 +103,17 @@ type ProtocolManager struct {
105103
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
106104
// Create the protocol manager with the base fields
107105
manager := &ProtocolManager{
108-
networkID: networkID,
109-
forkFilter: forkid.NewFilter(blockchain),
110-
eventMux: mux,
111-
txpool: txpool,
112-
blockchain: blockchain,
113-
peers: newPeerSet(),
114-
whitelist: whitelist,
115-
newPeerCh: make(chan *peer),
116-
noMorePeers: make(chan struct{}),
117-
txsyncCh: make(chan *txsync),
118-
quitSync: make(chan struct{}),
106+
networkID: networkID,
107+
forkFilter: forkid.NewFilter(blockchain),
108+
eventMux: mux,
109+
txpool: txpool,
110+
blockchain: blockchain,
111+
peers: newPeerSet(),
112+
whitelist: whitelist,
113+
txsyncCh: make(chan *txsync),
114+
quitSync: make(chan struct{}),
119115
}
116+
120117
if mode == downloader.FullSync {
121118
// The database seems empty as the current block is the genesis. Yet the fast
122119
// block is ahead, so fast sync was enabled for this node at a certain point.
@@ -140,6 +137,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
140137
manager.fastSync = uint32(1)
141138
}
142139
}
140+
143141
// If we have trusted checkpoints, enforce them on the chain
144142
if checkpoint != nil {
145143
manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
@@ -199,6 +197,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
199197
}
200198
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
201199

200+
manager.chainSync = newChainSyncer(manager)
201+
202202
return manager, nil
203203
}
204204

@@ -213,15 +213,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
213213
Version: version,
214214
Length: length,
215215
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
216-
peer := pm.newPeer(int(version), p, rw, pm.txpool.Get)
217-
select {
218-
case pm.newPeerCh <- peer:
219-
pm.wg.Add(1)
220-
defer pm.wg.Done()
221-
return pm.handle(peer)
222-
case <-pm.quitSync:
223-
return p2p.DiscQuitting
224-
}
216+
return pm.runPeer(pm.newPeer(int(version), p, rw, pm.txpool.Get))
225217
},
226218
NodeInfo: func() interface{} {
227219
return pm.NodeInfo()
@@ -260,40 +252,37 @@ func (pm *ProtocolManager) Start(maxPeers int) {
260252
pm.maxPeers = maxPeers
261253

262254
// broadcast transactions
255+
pm.wg.Add(1)
263256
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
264257
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
265258
go pm.txBroadcastLoop()
266259

267260
// broadcast mined blocks
261+
pm.wg.Add(1)
268262
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
269263
go pm.minedBroadcastLoop()
270264

271265
// start sync handlers
272-
go pm.syncer()
266+
pm.wg.Add(2)
267+
go pm.chainSync.loop()
273268
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
274269
}
275270

276271
func (pm *ProtocolManager) Stop() {
277-
log.Info("Stopping Ethereum protocol")
278-
279272
pm.txsSub.Unsubscribe() // quits txBroadcastLoop
280273
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
281274

282-
// Quit the sync loop.
283-
// After this send has completed, no new peers will be accepted.
284-
pm.noMorePeers <- struct{}{}
285-
286-
// Quit fetcher, txsyncLoop.
275+
// Quit chainSync and txsync64.
276+
// After this is done, no new peers will be accepted.
287277
close(pm.quitSync)
278+
pm.wg.Wait()
288279

289280
// Disconnect existing sessions.
290281
// This also closes the gate for any new registrations on the peer set.
291282
// sessions which are already established but not added to pm.peers yet
292283
// will exit when they try to register.
293284
pm.peers.Close()
294-
295-
// Wait for all peer handler goroutines and the loops to come down.
296-
pm.wg.Wait()
285+
pm.peerWG.Wait()
297286

298287
log.Info("Ethereum protocol stopped")
299288
}
@@ -302,6 +291,15 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, ge
302291
return newPeer(pv, p, rw, getPooledTx)
303292
}
304293

294+
func (pm *ProtocolManager) runPeer(p *peer) error {
295+
if !pm.chainSync.handlePeerEvent(p) {
296+
return p2p.DiscQuitting
297+
}
298+
pm.peerWG.Add(1)
299+
defer pm.peerWG.Done()
300+
return pm.handle(p)
301+
}
302+
305303
// handle is the callback invoked to manage the life cycle of an eth peer. When
306304
// this function terminates, the peer is disconnected.
307305
func (pm *ProtocolManager) handle(p *peer) error {
@@ -323,6 +321,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
323321
p.Log().Debug("Ethereum handshake failed", "err", err)
324322
return err
325323
}
324+
326325
// Register the peer locally
327326
if err := pm.peers.Register(p); err != nil {
328327
p.Log().Error("Ethereum peer registration failed", "err", err)
@@ -334,6 +333,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
334333
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
335334
return err
336335
}
336+
pm.chainSync.handlePeerEvent(p)
337+
337338
// Propagate existing transactions. new transactions appearing
338339
// after this will be sent via broadcasts.
339340
pm.syncTransactions(p)
@@ -723,14 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
723724
// Update the peer's total difficulty if better than the previous
724725
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
725726
p.SetHead(trueHead, trueTD)
726-
727-
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
728-
// a single block (as the true TD is below the propagated block), however this
729-
// scenario should easily be covered by the fetcher.
730-
currentHeader := pm.blockchain.CurrentHeader()
731-
if trueTD.Cmp(pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())) > 0 {
732-
go pm.synchronise(p)
733-
}
727+
pm.chainSync.handlePeerEvent(p)
734728
}
735729

736730
case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
@@ -883,9 +877,10 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga
883877
}
884878
}
885879

886-
// Mined broadcast loop
880+
// minedBroadcastLoop sends mined blocks to connected peers.
887881
func (pm *ProtocolManager) minedBroadcastLoop() {
888-
// automatically stops if unsubscribe
882+
defer pm.wg.Done()
883+
889884
for obj := range pm.minedBlockSub.Chan() {
890885
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
891886
pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
@@ -894,7 +889,10 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
894889
}
895890
}
896891

892+
// txBroadcastLoop announces new transactions to connected peers.
897893
func (pm *ProtocolManager) txBroadcastLoop() {
894+
defer pm.wg.Done()
895+
898896
for {
899897
select {
900898
case event := <-pm.txsCh:
@@ -906,7 +904,6 @@ func (pm *ProtocolManager) txBroadcastLoop() {
906904
pm.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
907905
pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
908906

909-
// Err() channel will be closed when unsubscribing.
910907
case <-pm.txsSub.Err():
911908
return
912909
}

eth/helper_test.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
170170
// Create a message pipe to communicate through
171171
app, net := p2p.MsgPipe()
172172

173-
// Generate a random id and create the peer
173+
// Start the peer on a new thread
174174
var id enode.ID
175175
rand.Read(id[:])
176-
177176
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get)
178-
179-
// Start the peer on a new thread
180177
errc := make(chan error, 1)
181-
go func() {
182-
select {
183-
case pm.newPeerCh <- peer:
184-
errc <- pm.handle(peer)
185-
case <-pm.quitSync:
186-
errc <- p2p.DiscQuitting
187-
}
188-
}()
178+
go func() { errc <- pm.runPeer(peer) }()
189179
tp := &testPeer{app: app, net: net, peer: peer}
180+
190181
// Execute any implicitly requested handshakes and return
191182
if shake {
192183
var (

eth/protocol_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ func testSyncTransaction(t *testing.T, propagtion bool) {
385385
go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get))
386386

387387
time.Sleep(250 * time.Millisecond)
388-
pmFetcher.synchronise(pmFetcher.peers.BestPeer())
388+
pmFetcher.doSync(peerToSyncOp(downloader.FullSync, pmFetcher.peers.BestPeer()))
389389
atomic.StoreUint32(&pmFetcher.acceptTxs, 1)
390390

391391
newTxs := make(chan core.NewTxsEvent, 1024)

0 commit comments

Comments
 (0)