From 947a0f4ee0f91ebec39702a4b918a8aa564a72f0 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 24 Sep 2024 18:49:04 +0900 Subject: [PATCH 01/23] wip/neighborMsg handling --- lib/grandpa/grandpa.go | 24 +++++++++++++++- lib/grandpa/message_handler.go | 25 ++++++++++++++++- lib/grandpa/message_tracker.go | 50 ++++++++++++++++++++++++++++++---- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 74296c6aaa..ac968afd11 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -1139,8 +1139,30 @@ func (s *Service) handleVoteMessage(from peer.ID, vote *VoteMessage) (err error) return nil } +func (s *Service) handleNeighborMessage(round uint64, setID uint64) error { + // TODO sender side of neighbor msg + highestHeader, err := s.blockState.GetHighestFinalisedHeader() + if err != nil { + return err + } + neighbourMessage := &NeighbourPacketV1{ + Round: round, + SetID: setID, + Number: uint32(highestHeader.Number), + } + + cm, err := neighbourMessage.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting neighbour message to network message: %w", err) + } + + logger.Errorf("sending neighbour message: %v", neighbourMessage) + s.network.GossipMessage(cm) + return nil +} + func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error { - logger.Debugf("received commit message: %+v", commitMessage) + logger.Warnf("received commit message: %+v", commitMessage) err := verifyBlockHashAgainstBlockNumber(s.blockState, commitMessage.Vote.Hash, uint(commitMessage.Vote.Number)) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index df607d6915..a5e456dfcf 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -31,6 +31,8 @@ type MessageHandler struct { grandpa *Service blockState BlockState telemetry Telemetry + + isStart bool // This is a temp hacky way } // NewMessageHandler returns a new MessageHandler @@ -39,6 +41,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer grandpa: grandpa, blockState: blockState, telemetry: telemetryMailer, + isStart: true, } } @@ -82,8 +85,28 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } } -func (*MessageHandler) handleNeighbourMessage(_ *NeighbourPacketV1) error { +func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1) error { // TODO(#2931) + // This should be the receiver side of the handling messages, NOT GOSSIP + if h.isStart { + logger.Errorf("Received initial neighbor msg") + neighbourMessage := &NeighbourPacketV1{ + Round: packet.Round, + SetID: packet.SetID, + Number: packet.Number, + } + + cm, err := neighbourMessage.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting neighbour message to network message: %w", err) + } + + logger.Errorf("sending neighbour message: %v", neighbourMessage) + h.grandpa.network.GossipMessage(cm) + h.isStart = false + } + + // TODO handle in normal case? return nil } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 381760acc6..35bedf7162 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -22,6 +22,9 @@ type tracker struct { in chan *types.Block // receive imported block from BlockState stopped chan struct{} + neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message + stoppedNeighbor chan struct{} + catchUpResponseMessageMutex sync.Mutex // round(uint64) is used as key and *CatchUpResponse as value catchUpResponseMessages map[uint64]*CatchUpResponse @@ -33,22 +36,28 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { commitsCapacity = 1000 ) return &tracker{ - blockState: bs, - handler: handler, - votes: newVotesTracker(votesCapacity), - commits: newCommitsTracker(commitsCapacity), - in: bs.GetImportedBlockNotifierChannel(), - stopped: make(chan struct{}), + blockState: bs, + handler: handler, + votes: newVotesTracker(votesCapacity), + commits: newCommitsTracker(commitsCapacity), + in: bs.GetImportedBlockNotifierChannel(), + stopped: make(chan struct{}), + + neighborIn: make(chan NeighbourPacketV1), + stoppedNeighbor: make(chan struct{}), + catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } } func (t *tracker) start() { go t.handleBlocks() + go t.handleNeighborMessage() } func (t *tracker) stop() { close(t.stopped) + close(t.stoppedNeighbor) t.blockState.FreeImportedBlockNotifierChannel(t.in) } @@ -62,6 +71,11 @@ func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) { func (t *tracker) addCommit(cm *CommitMessage) { t.commits.add(cm) + t.neighborIn <- NeighbourPacketV1{ + Round: cm.Round + 1, + SetID: cm.SetID, // need to hceck for set changes + Number: 0, // This gets modified later + } } func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) { @@ -92,6 +106,30 @@ func (t *tracker) handleBlocks() { } } +func (t *tracker) handleNeighborMessage() { + // https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 + const duration = time.Minute * 2 + ticker := time.NewTicker(duration) + defer ticker.Stop() + + for { + select { + case msg := <-t.neighborIn: + logger.Warnf("Event Channel handleNeighborMessage Triggered") + err := t.handler.grandpa.handleNeighborMessage(msg.Round, msg.SetID) + if err != nil { + logger.Errorf("handling neighbor message: %v", err) + } + + ticker.Reset(duration) + case <-ticker.C: + logger.Warnf("Tick handleNeighborMessage") + case <-t.stoppedNeighbor: + return + } + } +} + func (t *tracker) handleBlock(b *types.Block) { h := b.Header.Hash() vms := t.votes.messages(h) From 3b01286f5142813adca0abd3beb70cf15123a470 Mon Sep 17 00:00:00 2001 From: jimboj Date: Wed, 25 Sep 2024 22:51:41 +0900 Subject: [PATCH 02/23] wip/catch up request flow --- lib/grandpa/catch_up.go | 69 ++++++++++++++++++++++++++++++++++ lib/grandpa/grandpa.go | 9 +++++ lib/grandpa/message_handler.go | 23 +++--------- lib/grandpa/message_tracker.go | 2 +- 4 files changed, 85 insertions(+), 18 deletions(-) create mode 100644 lib/grandpa/catch_up.go diff --git a/lib/grandpa/catch_up.go b/lib/grandpa/catch_up.go new file mode 100644 index 0000000000..b44138e134 --- /dev/null +++ b/lib/grandpa/catch_up.go @@ -0,0 +1,69 @@ +package grandpa + +import ( + "fmt" + "github.com/libp2p/go-libp2p/core/peer" + "sync" + "sync/atomic" + "time" +) + +type catchUp struct { + lock sync.Mutex + + grandpa *Service + readyToCatchUp *atomic.Bool + shutdownCatchup chan struct{} +} + +func newCatchUp(grandpa *Service) *catchUp { + c := &catchUp{ + readyToCatchUp: &atomic.Bool{}, + grandpa: grandpa, + shutdownCatchup: make(chan struct{}), + } + c.readyToCatchUp.Store(true) + return c +} + +func (c *catchUp) tryCatchUp(round uint64, setID uint64, peer peer.ID) error { + logger.Warnf("Trying to catch up") + c.lock.Lock() + if !c.readyToCatchUp.Load() { + // Fine we just skip + return nil + } + c.lock.Lock() + catchUpRequest := newCatchUpRequest(round, setID) + cm, err := catchUpRequest.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting catchUpRequest to network message: %w", err) + } + + logger.Warnf("sending catchup request message: %v", catchUpRequest) + err = c.grandpa.network.SendMessage(peer, cm) + if err != nil { + return fmt.Errorf("sending catchUpRequest to network message: %w", err) + } + c.readyToCatchUp.Store(false) + logger.Warnf("successfully tryed catch up") + return nil +} + +func (c *catchUp) initCatchUp() { + logger.Warnf("Initializing catch up") + const duration = time.Second * 30 + ticker := time.NewTicker(duration) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + logger.Warnf("Ready to catch up again") + c.readyToCatchUp.Store(true) + case <-c.shutdownCatchup: + logger.Warnf("Closing catch up") + return + } + } +} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index ac968afd11..69856ece06 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -80,6 +80,9 @@ type Service struct { finalisedCh chan *types.FinalisationInfo telemetry Telemetry + + //catchUp *catchUp + catchUp *catchUp } // Config represents a GRANDPA service configuration @@ -157,14 +160,18 @@ func NewService(cfg *Config) (*Service, error) { return nil, err } + s.catchUp = newCatchUp(s) + s.messageHandler = NewMessageHandler(s, s.blockState, cfg.Telemetry) s.tracker = newTracker(s.blockState, s.messageHandler) s.paused.Store(false) + return s, nil } // Start begins the GRANDPA finality service func (s *Service) Start() error { + go s.catchUp.initCatchUp() // if we're not an authority, we don't need to worry about the voting process. // the grandpa service is only used to verify incoming block justifications if !s.authority { @@ -190,6 +197,7 @@ func (s *Service) Stop() error { s.cancel() s.blockState.FreeFinalisedNotifierChannel(s.finalisedCh) + close(s.catchUp.shutdownCatchup) if !s.authority { return nil @@ -1168,6 +1176,7 @@ func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error { commitMessage.Vote.Hash, uint(commitMessage.Vote.Number)) if err != nil { if errors.Is(err, database.ErrNotFound) { + logger.Warnf("Not able to verify, adding commit to tracker") s.tracker.addCommit(commitMessage) } diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index a5e456dfcf..efcb4532e8 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -67,7 +67,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. return nil, nil //nolint:nilnil case *NeighbourPacketV1: // we can afford to not retry handling neighbour message, if it errors. - return nil, h.handleNeighbourMessage(msg) + return nil, h.handleNeighbourMessage(msg, from) case *CatchUpRequest: return h.handleCatchUpRequest(msg) case *CatchUpResponse: @@ -85,25 +85,14 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } } -func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1) error { +func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { // TODO(#2931) // This should be the receiver side of the handling messages, NOT GOSSIP - if h.isStart { - logger.Errorf("Received initial neighbor msg") - neighbourMessage := &NeighbourPacketV1{ - Round: packet.Round, - SetID: packet.SetID, - Number: packet.Number, - } - - cm, err := neighbourMessage.ToConsensusMessage() + if h.grandpa.state.round < packet.Round { + err := h.grandpa.catchUp.tryCatchUp(packet.Round, packet.SetID, from) if err != nil { - return fmt.Errorf("converting neighbour message to network message: %w", err) + return err } - - logger.Errorf("sending neighbour message: %v", neighbourMessage) - h.grandpa.network.GossipMessage(cm) - h.isStart = false } // TODO handle in normal case? @@ -142,7 +131,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return nil } - logger.Debugf( + logger.Warnf( "received catch up response with hash %s for round %d and set id %d", msg.Hash, msg.Round, msg.SetID) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 35bedf7162..43d93c3ffd 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -73,7 +73,7 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commits.add(cm) t.neighborIn <- NeighbourPacketV1{ Round: cm.Round + 1, - SetID: cm.SetID, // need to hceck for set changes + SetID: cm.SetID, // need to check for set changes Number: 0, // This gets modified later } } From 035646c201bc67db89c7f8f3efa4363f1d5d1777 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 1 Oct 2024 02:02:10 +0900 Subject: [PATCH 03/23] wip --- lib/grandpa/catch_up.go | 4 ++-- lib/grandpa/message_handler.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/grandpa/catch_up.go b/lib/grandpa/catch_up.go index b44138e134..83d537309c 100644 --- a/lib/grandpa/catch_up.go +++ b/lib/grandpa/catch_up.go @@ -28,12 +28,12 @@ func newCatchUp(grandpa *Service) *catchUp { func (c *catchUp) tryCatchUp(round uint64, setID uint64, peer peer.ID) error { logger.Warnf("Trying to catch up") - c.lock.Lock() + //c.lock.Lock() if !c.readyToCatchUp.Load() { // Fine we just skip return nil } - c.lock.Lock() + //c.lock.Lock() catchUpRequest := newCatchUpRequest(round, setID) cm, err := catchUpRequest.ToConsensusMessage() if err != nil { diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index efcb4532e8..3fb5fbc14c 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -89,7 +89,7 @@ func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from // TODO(#2931) // This should be the receiver side of the handling messages, NOT GOSSIP if h.grandpa.state.round < packet.Round { - err := h.grandpa.catchUp.tryCatchUp(packet.Round, packet.SetID, from) + err := h.grandpa.catchUp.tryCatchUp(1, packet.SetID, from) if err != nil { return err } @@ -127,6 +127,9 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMe } func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { + logger.Warnf( + "received catch up response with hash %s for round %d and set id %d", + msg.Hash, msg.Round, msg.SetID) if !h.grandpa.authority { return nil } From 8f8c06600836b8885639145e1fb17c7abbbdf085 Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 3 Oct 2024 21:57:11 +0900 Subject: [PATCH 04/23] wip/impl neighbor tracking --- lib/grandpa/catch_up.go | 135 +++++++++++++++++++++++---------- lib/grandpa/grandpa.go | 8 +- lib/grandpa/message_handler.go | 12 +-- 3 files changed, 100 insertions(+), 55 deletions(-) diff --git a/lib/grandpa/catch_up.go b/lib/grandpa/catch_up.go index 83d537309c..2aa87cb4ed 100644 --- a/lib/grandpa/catch_up.go +++ b/lib/grandpa/catch_up.go @@ -3,54 +3,69 @@ package grandpa import ( "fmt" "github.com/libp2p/go-libp2p/core/peer" - "sync" - "sync/atomic" "time" ) -type catchUp struct { - lock sync.Mutex +type neighborState struct { + setID uint64 + round uint64 + //highestFinalized uint32 not sure if i need this or not +} + +type NeighborTracker struct { + //grandpa *Service + network Network - grandpa *Service - readyToCatchUp *atomic.Bool - shutdownCatchup chan struct{} + peerview map[peer.ID]neighborState + currentSetID uint64 + currentRound uint64 + highestFinalized uint32 } -func newCatchUp(grandpa *Service) *catchUp { - c := &catchUp{ - readyToCatchUp: &atomic.Bool{}, - grandpa: grandpa, - shutdownCatchup: make(chan struct{}), +func NewNeighborTracker(network Network) *NeighborTracker { + return &NeighborTracker{ + network: network, + peerview: make(map[peer.ID]neighborState), } - c.readyToCatchUp.Store(true) - return c } -func (c *catchUp) tryCatchUp(round uint64, setID uint64, peer peer.ID) error { - logger.Warnf("Trying to catch up") - //c.lock.Lock() - if !c.readyToCatchUp.Load() { - // Fine we just skip - return nil - } - //c.lock.Lock() - catchUpRequest := newCatchUpRequest(round, setID) - cm, err := catchUpRequest.ToConsensusMessage() - if err != nil { - return fmt.Errorf("converting catchUpRequest to network message: %w", err) - } +func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { + nt.currentSetID = setID + nt.currentRound = round + nt.highestFinalized = highestFinalized +} + + +func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64) { + peerState := neighborState{setID, round} + nt.peerview[p] = peerState +} - logger.Warnf("sending catchup request message: %v", catchUpRequest) - err = c.grandpa.network.SendMessage(peer, cm) - if err != nil { - return fmt.Errorf("sending catchUpRequest to network message: %w", err) +func (nt *NeighborTracker) BroadcastNeighborMsg() error { + for id, peerState := range nt.peerview { + if !(peerState.round < nt.currentRound || peerState.setID < nt.currentSetID) { + // Send msg + packet := NeighbourPacketV1{ + Round: nt.currentRound, + SetID: nt.currentSetID, + Number: nt.highestFinalized, + } + + cm, err := packet.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err) + } + + err = nt.network.SendMessage(id, cm) + if err != nil { + return fmt.Errorf("sending message to peer: %v", id) + } + } } - c.readyToCatchUp.Store(false) - logger.Warnf("successfully tryed catch up") return nil } -func (c *catchUp) initCatchUp() { +func (nt *NeighborTracker) initCatchUp() { logger.Warnf("Initializing catch up") const duration = time.Second * 30 ticker := time.NewTicker(duration) @@ -60,10 +75,52 @@ func (c *catchUp) initCatchUp() { select { case <-ticker.C: logger.Warnf("Ready to catch up again") - c.readyToCatchUp.Store(true) - case <-c.shutdownCatchup: - logger.Warnf("Closing catch up") - return - } + + //case <-c.shutdownCatchup: + // logger.Warnf("Closing catch up") + // return + //} } } + +//type catchUp struct { +// lock sync.Mutex +// +// grandpa *Service +// readyToCatchUp *atomic.Bool +// shutdownCatchup chan struct{} +//} +// +//func newCatchUp(grandpa *Service) *catchUp { +// c := &catchUp{ +// readyToCatchUp: &atomic.Bool{}, +// grandpa: grandpa, +// shutdownCatchup: make(chan struct{}), +// } +// c.readyToCatchUp.Store(true) +// return c +//} + +//func (c *catchUp) tryCatchUp(round uint64, setID uint64, peer peer.ID) error { +// logger.Warnf("Trying to catch up") +// //c.lock.Lock() +// if !c.readyToCatchUp.Load() { +// // Fine we just skip +// return nil +// } +// //c.lock.Lock() +// catchUpRequest := newCatchUpRequest(round, setID) +// cm, err := catchUpRequest.ToConsensusMessage() +// if err != nil { +// return fmt.Errorf("converting catchUpRequest to network message: %w", err) +// } +// +// logger.Warnf("sending catchup request message: %v", catchUpRequest) +// err = c.grandpa.network.SendMessage(peer, cm) +// if err != nil { +// return fmt.Errorf("sending catchUpRequest to network message: %w", err) +// } +// c.readyToCatchUp.Store(false) +// logger.Warnf("successfully tryed catch up") +// return nil +//} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 69856ece06..4fc27812c4 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -81,8 +81,7 @@ type Service struct { telemetry Telemetry - //catchUp *catchUp - catchUp *catchUp + neighborTracker *NeighborTracker } // Config represents a GRANDPA service configuration @@ -154,14 +153,13 @@ func NewService(cfg *Config) (*Service, error) { finalisedCh: finalisedCh, interval: cfg.Interval, telemetry: cfg.Telemetry, + neighborTracker: NewNeighborTracker(cfg.Network), } if err := s.registerProtocol(); err != nil { return nil, err } - s.catchUp = newCatchUp(s) - s.messageHandler = NewMessageHandler(s, s.blockState, cfg.Telemetry) s.tracker = newTracker(s.blockState, s.messageHandler) s.paused.Store(false) @@ -171,7 +169,6 @@ func NewService(cfg *Config) (*Service, error) { // Start begins the GRANDPA finality service func (s *Service) Start() error { - go s.catchUp.initCatchUp() // if we're not an authority, we don't need to worry about the voting process. // the grandpa service is only used to verify incoming block justifications if !s.authority { @@ -197,7 +194,6 @@ func (s *Service) Stop() error { s.cancel() s.blockState.FreeFinalisedNotifierChannel(s.finalisedCh) - close(s.catchUp.shutdownCatchup) if !s.authority { return nil diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 3fb5fbc14c..bbad949100 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -86,16 +86,8 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { - // TODO(#2931) - // This should be the receiver side of the handling messages, NOT GOSSIP - if h.grandpa.state.round < packet.Round { - err := h.grandpa.catchUp.tryCatchUp(1, packet.SetID, from) - if err != nil { - return err - } - } - - // TODO handle in normal case? + // TODO figure out if i need to do anything more than this, i suspect no + h.grandpa.neighborTracker.UpdatePeer(from, packet.SetID, packet.Round) return nil } From fef75bb0c9e3e81eaee702449c9eb369132df639 Mon Sep 17 00:00:00 2001 From: jimboj Date: Mon, 7 Oct 2024 19:29:43 +0900 Subject: [PATCH 05/23] neighbor message tracker --- lib/grandpa/catch_up.go | 126 -------------------------------- lib/grandpa/grandpa.go | 12 ++- lib/grandpa/message_handler.go | 7 +- lib/grandpa/message_tracker.go | 32 +------- lib/grandpa/neighbor_tracker.go | 121 ++++++++++++++++++++++++++++++ 5 files changed, 138 insertions(+), 160 deletions(-) delete mode 100644 lib/grandpa/catch_up.go create mode 100644 lib/grandpa/neighbor_tracker.go diff --git a/lib/grandpa/catch_up.go b/lib/grandpa/catch_up.go deleted file mode 100644 index 2aa87cb4ed..0000000000 --- a/lib/grandpa/catch_up.go +++ /dev/null @@ -1,126 +0,0 @@ -package grandpa - -import ( - "fmt" - "github.com/libp2p/go-libp2p/core/peer" - "time" -) - -type neighborState struct { - setID uint64 - round uint64 - //highestFinalized uint32 not sure if i need this or not -} - -type NeighborTracker struct { - //grandpa *Service - network Network - - peerview map[peer.ID]neighborState - currentSetID uint64 - currentRound uint64 - highestFinalized uint32 -} - -func NewNeighborTracker(network Network) *NeighborTracker { - return &NeighborTracker{ - network: network, - peerview: make(map[peer.ID]neighborState), - } -} - -func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { - nt.currentSetID = setID - nt.currentRound = round - nt.highestFinalized = highestFinalized -} - - -func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64) { - peerState := neighborState{setID, round} - nt.peerview[p] = peerState -} - -func (nt *NeighborTracker) BroadcastNeighborMsg() error { - for id, peerState := range nt.peerview { - if !(peerState.round < nt.currentRound || peerState.setID < nt.currentSetID) { - // Send msg - packet := NeighbourPacketV1{ - Round: nt.currentRound, - SetID: nt.currentSetID, - Number: nt.highestFinalized, - } - - cm, err := packet.ToConsensusMessage() - if err != nil { - return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err) - } - - err = nt.network.SendMessage(id, cm) - if err != nil { - return fmt.Errorf("sending message to peer: %v", id) - } - } - } - return nil -} - -func (nt *NeighborTracker) initCatchUp() { - logger.Warnf("Initializing catch up") - const duration = time.Second * 30 - ticker := time.NewTicker(duration) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - logger.Warnf("Ready to catch up again") - - //case <-c.shutdownCatchup: - // logger.Warnf("Closing catch up") - // return - //} - } -} - -//type catchUp struct { -// lock sync.Mutex -// -// grandpa *Service -// readyToCatchUp *atomic.Bool -// shutdownCatchup chan struct{} -//} -// -//func newCatchUp(grandpa *Service) *catchUp { -// c := &catchUp{ -// readyToCatchUp: &atomic.Bool{}, -// grandpa: grandpa, -// shutdownCatchup: make(chan struct{}), -// } -// c.readyToCatchUp.Store(true) -// return c -//} - -//func (c *catchUp) tryCatchUp(round uint64, setID uint64, peer peer.ID) error { -// logger.Warnf("Trying to catch up") -// //c.lock.Lock() -// if !c.readyToCatchUp.Load() { -// // Fine we just skip -// return nil -// } -// //c.lock.Lock() -// catchUpRequest := newCatchUpRequest(round, setID) -// cm, err := catchUpRequest.ToConsensusMessage() -// if err != nil { -// return fmt.Errorf("converting catchUpRequest to network message: %w", err) -// } -// -// logger.Warnf("sending catchup request message: %v", catchUpRequest) -// err = c.grandpa.network.SendMessage(peer, cm) -// if err != nil { -// return fmt.Errorf("sending catchUpRequest to network message: %w", err) -// } -// c.readyToCatchUp.Store(false) -// logger.Warnf("successfully tryed catch up") -// return nil -//} diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 4fc27812c4..a73b15f49e 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -77,7 +77,8 @@ type Service struct { bestFinalCandidate map[uint64]*Vote // map of round number -> best final candidate // channels for communication with other services - finalisedCh chan *types.FinalisationInfo + finalisedCh chan *types.FinalisationInfo + neighborMsgChan chan neighborData telemetry Telemetry @@ -153,9 +154,11 @@ func NewService(cfg *Config) (*Service, error) { finalisedCh: finalisedCh, interval: cfg.Interval, telemetry: cfg.Telemetry, - neighborTracker: NewNeighborTracker(cfg.Network), + neighborMsgChan: make(chan neighborData), } + s.neighborTracker = NewNeighborTracker(s, s.neighborMsgChan) + if err := s.registerProtocol(); err != nil { return nil, err } @@ -169,6 +172,9 @@ func NewService(cfg *Config) (*Service, error) { // Start begins the GRANDPA finality service func (s *Service) Start() error { + // Start the neighbor message tracker + s.neighborTracker.Start() // TODO fix + // if we're not an authority, we don't need to worry about the voting process. // the grandpa service is only used to verify incoming block justifications if !s.authority { @@ -195,6 +201,8 @@ func (s *Service) Stop() error { s.cancel() s.blockState.FreeFinalisedNotifierChannel(s.finalisedCh) + s.neighborTracker.Stop() + if !s.authority { return nil } diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index bbad949100..6680c95622 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -86,8 +86,11 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { - // TODO figure out if i need to do anything more than this, i suspect no - h.grandpa.neighborTracker.UpdatePeer(from, packet.SetID, packet.Round) + logger.Debugf("handleing neighbor message from peer %v with set id %v and round %v", from, packet.SetID, packet.Round) + h.grandpa.neighborMsgChan <- neighborData{ + peer: from, + neighborMsg: packet, + } return nil } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 43d93c3ffd..ee4f781d69 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -22,8 +22,7 @@ type tracker struct { in chan *types.Block // receive imported block from BlockState stopped chan struct{} - neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message - stoppedNeighbor chan struct{} + neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message catchUpResponseMessageMutex sync.Mutex // round(uint64) is used as key and *CatchUpResponse as value @@ -43,8 +42,7 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), - neighborIn: make(chan NeighbourPacketV1), - stoppedNeighbor: make(chan struct{}), + neighborIn: make(chan NeighbourPacketV1), catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } @@ -52,12 +50,10 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { func (t *tracker) start() { go t.handleBlocks() - go t.handleNeighborMessage() } func (t *tracker) stop() { close(t.stopped) - close(t.stoppedNeighbor) t.blockState.FreeImportedBlockNotifierChannel(t.in) } @@ -106,30 +102,6 @@ func (t *tracker) handleBlocks() { } } -func (t *tracker) handleNeighborMessage() { - // https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 - const duration = time.Minute * 2 - ticker := time.NewTicker(duration) - defer ticker.Stop() - - for { - select { - case msg := <-t.neighborIn: - logger.Warnf("Event Channel handleNeighborMessage Triggered") - err := t.handler.grandpa.handleNeighborMessage(msg.Round, msg.SetID) - if err != nil { - logger.Errorf("handling neighbor message: %v", err) - } - - ticker.Reset(duration) - case <-ticker.C: - logger.Warnf("Tick handleNeighborMessage") - case <-t.stoppedNeighbor: - return - } - } -} - func (t *tracker) handleBlock(b *types.Block) { h := b.Header.Hash() vms := t.votes.messages(h) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go new file mode 100644 index 0000000000..eb1b1e1c17 --- /dev/null +++ b/lib/grandpa/neighbor_tracker.go @@ -0,0 +1,121 @@ +package grandpa + +import ( + "fmt" + "github.com/ChainSafe/gossamer/dot/types" + "github.com/libp2p/go-libp2p/core/peer" + "time" +) + +type neighborData struct { + peer peer.ID + neighborMsg *NeighbourPacketV1 +} + +type neighborState struct { + setID uint64 + round uint64 + //highestFinalized uint32 not sure if i need this or not +} + +type NeighborTracker struct { + grandpa *Service + + peerview map[peer.ID]neighborState + currentSetID uint64 + currentRound uint64 + highestFinalized uint32 + + finalizationCha chan *types.FinalisationInfo + neighborMsgChan chan neighborData + stoppedNeighbor chan struct{} +} + +func NewNeighborTracker(grandpa *Service, neighborChan chan neighborData) *NeighborTracker { + return &NeighborTracker{ + grandpa: grandpa, + peerview: make(map[peer.ID]neighborState), + finalizationCha: grandpa.blockState.GetFinalisedNotifierChannel(), + neighborMsgChan: neighborChan, + stoppedNeighbor: make(chan struct{}), + } +} + +func (nt *NeighborTracker) Start() { + go nt.run() +} + +func (nt *NeighborTracker) Stop() { + nt.grandpa.blockState.FreeFinalisedNotifierChannel(nt.finalizationCha) + nt.stoppedNeighbor <- struct{}{} + close(nt.neighborMsgChan) +} + +func (nt *NeighborTracker) run() { + logger.Info("starting neighbor tracker") + // https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 + const duration = time.Minute * 2 + ticker := time.NewTicker(duration) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + logger.Debugf("neighbor message broadcast triggered by ticker") + err := nt.BroadcastNeighborMsg() + if err != nil { + logger.Errorf("broadcasting neighbor message: %v", err) + } + + case block := <-nt.finalizationCha: + nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) + err := nt.BroadcastNeighborMsg() + if err != nil { + logger.Errorf("broadcasting neighbor message: %v", err) + } + ticker.Reset(duration) + case neighborData := <-nt.neighborMsgChan: + if neighborData.neighborMsg != nil { + nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round) + } + case <-nt.stoppedNeighbor: + logger.Info("stopping neighbor tracker") + return + } + } +} + +func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { + nt.currentSetID = setID + nt.currentRound = round + nt.highestFinalized = highestFinalized +} + +func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64) { + peerState := neighborState{setID, round} + nt.peerview[p] = peerState +} + +func (nt *NeighborTracker) BroadcastNeighborMsg() error { + logger.Warnf("braodcasting neighbor message to relevant peers") + for id, peerState := range nt.peerview { + if !(peerState.round < nt.currentRound || peerState.setID < nt.currentSetID) { + packet := NeighbourPacketV1{ + Round: nt.currentRound, + SetID: nt.currentSetID, + Number: nt.highestFinalized, + } + + cm, err := packet.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err) + } + + err = nt.grandpa.network.SendMessage(id, cm) + if err != nil { + return fmt.Errorf("sending message to peer: %v", id) + } + } + } + return nil +} From fd0ffc2e33d3928be79bbd88db05b47b3b46da84 Mon Sep 17 00:00:00 2001 From: jimboj Date: Mon, 7 Oct 2024 19:37:04 +0900 Subject: [PATCH 06/23] remove changes not needed --- lib/grandpa/grandpa.go | 27 ++------------------------- lib/grandpa/message_handler.go | 12 +----------- lib/grandpa/message_tracker.go | 9 --------- 3 files changed, 3 insertions(+), 45 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index a73b15f49e..9ebf7d9331 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -172,8 +172,7 @@ func NewService(cfg *Config) (*Service, error) { // Start begins the GRANDPA finality service func (s *Service) Start() error { - // Start the neighbor message tracker - s.neighborTracker.Start() // TODO fix + s.neighborTracker.Start() // if we're not an authority, we don't need to worry about the voting process. // the grandpa service is only used to verify incoming block justifications @@ -1151,30 +1150,8 @@ func (s *Service) handleVoteMessage(from peer.ID, vote *VoteMessage) (err error) return nil } -func (s *Service) handleNeighborMessage(round uint64, setID uint64) error { - // TODO sender side of neighbor msg - highestHeader, err := s.blockState.GetHighestFinalisedHeader() - if err != nil { - return err - } - neighbourMessage := &NeighbourPacketV1{ - Round: round, - SetID: setID, - Number: uint32(highestHeader.Number), - } - - cm, err := neighbourMessage.ToConsensusMessage() - if err != nil { - return fmt.Errorf("converting neighbour message to network message: %w", err) - } - - logger.Errorf("sending neighbour message: %v", neighbourMessage) - s.network.GossipMessage(cm) - return nil -} - func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error { - logger.Warnf("received commit message: %+v", commitMessage) + logger.Debugf("received commit message: %+v", commitMessage) err := verifyBlockHashAgainstBlockNumber(s.blockState, commitMessage.Vote.Hash, uint(commitMessage.Vote.Number)) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 6680c95622..72ea9f68b0 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -22,17 +22,11 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -var ( - ErrNeighbourVersionNotSupported = errors.New("neighbour version not supported") -) - // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { grandpa *Service blockState BlockState telemetry Telemetry - - isStart bool // This is a temp hacky way } // NewMessageHandler returns a new MessageHandler @@ -41,7 +35,6 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer grandpa: grandpa, blockState: blockState, telemetry: telemetryMailer, - isStart: true, } } @@ -122,14 +115,11 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMe } func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { - logger.Warnf( - "received catch up response with hash %s for round %d and set id %d", - msg.Hash, msg.Round, msg.SetID) if !h.grandpa.authority { return nil } - logger.Warnf( + logger.Debugf( "received catch up response with hash %s for round %d and set id %d", msg.Hash, msg.Round, msg.SetID) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index ee4f781d69..74a2108a03 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -22,8 +22,6 @@ type tracker struct { in chan *types.Block // receive imported block from BlockState stopped chan struct{} - neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message - catchUpResponseMessageMutex sync.Mutex // round(uint64) is used as key and *CatchUpResponse as value catchUpResponseMessages map[uint64]*CatchUpResponse @@ -42,8 +40,6 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { in: bs.GetImportedBlockNotifierChannel(), stopped: make(chan struct{}), - neighborIn: make(chan NeighbourPacketV1), - catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } } @@ -67,11 +63,6 @@ func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) { func (t *tracker) addCommit(cm *CommitMessage) { t.commits.add(cm) - t.neighborIn <- NeighbourPacketV1{ - Round: cm.Round + 1, - SetID: cm.SetID, // need to check for set changes - Number: 0, // This gets modified later - } } func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) { From 97cbd31845db5e7ad2913212fe0a5343cf404c5f Mon Sep 17 00:00:00 2001 From: jimboj Date: Mon, 7 Oct 2024 19:38:31 +0900 Subject: [PATCH 07/23] remove minor diff --- lib/grandpa/message_tracker.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 74a2108a03..381760acc6 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -33,13 +33,12 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { commitsCapacity = 1000 ) return &tracker{ - blockState: bs, - handler: handler, - votes: newVotesTracker(votesCapacity), - commits: newCommitsTracker(commitsCapacity), - in: bs.GetImportedBlockNotifierChannel(), - stopped: make(chan struct{}), - + blockState: bs, + handler: handler, + votes: newVotesTracker(votesCapacity), + commits: newCommitsTracker(commitsCapacity), + in: bs.GetImportedBlockNotifierChannel(), + stopped: make(chan struct{}), catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } } From 0c694bb2d4d30bdf1f390896238219a3e0fbc340 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 8 Oct 2024 00:14:37 +0900 Subject: [PATCH 08/23] wip/feedback --- lib/grandpa/message_handler.go | 2 +- lib/grandpa/neighbor_tracker.go | 28 +++++++++++++++------------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 72ea9f68b0..ccf1fcabe5 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -79,7 +79,7 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { - logger.Debugf("handleing neighbor message from peer %v with set id %v and round %v", from, packet.SetID, packet.Round) + logger.Debugf("handling neighbor message from peer %v with set id %v and round %v", from.ShortString(), packet.SetID, packet.Round) h.grandpa.neighborMsgChan <- neighborData{ peer: from, neighborMsg: packet, diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index eb1b1e1c17..3a8813feab 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -13,9 +13,9 @@ type neighborData struct { } type neighborState struct { - setID uint64 - round uint64 - //highestFinalized uint32 not sure if i need this or not + setID uint64 + round uint64 + highestFinalized uint32 } type NeighborTracker struct { @@ -68,15 +68,17 @@ func (nt *NeighborTracker) run() { } case block := <-nt.finalizationCha: - nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) - err := nt.BroadcastNeighborMsg() - if err != nil { - logger.Errorf("broadcasting neighbor message: %v", err) + if block != nil { + nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) + err := nt.BroadcastNeighborMsg() + if err != nil { + logger.Errorf("broadcasting neighbor message: %v", err) + } + ticker.Reset(duration) } - ticker.Reset(duration) case neighborData := <-nt.neighborMsgChan: - if neighborData.neighborMsg != nil { - nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round) + if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { + nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, neighborData.neighborMsg.Number) } case <-nt.stoppedNeighbor: logger.Info("stopping neighbor tracker") @@ -91,15 +93,15 @@ func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinali nt.highestFinalized = highestFinalized } -func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64) { - peerState := neighborState{setID, round} +func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) { + peerState := neighborState{setID, round, highestFinalized} nt.peerview[p] = peerState } func (nt *NeighborTracker) BroadcastNeighborMsg() error { logger.Warnf("braodcasting neighbor message to relevant peers") for id, peerState := range nt.peerview { - if !(peerState.round < nt.currentRound || peerState.setID < nt.currentSetID) { + if peerState.setID > nt.currentSetID || peerState.round > nt.currentRound { packet := NeighbourPacketV1{ Round: nt.currentRound, SetID: nt.currentSetID, From 41a3c76b127ed35102d27995bc569dfcb6d8a1e4 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 8 Oct 2024 15:53:24 +0900 Subject: [PATCH 09/23] respond to feedback --- lib/grandpa/grandpa.go | 1 + lib/grandpa/neighbor_tracker.go | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 9ebf7d9331..e2796390ec 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -201,6 +201,7 @@ func (s *Service) Stop() error { s.blockState.FreeFinalisedNotifierChannel(s.finalisedCh) s.neighborTracker.Stop() + close(s.neighborTracker.neighborMsgChan) if !s.authority { return nil diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 3a8813feab..02a6966df6 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -7,6 +7,9 @@ import ( "time" ) +// NeighborBroadcastPeriod See https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 +const NeighborBroadcastPeriod = time.Minute * 2 + type neighborData struct { peer peer.ID neighborMsg *NeighbourPacketV1 @@ -47,14 +50,12 @@ func (nt *NeighborTracker) Start() { func (nt *NeighborTracker) Stop() { nt.grandpa.blockState.FreeFinalisedNotifierChannel(nt.finalizationCha) - nt.stoppedNeighbor <- struct{}{} - close(nt.neighborMsgChan) + close(nt.stoppedNeighbor) } func (nt *NeighborTracker) run() { logger.Info("starting neighbor tracker") - // https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 - const duration = time.Minute * 2 + const duration = NeighborBroadcastPeriod ticker := time.NewTicker(duration) defer ticker.Stop() @@ -100,19 +101,18 @@ func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, hig func (nt *NeighborTracker) BroadcastNeighborMsg() error { logger.Warnf("braodcasting neighbor message to relevant peers") - for id, peerState := range nt.peerview { - if peerState.setID > nt.currentSetID || peerState.round > nt.currentRound { - packet := NeighbourPacketV1{ - Round: nt.currentRound, - SetID: nt.currentSetID, - Number: nt.highestFinalized, - } - - cm, err := packet.ToConsensusMessage() - if err != nil { - return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err) - } + packet := NeighbourPacketV1{ + Round: nt.currentRound, + SetID: nt.currentSetID, + Number: nt.highestFinalized, + } + cm, err := packet.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting NeighbourPacketV1 to network message: %w", err) + } + for id, peerState := range nt.peerview { + if peerState.round >= nt.currentRound && peerState.setID >= nt.currentSetID { err = nt.grandpa.network.SendMessage(id, cm) if err != nil { return fmt.Errorf("sending message to peer: %v", id) From f23987b08f7dae8a2a94f3571917ca6c993a9be6 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 8 Oct 2024 16:26:39 +0900 Subject: [PATCH 10/23] add unit tests --- lib/grandpa/neighbor_tracker.go | 27 ++++- lib/grandpa/neighbor_tracker_test.go | 141 +++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 5 deletions(-) create mode 100644 lib/grandpa/neighbor_tracker_test.go diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 02a6966df6..4347522706 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -70,8 +70,11 @@ func (nt *NeighborTracker) run() { case block := <-nt.finalizationCha: if block != nil { - nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) - err := nt.BroadcastNeighborMsg() + err := nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) + if err != nil { + logger.Errorf("updating neighbor state: %v", err) + } + err = nt.BroadcastNeighborMsg() if err != nil { logger.Errorf("broadcasting neighbor message: %v", err) } @@ -79,7 +82,10 @@ func (nt *NeighborTracker) run() { } case neighborData := <-nt.neighborMsgChan: if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { - nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, neighborData.neighborMsg.Number) + err := nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, neighborData.neighborMsg.Number) + if err != nil { + logger.Errorf("updating neighbor: %v", err) + } } case <-nt.stoppedNeighbor: logger.Info("stopping neighbor tracker") @@ -88,15 +94,26 @@ func (nt *NeighborTracker) run() { } } -func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { +func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) error { + if nt == nil { + return fmt.Errorf("neighbor tracker is nil") + } nt.currentSetID = setID nt.currentRound = round nt.highestFinalized = highestFinalized + return nil } -func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) { +func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) error { + if nt == nil { + return fmt.Errorf("neighbor tracker is nil") + } + if nt.peerview == nil { + return fmt.Errorf("neighbour tracker has nil peer tracker") + } peerState := neighborState{setID, round, highestFinalized} nt.peerview[p] = peerState + return nil } func (nt *NeighborTracker) BroadcastNeighborMsg() error { diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go new file mode 100644 index 0000000000..8f5c0fa5b1 --- /dev/null +++ b/lib/grandpa/neighbor_tracker_test.go @@ -0,0 +1,141 @@ +package grandpa + +import ( + "fmt" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNeighborTracker_UpdatePeer(t *testing.T) { + initPeerview := map[peer.ID]neighborState{} + initPeerview["testPeer"] = neighborState{ + setID: 1, + round: 2, + highestFinalized: 3, + } + type args struct { + p peer.ID + setID uint64 + round uint64 + highestFinalized uint32 + } + tests := []struct { + name string + tracker *NeighborTracker + args args + expectedState neighborState + expectedErr error + }{ + { + name: "simple update", + tracker: &NeighborTracker{ + peerview: map[peer.ID]neighborState{}, + }, + args: args{ + p: "testPeer", + setID: 1, + round: 2, + highestFinalized: 3, + }, + expectedState: neighborState{ + setID: 1, + round: 2, + highestFinalized: 3, + }, + }, + { + name: "nil peerview", + tracker: &NeighborTracker{}, + args: args{ + p: "testPeer", + setID: 1, + round: 2, + highestFinalized: 3, + }, + expectedErr: fmt.Errorf("neighbour tracker has nil peer tracker"), + }, + { + name: "updating existing peer", + tracker: &NeighborTracker{ + peerview: map[peer.ID]neighborState{}, + }, + args: args{ + p: "testPeer", + setID: 4, + round: 5, + highestFinalized: 6, + }, + expectedState: neighborState{ + setID: 4, + round: 5, + highestFinalized: 6, + }, + }, + { + name: "nil tracker", + args: args{ + p: "testPeer", + setID: 1, + round: 2, + highestFinalized: 3, + }, + expectedErr: fmt.Errorf("neighbor tracker is nil"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nt := tt.tracker + err := nt.UpdatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) + require.Equal(t, err, tt.expectedErr) + if nt != nil { + require.Equal(t, tt.expectedState, nt.peerview[tt.args.p]) + } + }) + } +} + +func TestNeighborTracker_UpdateState(t *testing.T) { + type args struct { + setID uint64 + round uint64 + highestFinalized uint32 + } + tests := []struct { + name string + tracker *NeighborTracker + args args + expectedErr error + }{ + { + name: "nil tracker", + args: args{ + setID: 1, + round: 2, + highestFinalized: 3, + }, + expectedErr: fmt.Errorf("neighbor tracker is nil"), + }, + { + name: "happy path", + tracker: &NeighborTracker{}, + args: args{ + setID: 1, + round: 2, + highestFinalized: 3, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nt := tt.tracker + err := nt.UpdateState(tt.args.setID, tt.args.round, tt.args.highestFinalized) + require.Equal(t, err, tt.expectedErr) + if nt != nil { + require.Equal(t, nt.currentSetID, tt.args.setID) + require.Equal(t, nt.currentRound, tt.args.round) + require.Equal(t, nt.highestFinalized, tt.args.highestFinalized) + } + }) + } +} From 532afc1a52665a3ddac8104a31963abfc94ce60a Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 15 Oct 2024 15:24:39 +0900 Subject: [PATCH 11/23] use const for ticker period --- lib/grandpa/neighbor_tracker.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 4347522706..d8eb7d1167 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -55,8 +55,7 @@ func (nt *NeighborTracker) Stop() { func (nt *NeighborTracker) run() { logger.Info("starting neighbor tracker") - const duration = NeighborBroadcastPeriod - ticker := time.NewTicker(duration) + ticker := time.NewTicker(NeighborBroadcastPeriod) defer ticker.Stop() for { @@ -78,7 +77,7 @@ func (nt *NeighborTracker) run() { if err != nil { logger.Errorf("broadcasting neighbor message: %v", err) } - ticker.Reset(duration) + ticker.Reset(NeighborBroadcastPeriod) } case neighborData := <-nt.neighborMsgChan: if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { From 3c0d6c67e13c6b777e92660889e404796e7f84c3 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 15 Oct 2024 19:53:49 +0900 Subject: [PATCH 12/23] feedback and lint --- lib/grandpa/message_handler.go | 5 +-- lib/grandpa/neighbor_tracker.go | 47 +++++++++++++--------------- lib/grandpa/neighbor_tracker_test.go | 44 ++++++-------------------- 3 files changed, 34 insertions(+), 62 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index ccf1fcabe5..baf7f01146 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -78,8 +78,9 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } } -func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { - logger.Debugf("handling neighbor message from peer %v with set id %v and round %v", from.ShortString(), packet.SetID, packet.Round) +func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { //nolint + logger.Debugf("handling neighbour message from peer %v with set id %v and round %v", + from.ShortString(), packet.SetID, packet.Round) h.grandpa.neighborMsgChan <- neighborData{ peer: from, neighborMsg: packet, diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index d8eb7d1167..747f1a41c9 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -2,13 +2,14 @@ package grandpa import ( "fmt" + "time" + "github.com/ChainSafe/gossamer/dot/types" "github.com/libp2p/go-libp2p/core/peer" - "time" ) -// NeighborBroadcastPeriod See https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 -const NeighborBroadcastPeriod = time.Minute * 2 +// https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 //nolint +const neighbourBroadcastPeriod = time.Minute * 2 type neighborData struct { peer peer.ID @@ -54,59 +55,54 @@ func (nt *NeighborTracker) Stop() { } func (nt *NeighborTracker) run() { - logger.Info("starting neighbor tracker") - ticker := time.NewTicker(NeighborBroadcastPeriod) + logger.Info("starting neighbour tracker") + ticker := time.NewTicker(neighbourBroadcastPeriod) defer ticker.Stop() for { select { case <-ticker.C: - logger.Debugf("neighbor message broadcast triggered by ticker") + logger.Debugf("neighbour message broadcast triggered by ticker") err := nt.BroadcastNeighborMsg() if err != nil { - logger.Errorf("broadcasting neighbor message: %v", err) + logger.Errorf("broadcasting neighbour message: %v", err) } case block := <-nt.finalizationCha: if block != nil { - err := nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) - if err != nil { - logger.Errorf("updating neighbor state: %v", err) - } - err = nt.BroadcastNeighborMsg() + nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) //nolint + err := nt.BroadcastNeighborMsg() if err != nil { - logger.Errorf("broadcasting neighbor message: %v", err) + logger.Errorf("broadcasting neighbour message: %v", err) } - ticker.Reset(NeighborBroadcastPeriod) + ticker.Reset(neighbourBroadcastPeriod) } case neighborData := <-nt.neighborMsgChan: if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { - err := nt.UpdatePeer(neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, neighborData.neighborMsg.Number) + err := nt.UpdatePeer( + neighborData.peer, + neighborData.neighborMsg.SetID, + neighborData.neighborMsg.Round, + neighborData.neighborMsg.Number, + ) if err != nil { - logger.Errorf("updating neighbor: %v", err) + logger.Errorf("updating neighbour: %v", err) } } case <-nt.stoppedNeighbor: - logger.Info("stopping neighbor tracker") + logger.Info("stopping neighbour tracker") return } } } -func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) error { - if nt == nil { - return fmt.Errorf("neighbor tracker is nil") - } +func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { nt.currentSetID = setID nt.currentRound = round nt.highestFinalized = highestFinalized - return nil } func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) error { - if nt == nil { - return fmt.Errorf("neighbor tracker is nil") - } if nt.peerview == nil { return fmt.Errorf("neighbour tracker has nil peer tracker") } @@ -116,7 +112,6 @@ func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, hig } func (nt *NeighborTracker) BroadcastNeighborMsg() error { - logger.Warnf("braodcasting neighbor message to relevant peers") packet := NeighbourPacketV1{ Round: nt.currentRound, SetID: nt.currentSetID, diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index 8f5c0fa5b1..e56a36e56f 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -2,9 +2,10 @@ package grandpa import ( "fmt" + "testing" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" - "testing" ) func TestNeighborTracker_UpdatePeer(t *testing.T) { @@ -72,25 +73,13 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { highestFinalized: 6, }, }, - { - name: "nil tracker", - args: args{ - p: "testPeer", - setID: 1, - round: 2, - highestFinalized: 3, - }, - expectedErr: fmt.Errorf("neighbor tracker is nil"), - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nt := tt.tracker err := nt.UpdatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) require.Equal(t, err, tt.expectedErr) - if nt != nil { - require.Equal(t, tt.expectedState, nt.peerview[tt.args.p]) - } + require.Equal(t, tt.expectedState, nt.peerview[tt.args.p]) }) } } @@ -102,20 +91,10 @@ func TestNeighborTracker_UpdateState(t *testing.T) { highestFinalized uint32 } tests := []struct { - name string - tracker *NeighborTracker - args args - expectedErr error + name string + tracker *NeighborTracker + args args }{ - { - name: "nil tracker", - args: args{ - setID: 1, - round: 2, - highestFinalized: 3, - }, - expectedErr: fmt.Errorf("neighbor tracker is nil"), - }, { name: "happy path", tracker: &NeighborTracker{}, @@ -129,13 +108,10 @@ func TestNeighborTracker_UpdateState(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nt := tt.tracker - err := nt.UpdateState(tt.args.setID, tt.args.round, tt.args.highestFinalized) - require.Equal(t, err, tt.expectedErr) - if nt != nil { - require.Equal(t, nt.currentSetID, tt.args.setID) - require.Equal(t, nt.currentRound, tt.args.round) - require.Equal(t, nt.highestFinalized, tt.args.highestFinalized) - } + nt.UpdateState(tt.args.setID, tt.args.round, tt.args.highestFinalized) + require.Equal(t, nt.currentSetID, tt.args.setID) + require.Equal(t, nt.currentRound, tt.args.round) + require.Equal(t, nt.highestFinalized, tt.args.highestFinalized) }) } } From 76836a54d4853df4f289bb684eeecae4235ca293 Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 15 Oct 2024 20:06:59 +0900 Subject: [PATCH 13/23] remove return err from handleNeighbourMSg --- lib/grandpa/message_handler.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index baf7f01146..041539a8fd 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -59,8 +59,8 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. return nil, nil //nolint:nilnil case *NeighbourPacketV1: - // we can afford to not retry handling neighbour message, if it errors. - return nil, h.handleNeighbourMessage(msg, from) + h.handleNeighbourMessage(msg, from) + return nil, nil //nolint:nilnil case *CatchUpRequest: return h.handleCatchUpRequest(msg) case *CatchUpResponse: @@ -78,14 +78,13 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } } -func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) error { //nolint +func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1, from peer.ID) { logger.Debugf("handling neighbour message from peer %v with set id %v and round %v", from.ShortString(), packet.SetID, packet.Round) h.grandpa.neighborMsgChan <- neighborData{ peer: from, neighborMsg: packet, } - return nil } func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) { From 8451a4cbae7fd0ae78331c6a7a6e5011bd94140f Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 15 Oct 2024 20:16:22 +0900 Subject: [PATCH 14/23] fix lint and license --- lib/grandpa/neighbor_tracker.go | 3 +++ lib/grandpa/neighbor_tracker_test.go | 11 +++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 747f1a41c9..a5648b85df 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package grandpa import ( diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index e56a36e56f..a6398b59e4 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package grandpa import ( @@ -29,7 +32,7 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { expectedErr error }{ { - name: "simple update", + name: "simple_update", tracker: &NeighborTracker{ peerview: map[peer.ID]neighborState{}, }, @@ -46,7 +49,7 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { }, }, { - name: "nil peerview", + name: "nil_peerview", tracker: &NeighborTracker{}, args: args{ p: "testPeer", @@ -57,7 +60,7 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { expectedErr: fmt.Errorf("neighbour tracker has nil peer tracker"), }, { - name: "updating existing peer", + name: "updating_existing_peer", tracker: &NeighborTracker{ peerview: map[peer.ID]neighborState{}, }, @@ -96,7 +99,7 @@ func TestNeighborTracker_UpdateState(t *testing.T) { args args }{ { - name: "happy path", + name: "happy_path", tracker: &NeighborTracker{}, args: args{ setID: 1, From 566b228949e570846666c435614ae5104f0e7258 Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 18:00:37 +0900 Subject: [PATCH 15/23] test for broadcast neighbour msg --- lib/grandpa/grandpa.go | 4 +- lib/grandpa/neighbor_tracker.go | 27 ++++----- lib/grandpa/neighbor_tracker_test.go | 89 ++++++++++++++++++++++------ 3 files changed, 82 insertions(+), 38 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index e2796390ec..195c939b20 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -82,7 +82,7 @@ type Service struct { telemetry Telemetry - neighborTracker *NeighborTracker + neighborTracker *neighborTracker } // Config represents a GRANDPA service configuration @@ -157,7 +157,7 @@ func NewService(cfg *Config) (*Service, error) { neighborMsgChan: make(chan neighborData), } - s.neighborTracker = NewNeighborTracker(s, s.neighborMsgChan) + s.neighborTracker = newNeighborTracker(s, s.neighborMsgChan) if err := s.registerProtocol(); err != nil { return nil, err diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index a5648b85df..e50cf28d90 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -25,7 +25,7 @@ type neighborState struct { highestFinalized uint32 } -type NeighborTracker struct { +type neighborTracker struct { grandpa *Service peerview map[peer.ID]neighborState @@ -38,8 +38,8 @@ type NeighborTracker struct { stoppedNeighbor chan struct{} } -func NewNeighborTracker(grandpa *Service, neighborChan chan neighborData) *NeighborTracker { - return &NeighborTracker{ +func newNeighborTracker(grandpa *Service, neighborChan chan neighborData) *neighborTracker { + return &neighborTracker{ grandpa: grandpa, peerview: make(map[peer.ID]neighborState), finalizationCha: grandpa.blockState.GetFinalisedNotifierChannel(), @@ -48,16 +48,16 @@ func NewNeighborTracker(grandpa *Service, neighborChan chan neighborData) *Neigh } } -func (nt *NeighborTracker) Start() { +func (nt *neighborTracker) Start() { go nt.run() } -func (nt *NeighborTracker) Stop() { +func (nt *neighborTracker) Stop() { nt.grandpa.blockState.FreeFinalisedNotifierChannel(nt.finalizationCha) close(nt.stoppedNeighbor) } -func (nt *NeighborTracker) run() { +func (nt *neighborTracker) run() { logger.Info("starting neighbour tracker") ticker := time.NewTicker(neighbourBroadcastPeriod) defer ticker.Stop() @@ -82,15 +82,12 @@ func (nt *NeighborTracker) run() { } case neighborData := <-nt.neighborMsgChan: if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { - err := nt.UpdatePeer( + nt.UpdatePeer( neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, neighborData.neighborMsg.Number, ) - if err != nil { - logger.Errorf("updating neighbour: %v", err) - } } case <-nt.stoppedNeighbor: logger.Info("stopping neighbour tracker") @@ -99,22 +96,18 @@ func (nt *NeighborTracker) run() { } } -func (nt *NeighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { +func (nt *neighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { nt.currentSetID = setID nt.currentRound = round nt.highestFinalized = highestFinalized } -func (nt *NeighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) error { - if nt.peerview == nil { - return fmt.Errorf("neighbour tracker has nil peer tracker") - } +func (nt *neighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) { peerState := neighborState{setID, round, highestFinalized} nt.peerview[p] = peerState - return nil } -func (nt *NeighborTracker) BroadcastNeighborMsg() error { +func (nt *neighborTracker) BroadcastNeighborMsg() error { packet := NeighbourPacketV1{ Round: nt.currentRound, SetID: nt.currentSetID, diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index a6398b59e4..1beec10de2 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -5,6 +5,7 @@ package grandpa import ( "fmt" + "go.uber.org/mock/gomock" "testing" "github.com/libp2p/go-libp2p/core/peer" @@ -26,14 +27,13 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { } tests := []struct { name string - tracker *NeighborTracker + tracker *neighborTracker args args expectedState neighborState - expectedErr error }{ { name: "simple_update", - tracker: &NeighborTracker{ + tracker: &neighborTracker{ peerview: map[peer.ID]neighborState{}, }, args: args{ @@ -48,20 +48,9 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { highestFinalized: 3, }, }, - { - name: "nil_peerview", - tracker: &NeighborTracker{}, - args: args{ - p: "testPeer", - setID: 1, - round: 2, - highestFinalized: 3, - }, - expectedErr: fmt.Errorf("neighbour tracker has nil peer tracker"), - }, { name: "updating_existing_peer", - tracker: &NeighborTracker{ + tracker: &neighborTracker{ peerview: map[peer.ID]neighborState{}, }, args: args{ @@ -80,8 +69,7 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nt := tt.tracker - err := nt.UpdatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) - require.Equal(t, err, tt.expectedErr) + nt.UpdatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) require.Equal(t, tt.expectedState, nt.peerview[tt.args.p]) }) } @@ -95,12 +83,12 @@ func TestNeighborTracker_UpdateState(t *testing.T) { } tests := []struct { name string - tracker *NeighborTracker + tracker *neighborTracker args args }{ { name: "happy_path", - tracker: &NeighborTracker{}, + tracker: &neighborTracker{}, args: args{ setID: 1, round: 2, @@ -118,3 +106,66 @@ func TestNeighborTracker_UpdateState(t *testing.T) { }) } } + +func TestNeighborTracker_BroadcastNeighborMsg(t *testing.T) { + ctrl := gomock.NewController(t) + // Err path + mockNetworkErr := NewMockNetwork(ctrl) + packet := NeighbourPacketV1{ + Round: 5, + SetID: 5, + } + cm, err := packet.ToConsensusMessage() + mockNetworkErr.EXPECT().SendMessage(peer.ID("error"), cm).Return(fmt.Errorf("test error sending message")) + + grandpaServiceErr := &Service{ + network: mockNetworkErr, + } + peerViewErr := make(map[peer.ID]neighborState) + peerViewErr["error"] = neighborState{ + round: 5, + setID: 5, + } + + neighborTrackerErr := neighborTracker{ + grandpa: grandpaServiceErr, + peerview: peerViewErr, + currentRound: 5, + currentSetID: 5, + } + err = neighborTrackerErr.BroadcastNeighborMsg() + require.Error(t, err) + + // Happy path + mockNetworkOk := NewMockNetwork(ctrl) + mockNetworkOk.EXPECT().SendMessage(peer.ID("equal"), cm).Return(nil) + mockNetworkOk.EXPECT().SendMessage(peer.ID("ahead"), cm).Return(nil) + + grandpaService := &Service{ + network: mockNetworkOk, + } + + peerViewOk := make(map[peer.ID]neighborState) + peerViewOk["lowSet"] = neighborState{ + setID: 1, + } + peerViewOk["lowRound"] = neighborState{ + round: 1, + } + peerViewOk["equal"] = neighborState{ + round: 5, + setID: 5, + } + peerViewOk["ahead"] = neighborState{ + round: 7, + setID: 5, + } + neighborTrackerOk := neighborTracker{ + grandpa: grandpaService, + peerview: peerViewOk, + currentRound: 5, + currentSetID: 5, + } + err = neighborTrackerOk.BroadcastNeighborMsg() + require.NoError(t, err) +} From 39d2b22746c7672185f717ffbf1008e535800225 Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 18:25:39 +0900 Subject: [PATCH 16/23] test to update peer while running --- lib/grandpa/neighbor_tracker.go | 1 + lib/grandpa/neighbor_tracker_test.go | 92 +++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 10 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index e50cf28d90..9a55353531 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -91,6 +91,7 @@ func (nt *neighborTracker) run() { } case <-nt.stoppedNeighbor: logger.Info("stopping neighbour tracker") + nt.Stop() return } } diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index 1beec10de2..ab10e16465 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -5,14 +5,15 @@ package grandpa import ( "fmt" - "go.uber.org/mock/gomock" - "testing" - + "github.com/ChainSafe/gossamer/dot/types" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "testing" + "time" ) -func TestNeighborTracker_UpdatePeer(t *testing.T) { +func TestNeighbourTracker_UpdatePeer(t *testing.T) { initPeerview := map[peer.ID]neighborState{} initPeerview["testPeer"] = neighborState{ setID: 1, @@ -75,7 +76,7 @@ func TestNeighborTracker_UpdatePeer(t *testing.T) { } } -func TestNeighborTracker_UpdateState(t *testing.T) { +func TestNeighbourTracker_UpdateState(t *testing.T) { type args struct { setID uint64 round uint64 @@ -107,7 +108,7 @@ func TestNeighborTracker_UpdateState(t *testing.T) { } } -func TestNeighborTracker_BroadcastNeighborMsg(t *testing.T) { +func TestNeighbourTracker_BroadcastNeighborMsg(t *testing.T) { ctrl := gomock.NewController(t) // Err path mockNetworkErr := NewMockNetwork(ctrl) @@ -127,13 +128,13 @@ func TestNeighborTracker_BroadcastNeighborMsg(t *testing.T) { setID: 5, } - neighborTrackerErr := neighborTracker{ + neighbourTrackerErr := neighborTracker{ grandpa: grandpaServiceErr, peerview: peerViewErr, currentRound: 5, currentSetID: 5, } - err = neighborTrackerErr.BroadcastNeighborMsg() + err = neighbourTrackerErr.BroadcastNeighborMsg() require.Error(t, err) // Happy path @@ -160,12 +161,83 @@ func TestNeighborTracker_BroadcastNeighborMsg(t *testing.T) { round: 7, setID: 5, } - neighborTrackerOk := neighborTracker{ + neighbourTrackerOk := neighborTracker{ grandpa: grandpaService, peerview: peerViewOk, currentRound: 5, currentSetID: 5, } - err = neighborTrackerOk.BroadcastNeighborMsg() + err = neighbourTrackerOk.BroadcastNeighborMsg() require.NoError(t, err) } + +func TestNeighbourTracker_StartStop_viaFunctionCall(t *testing.T) { + ctrl := gomock.NewController(t) + finalizationChan := make(chan *types.FinalisationInfo) + blockStateMock := NewMockBlockState(ctrl) + blockStateMock.EXPECT(). + GetFinalisedNotifierChannel(). + Return(finalizationChan) + blockStateMock.EXPECT(). + FreeFinalisedNotifierChannel(finalizationChan) + + grandpaService := &Service{ + blockState: blockStateMock, + } + nt := newNeighborTracker(grandpaService, make(chan neighborData)) + nt.Start() + nt.Stop() +} + +func TestNeighbourTracker_StartStop_viaChannel(t *testing.T) { + ctrl := gomock.NewController(t) + finalizationChan := make(chan *types.FinalisationInfo) + blockStateMock := NewMockBlockState(ctrl) + blockStateMock.EXPECT(). + GetFinalisedNotifierChannel(). + Return(finalizationChan) + blockStateMock.EXPECT(). + FreeFinalisedNotifierChannel(finalizationChan) + + grandpaService := &Service{ + blockState: blockStateMock, + } + nt := newNeighborTracker(grandpaService, make(chan neighborData)) + nt.Start() + nt.stoppedNeighbor <- struct{}{} +} + +func TestNeighbourTracker_UpdatePeer_viaChannel(t *testing.T) { + ctrl := gomock.NewController(t) + finalizationChan := make(chan *types.FinalisationInfo) + blockStateMock := NewMockBlockState(ctrl) + blockStateMock.EXPECT(). + GetFinalisedNotifierChannel(). + Return(finalizationChan) + blockStateMock.EXPECT(). + FreeFinalisedNotifierChannel(finalizationChan) + + grandpaService := &Service{ + blockState: blockStateMock, + } + neighbourChan := make(chan neighborData) + nt := newNeighborTracker(grandpaService, neighbourChan) + nt.Start() + + neighbourChan <- neighborData{ + peer: "testPeer", + neighborMsg: &NeighbourPacketV1{ + Round: 5, + SetID: 6, + Number: 7, + }, + } + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, uint64(5), nt.peerview["testPeer"].round) + require.Equal(t, uint64(6), nt.peerview["testPeer"].setID) + require.Equal(t, uint32(7), nt.peerview["testPeer"].highestFinalized) + + nt.Stop() +} From 3d1d0b7cb3b28c1c9499b3a8d99065a4b3afbcca Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 18:43:58 +0900 Subject: [PATCH 17/23] fix TestHandleNetworkMessage --- lib/grandpa/grandpa.go | 6 ++++-- lib/grandpa/neighbor_tracker.go | 1 - lib/grandpa/neighbor_tracker_test.go | 4 +--- lib/grandpa/network_integration_test.go | 4 ++++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 195c939b20..bd3f135a25 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -133,6 +133,8 @@ func NewService(cfg *Config) (*Service, error) { cfg.Interval = defaultGrandpaInterval } + neighborMsgChan := make(chan neighborData) + ctx, cancel := context.WithCancel(context.Background()) s := &Service{ ctx: ctx, @@ -154,10 +156,10 @@ func NewService(cfg *Config) (*Service, error) { finalisedCh: finalisedCh, interval: cfg.Interval, telemetry: cfg.Telemetry, - neighborMsgChan: make(chan neighborData), + neighborMsgChan: neighborMsgChan, } - s.neighborTracker = newNeighborTracker(s, s.neighborMsgChan) + s.neighborTracker = newNeighborTracker(s, neighborMsgChan) if err := s.registerProtocol(); err != nil { return nil, err diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 9a55353531..e50cf28d90 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -91,7 +91,6 @@ func (nt *neighborTracker) run() { } case <-nt.stoppedNeighbor: logger.Info("stopping neighbour tracker") - nt.Stop() return } } diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index ab10e16465..ac376ae312 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -196,8 +196,6 @@ func TestNeighbourTracker_StartStop_viaChannel(t *testing.T) { blockStateMock.EXPECT(). GetFinalisedNotifierChannel(). Return(finalizationChan) - blockStateMock.EXPECT(). - FreeFinalisedNotifierChannel(finalizationChan) grandpaService := &Service{ blockState: blockStateMock, @@ -232,7 +230,7 @@ func TestNeighbourTracker_UpdatePeer_viaChannel(t *testing.T) { Number: 7, }, } - + time.Sleep(100 * time.Millisecond) require.Equal(t, uint64(5), nt.peerview["testPeer"].round) diff --git a/lib/grandpa/network_integration_test.go b/lib/grandpa/network_integration_test.go index 6b03ed8581..4017f7bb48 100644 --- a/lib/grandpa/network_integration_test.go +++ b/lib/grandpa/network_integration_test.go @@ -42,6 +42,9 @@ func TestHandleNetworkMessage(t *testing.T) { gs, st := newTestService(t, aliceKeyPair) + err = gs.Start() + require.NoError(t, err) + just := []SignedVote{ { Vote: *testVote, @@ -76,4 +79,5 @@ func TestHandleNetworkMessage(t *testing.T) { propagate, err = gs.handleNetworkMessage(peer.ID(""), cm) require.NoError(t, err) require.False(t, propagate) + err = gs.Stop() } From 998d8cc5e19e438be4ac6418c633bc8bf125571d Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 19:02:17 +0900 Subject: [PATCH 18/23] fix grandpa integration test --- lib/grandpa/message_handler_integration_test.go | 4 ++++ lib/grandpa/network_integration_test.go | 9 ++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/grandpa/message_handler_integration_test.go b/lib/grandpa/message_handler_integration_test.go index 907f080de1..f1713dd504 100644 --- a/lib/grandpa/message_handler_integration_test.go +++ b/lib/grandpa/message_handler_integration_test.go @@ -212,6 +212,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { gs, st := newTestService(t, aliceKeyPair) + gs.neighborTracker.Start() + ctrl := gomock.NewController(t) telemetryMock := NewMockTelemetry(ctrl) @@ -250,6 +252,8 @@ func TestMessageHandler_NeighbourMessage(t *testing.T) { out, err := h.handleMessage("", NeighbourPacketV1) require.NoError(t, err) require.Nil(t, out) + + gs.neighborTracker.Stop() } func TestMessageHandler_VerifyJustification_InvalidSig(t *testing.T) { diff --git a/lib/grandpa/network_integration_test.go b/lib/grandpa/network_integration_test.go index 4017f7bb48..e04220e7e3 100644 --- a/lib/grandpa/network_integration_test.go +++ b/lib/grandpa/network_integration_test.go @@ -42,8 +42,10 @@ func TestHandleNetworkMessage(t *testing.T) { gs, st := newTestService(t, aliceKeyPair) - err = gs.Start() - require.NoError(t, err) + //err = gs.Start() + //require.NoError(t, err) + + gs.neighborTracker.Start() just := []SignedVote{ { @@ -79,5 +81,6 @@ func TestHandleNetworkMessage(t *testing.T) { propagate, err = gs.handleNetworkMessage(peer.ID(""), cm) require.NoError(t, err) require.False(t, propagate) - err = gs.Stop() + + gs.neighborTracker.Stop() } From 9f8e0dd85920a3da35fef735eaed537ecba5c1d1 Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 19:06:45 +0900 Subject: [PATCH 19/23] fix linting --- lib/grandpa/neighbor_tracker_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index ac376ae312..874eef8552 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -5,12 +5,13 @@ package grandpa import ( "fmt" + "testing" + "time" + "github.com/ChainSafe/gossamer/dot/types" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "testing" - "time" ) func TestNeighbourTracker_UpdatePeer(t *testing.T) { @@ -117,6 +118,7 @@ func TestNeighbourTracker_BroadcastNeighborMsg(t *testing.T) { SetID: 5, } cm, err := packet.ToConsensusMessage() + require.NoError(t, err) mockNetworkErr.EXPECT().SendMessage(peer.ID("error"), cm).Return(fmt.Errorf("test error sending message")) grandpaServiceErr := &Service{ From a137cbadf6a18715d81f6aeecdf290471a87e808 Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 17 Oct 2024 19:26:14 +0900 Subject: [PATCH 20/23] fix race conditions --- lib/grandpa/neighbor_tracker.go | 21 +++++++++++++++++---- lib/grandpa/neighbor_tracker_test.go | 11 ++++++----- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index e50cf28d90..8f6e4b2df0 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -5,6 +5,7 @@ package grandpa import ( "fmt" + "sync" "time" "github.com/ChainSafe/gossamer/dot/types" @@ -26,6 +27,7 @@ type neighborState struct { } type neighborTracker struct { + sync.Mutex grandpa *Service peerview map[peer.ID]neighborState @@ -73,7 +75,7 @@ func (nt *neighborTracker) run() { case block := <-nt.finalizationCha: if block != nil { - nt.UpdateState(block.SetID, block.Round, uint32(block.Header.Number)) //nolint + nt.updateState(block.SetID, block.Round, uint32(block.Header.Number)) //nolint err := nt.BroadcastNeighborMsg() if err != nil { logger.Errorf("broadcasting neighbour message: %v", err) @@ -82,7 +84,7 @@ func (nt *neighborTracker) run() { } case neighborData := <-nt.neighborMsgChan: if neighborData.neighborMsg.Number > nt.peerview[neighborData.peer].highestFinalized { - nt.UpdatePeer( + nt.updatePeer( neighborData.peer, neighborData.neighborMsg.SetID, neighborData.neighborMsg.Round, @@ -96,17 +98,28 @@ func (nt *neighborTracker) run() { } } -func (nt *neighborTracker) UpdateState(setID uint64, round uint64, highestFinalized uint32) { +func (nt *neighborTracker) updateState(setID uint64, round uint64, highestFinalized uint32) { + nt.Lock() + defer nt.Unlock() + nt.currentSetID = setID nt.currentRound = round nt.highestFinalized = highestFinalized } -func (nt *neighborTracker) UpdatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) { +func (nt *neighborTracker) updatePeer(p peer.ID, setID uint64, round uint64, highestFinalized uint32) { + nt.Lock() + defer nt.Unlock() peerState := neighborState{setID, round, highestFinalized} nt.peerview[p] = peerState } +func (nt *neighborTracker) getPeer(p peer.ID) neighborState { + nt.Lock() + defer nt.Unlock() + return nt.peerview[p] +} + func (nt *neighborTracker) BroadcastNeighborMsg() error { packet := NeighbourPacketV1{ Round: nt.currentRound, diff --git a/lib/grandpa/neighbor_tracker_test.go b/lib/grandpa/neighbor_tracker_test.go index 874eef8552..2a7ff7b7ea 100644 --- a/lib/grandpa/neighbor_tracker_test.go +++ b/lib/grandpa/neighbor_tracker_test.go @@ -71,7 +71,7 @@ func TestNeighbourTracker_UpdatePeer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nt := tt.tracker - nt.UpdatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) + nt.updatePeer(tt.args.p, tt.args.setID, tt.args.round, tt.args.highestFinalized) require.Equal(t, tt.expectedState, nt.peerview[tt.args.p]) }) } @@ -101,7 +101,7 @@ func TestNeighbourTracker_UpdateState(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nt := tt.tracker - nt.UpdateState(tt.args.setID, tt.args.round, tt.args.highestFinalized) + nt.updateState(tt.args.setID, tt.args.round, tt.args.highestFinalized) require.Equal(t, nt.currentSetID, tt.args.setID) require.Equal(t, nt.currentRound, tt.args.round) require.Equal(t, nt.highestFinalized, tt.args.highestFinalized) @@ -235,9 +235,10 @@ func TestNeighbourTracker_UpdatePeer_viaChannel(t *testing.T) { time.Sleep(100 * time.Millisecond) - require.Equal(t, uint64(5), nt.peerview["testPeer"].round) - require.Equal(t, uint64(6), nt.peerview["testPeer"].setID) - require.Equal(t, uint32(7), nt.peerview["testPeer"].highestFinalized) + testPeer := nt.getPeer("testPeer") + require.Equal(t, uint64(5), testPeer.round) + require.Equal(t, uint64(6), testPeer.setID) + require.Equal(t, uint32(7), testPeer.highestFinalized) nt.Stop() } From 468931696f24ec6239dcc87788c1cb7678ab6260 Mon Sep 17 00:00:00 2001 From: jimboj Date: Wed, 30 Oct 2024 15:55:14 +0700 Subject: [PATCH 21/23] respond to most of feedback --- lib/grandpa/grandpa.go | 1 - lib/grandpa/neighbor_tracker.go | 2 +- lib/grandpa/network_integration_test.go | 5 +---- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index bd3f135a25..bd3048b99f 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -168,7 +168,6 @@ func NewService(cfg *Config) (*Service, error) { s.messageHandler = NewMessageHandler(s, s.blockState, cfg.Telemetry) s.tracker = newTracker(s.blockState, s.messageHandler) s.paused.Store(false) - return s, nil } diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 8f6e4b2df0..4a3b61097b 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -12,7 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -// https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 //nolint +// How often neighbor messages should be rebroadcast in the case where no new packets are created const neighbourBroadcastPeriod = time.Minute * 2 type neighborData struct { diff --git a/lib/grandpa/network_integration_test.go b/lib/grandpa/network_integration_test.go index e04220e7e3..7e1069e30c 100644 --- a/lib/grandpa/network_integration_test.go +++ b/lib/grandpa/network_integration_test.go @@ -41,10 +41,7 @@ func TestHandleNetworkMessage(t *testing.T) { aliceKeyPair := kr.Alice().(*ed25519.Keypair) gs, st := newTestService(t, aliceKeyPair) - - //err = gs.Start() - //require.NoError(t, err) - + gs.neighborTracker.Start() just := []SignedVote{ From 7d67f800414ed84f88e2ba6bf83ab925ee43479b Mon Sep 17 00:00:00 2001 From: jimboj Date: Wed, 30 Oct 2024 16:01:19 +0700 Subject: [PATCH 22/23] add wait group to neighbor tracker --- lib/grandpa/neighbor_tracker.go | 11 ++++++++++- lib/grandpa/network_integration_test.go | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 4a3b61097b..936cc726b5 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -38,6 +38,7 @@ type neighborTracker struct { finalizationCha chan *types.FinalisationInfo neighborMsgChan chan neighborData stoppedNeighbor chan struct{} + wg sync.WaitGroup } func newNeighborTracker(grandpa *Service, neighborChan chan neighborData) *neighborTracker { @@ -51,12 +52,20 @@ func newNeighborTracker(grandpa *Service, neighborChan chan neighborData) *neigh } func (nt *neighborTracker) Start() { - go nt.run() + //go nt.run() + + nt.wg.Add(1) + go func() { + nt.run() + nt.wg.Done() + }() } func (nt *neighborTracker) Stop() { nt.grandpa.blockState.FreeFinalisedNotifierChannel(nt.finalizationCha) close(nt.stoppedNeighbor) + + nt.wg.Wait() } func (nt *neighborTracker) run() { diff --git a/lib/grandpa/network_integration_test.go b/lib/grandpa/network_integration_test.go index 7e1069e30c..f9e56ed9cd 100644 --- a/lib/grandpa/network_integration_test.go +++ b/lib/grandpa/network_integration_test.go @@ -41,7 +41,7 @@ func TestHandleNetworkMessage(t *testing.T) { aliceKeyPair := kr.Alice().(*ed25519.Keypair) gs, st := newTestService(t, aliceKeyPair) - + gs.neighborTracker.Start() just := []SignedVote{ From 3dbb1db0e8b1ea7b663e82ab0bb430c1468e141d Mon Sep 17 00:00:00 2001 From: jimboj Date: Thu, 31 Oct 2024 15:23:50 +0700 Subject: [PATCH 23/23] move wg.Done() into a defer --- lib/grandpa/neighbor_tracker.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/grandpa/neighbor_tracker.go b/lib/grandpa/neighbor_tracker.go index 936cc726b5..f01da904dd 100644 --- a/lib/grandpa/neighbor_tracker.go +++ b/lib/grandpa/neighbor_tracker.go @@ -12,7 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -// How often neighbor messages should be rebroadcast in the case where no new packets are created +// How often neighbour messages should be rebroadcast in the case where no new packets are created const neighbourBroadcastPeriod = time.Minute * 2 type neighborData struct { @@ -52,13 +52,8 @@ func newNeighborTracker(grandpa *Service, neighborChan chan neighborData) *neigh } func (nt *neighborTracker) Start() { - //go nt.run() - nt.wg.Add(1) - go func() { - nt.run() - nt.wg.Done() - }() + go nt.run() } func (nt *neighborTracker) Stop() { @@ -71,7 +66,10 @@ func (nt *neighborTracker) Stop() { func (nt *neighborTracker) run() { logger.Info("starting neighbour tracker") ticker := time.NewTicker(neighbourBroadcastPeriod) - defer ticker.Stop() + defer func() { + ticker.Stop() + nt.wg.Done() + }() for { select {