From 932f77857e3f23e28a87eaa359a9e6d6aa13d48d Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Mon, 11 Apr 2022 14:29:30 +0000 Subject: [PATCH 01/12] Disable `addCatchUpResponse` - Not implemented yet (see #1531) - Can be a possible memory leak --- lib/grandpa/message_tracker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 425d063051..af5d647548 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -76,10 +76,11 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } -func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { +func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { //nolint:unparam t.catchUpResponseMessageMutex.Lock() defer t.catchUpResponseMessageMutex.Unlock() - t.catchUpResponseMessages[cr.Round] = cr + // uncomment when usage is setup properly, see #1531 + // t.catchUpResponseMessages[cr.Round] = cr } func (t *tracker) handleBlocks() { From 3eafe1f867cd5bda9ffa9e8751d2a18a39eeb545 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 13 Apr 2022 11:31:26 +0000 Subject: [PATCH 02/12] Discard vote messages for rounds too far from state round --- lib/grandpa/vote_message.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index f2849ab09c..a2908e80f9 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -162,11 +162,18 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro logger.Warnf("failed to send CommitMessage: %s", err) } } else { - // round is higher than ours, perhaps we are behind. store vote in tracker for now - s.tracker.addVote(&networkVoteMessage{ - from: from, - msg: m, - }) + // Message round is higher than the round of our state, + // we may be lagging behind, so store the message in the tracker + // for processing later. + const maxFutureRoundsDiff = 5 + if m.Round < s.state.round+maxFutureRoundsDiff { + // We ensure the message round is not too far away + // from our state round, to avoid abuses of the tracker storage. + s.tracker.addVote(&networkVoteMessage{ + from: from, + msg: m, + }) + } } // TODO: get justification if your round is lower, or just do catch-up? (#1815) From 6d83ee6b2bc6e3d6135b9d4ba0c55a0626709e3a Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 15 Apr 2022 11:48:10 +0000 Subject: [PATCH 03/12] Vote messages tracker - Removes oldest vote messages when capacity is reached - Efficient removal of messages if they get processed - Efficient cleanup of oldest message - Uses a bit more space to store each block hash + auth ID --- lib/grandpa/message_tracker.go | 60 +++-- lib/grandpa/message_tracker_test.go | 71 +++--- lib/grandpa/tracker_votes.go | 179 +++++++++++++ lib/grandpa/tracker_votes_test.go | 372 ++++++++++++++++++++++++++++ lib/grandpa/vote_message.go | 10 +- 5 files changed, 618 insertions(+), 74 deletions(-) create mode 100644 lib/grandpa/tracker_votes.go create mode 100644 lib/grandpa/tracker_votes_test.go diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index af5d647548..395a1ef0df 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -9,7 +9,7 @@ import ( "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/crypto/ed25519" + "github.com/libp2p/go-libp2p-core/peer" ) // tracker keeps track of messages that have been received, but have failed to @@ -18,8 +18,8 @@ import ( type tracker struct { blockState BlockState handler *MessageHandler - // map of vote block hash -> array of VoteMessages for that hash - voteMessages map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage + votes votesTracker + // map of commit block hash to commit message commitMessages map[common.Hash]*CommitMessage mapLock sync.Mutex @@ -32,10 +32,11 @@ type tracker struct { } func newTracker(bs BlockState, handler *MessageHandler) *tracker { + const votesCapacity = 1000 return &tracker{ blockState: bs, handler: handler, - voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage), + votes: newVotesTracker(votesCapacity), commitMessages: make(map[common.Hash]*CommitMessage), mapLock: sync.Mutex{}, in: bs.GetImportedBlockNotifierChannel(), @@ -53,21 +54,15 @@ func (t *tracker) stop() { t.blockState.FreeImportedBlockNotifierChannel(t.in) } -func (t *tracker) addVote(v *networkVoteMessage) { - if v.msg == nil { +func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) { + if message == nil { return } t.mapLock.Lock() defer t.mapLock.Unlock() - msgs, has := t.voteMessages[v.msg.Message.BlockHash] - if !has { - msgs = make(map[ed25519.PublicKeyBytes]*networkVoteMessage) - t.voteMessages[v.msg.Message.BlockHash] = msgs - } - - msgs[v.msg.Message.AuthorityID] = v + t.votes.add(peerID, message) } func (t *tracker) addCommit(cm *CommitMessage) { @@ -76,7 +71,7 @@ func (t *tracker) addCommit(cm *CommitMessage) { t.commitMessages[cm.Vote.Hash] = cm } -func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) { //nolint:unparam +func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) { t.catchUpResponseMessageMutex.Lock() defer t.catchUpResponseMessageMutex.Unlock() // uncomment when usage is setup properly, see #1531 @@ -109,18 +104,17 @@ func (t *tracker) handleBlock(b *types.Block) { defer t.mapLock.Unlock() h := b.Header.Hash() - if vms, has := t.voteMessages[h]; has { - for _, v := range vms { - // handleMessage would never error for vote message - _, err := t.handler.handleMessage(v.from, v.msg) - if err != nil { - logger.Warnf("failed to handle vote message %v: %s", v, err) - } + vms := t.votes.getMessagesForBlockHash(h) + for _, v := range vms { + // handleMessage would never error for vote message + _, err := t.handler.handleMessage(v.from, v.msg) + if err != nil { + logger.Warnf("failed to handle vote message %v: %s", v, err) } - - delete(t.voteMessages, h) } + t.votes.delete(h) + if cm, has := t.commitMessages[h]; has { _, err := t.handler.handleMessage("", cm) if err != nil { @@ -135,18 +129,20 @@ func (t *tracker) handleTick() { t.mapLock.Lock() defer t.mapLock.Unlock() - for _, vms := range t.voteMessages { - for _, v := range vms { + var blockHashesDone []common.Hash + t.votes.forEach(func(peerID peer.ID, message *VoteMessage) { + _, err := t.handler.handleMessage(peerID, message) + if err != nil { // handleMessage would never error for vote message - _, err := t.handler.handleMessage(v.from, v.msg) - if err != nil { - logger.Debugf("failed to handle vote message %v: %s", v, err) - } + logger.Debugf("failed to handle vote message %v from peer id %s: %s", message, peerID, err) + } - if v.msg.Round < t.handler.grandpa.state.round && v.msg.SetID == t.handler.grandpa.state.setID { - delete(t.voteMessages, v.msg.Message.BlockHash) - } + if message.Round < t.handler.grandpa.state.round && message.SetID == t.handler.grandpa.state.setID { + blockHashesDone = append(blockHashesDone, message.Message.BlockHash) } + }) + for _, blockHashDone := range blockHashesDone { + t.votes.delete(blockHashDone) } for _, cm := range t.commitMessages { diff --git a/lib/grandpa/message_tracker_test.go b/lib/grandpa/message_tracker_test.go index 9388baa54f..3500638c33 100644 --- a/lib/grandpa/message_tracker_test.go +++ b/lib/grandpa/message_tracker_test.go @@ -16,6 +16,24 @@ import ( "github.com/stretchr/testify/require" ) +// getMessageFromVotesTracker returns the vote message +// from the votes tracker for the given block hash and authority ID. +func getMessageFromVotesTracker(votes votesTracker, + blockHash common.Hash, authorityID ed25519.PublicKeyBytes) ( + message *VoteMessage) { + authIDToData, has := votes.mapping[blockHash] + if !has { + return nil + } + + data, ok := authIDToData[authorityID] + if !ok { + return nil + } + + return data.message +} + func TestMessageTracker_ValidateMessage(t *testing.T) { kr, err := keystore.NewEd25519Keyring() require.NoError(t, err) @@ -33,13 +51,11 @@ func TestMessageTracker_ValidateMessage(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - expected := &networkVoteMessage{ - msg: msg, - } - _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrBlockDoesNotExist) - require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) + authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes() + voteMessage := getMessageFromVotesTracker(gs.tracker.votes, fake.Hash(), authorityID) + require.Equal(t, msg, voteMessage) } func TestMessageTracker_SendMessage(t *testing.T) { @@ -72,13 +88,11 @@ func TestMessageTracker_SendMessage(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - expected := &networkVoteMessage{ - msg: msg, - } - _, err = gs.validateVoteMessage("", msg) require.Equal(t, err, ErrBlockDoesNotExist) - require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) + authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes() + voteMessage := getMessageFromVotesTracker(gs.tracker.votes, next.Hash(), authorityID) + require.Equal(t, msg, voteMessage) err = gs.blockState.(*state.BlockState).AddBlock(&types.Block{ Header: *next, @@ -126,13 +140,11 @@ func TestMessageTracker_ProcessMessage(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - expected := &networkVoteMessage{ - msg: msg, - } - _, err = gs.validateVoteMessage("", msg) require.Equal(t, ErrBlockDoesNotExist, err) - require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()]) + authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes() + voteMessage := getMessageFromVotesTracker(gs.tracker.votes, next.Hash(), authorityID) + require.Equal(t, msg, voteMessage) err = gs.blockState.(*state.BlockState).AddBlock(&types.Block{ Header: *next, @@ -147,7 +159,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) { } pv, has := gs.prevotes.Load(kr.Alice().Public().(*ed25519.PublicKey).AsBytes()) require.True(t, has) - require.Equal(t, expectedVote, &pv.(*SignedVote).Vote, gs.tracker.voteMessages) + require.Equal(t, expectedVote, &pv.(*SignedVote).Vote, gs.tracker.votes) } func TestMessageTracker_MapInsideMap(t *testing.T) { @@ -163,8 +175,8 @@ func TestMessageTracker_MapInsideMap(t *testing.T) { } hash := header.Hash() - _, ok := gs.tracker.voteMessages[hash] - require.False(t, ok) + messages := gs.tracker.votes.getMessagesForBlockHash(hash) + require.Empty(t, messages) gs.keypair = kr.Alice().(*ed25519.Keypair) authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes() @@ -172,15 +184,10 @@ func TestMessageTracker_MapInsideMap(t *testing.T) { require.NoError(t, err) gs.keypair = kr.Bob().(*ed25519.Keypair) - gs.tracker.addVote(&networkVoteMessage{ - msg: msg, - }) - - voteMsgs, ok := gs.tracker.voteMessages[hash] - require.True(t, ok) + gs.tracker.addVote("", msg) - _, ok = voteMsgs[authorityID] - require.True(t, ok) + voteMessage := getMessageFromVotesTracker(gs.tracker.votes, hash, authorityID) + require.NotEmpty(t, voteMessage) } func TestMessageTracker_handleTick(t *testing.T) { @@ -197,9 +204,7 @@ func TestMessageTracker_handleTick(t *testing.T) { BlockHash: testHash, }, } - gs.tracker.addVote(&networkVoteMessage{ - msg: msg, - }) + gs.tracker.addVote("", msg) gs.tracker.handleTick() @@ -212,7 +217,7 @@ func TestMessageTracker_handleTick(t *testing.T) { } // shouldn't be deleted as round in message >= grandpa round - require.Equal(t, 1, len(gs.tracker.voteMessages[testHash])) + require.Len(t, gs.tracker.votes.getMessagesForBlockHash(testHash), 1) gs.state.round = 1 msg = &VoteMessage{ @@ -221,9 +226,7 @@ func TestMessageTracker_handleTick(t *testing.T) { BlockHash: testHash, }, } - gs.tracker.addVote(&networkVoteMessage{ - msg: msg, - }) + gs.tracker.addVote("", msg) gs.tracker.handleTick() @@ -235,5 +238,5 @@ func TestMessageTracker_handleTick(t *testing.T) { } // should be deleted as round in message < grandpa round - require.Empty(t, len(gs.tracker.voteMessages[testHash])) + require.Empty(t, gs.tracker.votes.getMessagesForBlockHash(testHash)) } diff --git a/lib/grandpa/tracker_votes.go b/lib/grandpa/tracker_votes.go new file mode 100644 index 0000000000..843b4337a7 --- /dev/null +++ b/lib/grandpa/tracker_votes.go @@ -0,0 +1,179 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package grandpa + +import ( + "container/list" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/crypto/ed25519" + "github.com/libp2p/go-libp2p-core/peer" +) + +// votesTracker tracks vote messages that could +// not be processed, and removes the oldest ones once +// its maximum capacity is reached. +// It is NOT THREAD SAFE to use. +type votesTracker struct { + // map of vote block hash to authority ID (ed25519 public Key) + // to data (peer id + message + tracking linked list element pointer) + mapping map[common.Hash]authorityIDToData + // double linked list of block hash + authority ID + // to track the order vote messages were added in. + linkedList *list.List + capacity int +} + +type authorityIDToData map[ed25519.PublicKeyBytes]voteMessageMapData + +type voteMessageMapData struct { + peerID peer.ID + message *VoteMessage + // element contains a blockHashAuthID value which + // itself contains a block hash and an authority ID. + element *list.Element +} + +type blockHashAuthID struct { + blockHash common.Hash + authorityID ed25519.PublicKeyBytes +} + +// newVotesTracker creates a new vote message tracker +// with the capacity specified. +func newVotesTracker(capacity int) votesTracker { + return votesTracker{ + mapping: make(map[common.Hash]authorityIDToData, capacity), + linkedList: list.New(), + capacity: capacity, + } +} + +func newBlockHashAuthID(blockHash common.Hash, + authorityID ed25519.PublicKeyBytes) blockHashAuthID { + return blockHashAuthID{ + blockHash: blockHash, + authorityID: authorityID, + } +} + +// add adds a vote message to the vote message tracker. +// If the vote message tracker capacity is reached, +// the oldest vote message is removed. +func (vt *votesTracker) add(peerID peer.ID, voteMessage *VoteMessage) { + signedMessage := voteMessage.Message + blockHash := signedMessage.BlockHash + authorityID := signedMessage.AuthorityID + + voteMessages, has := vt.mapping[blockHash] + if !has { + // add new block hash in tracker + vt.cleanup() + elementData := newBlockHashAuthID(blockHash, authorityID) + element := vt.linkedList.PushFront(elementData) + data := voteMessageMapData{ + peerID: peerID, + message: voteMessage, + element: element, + } + vt.mapping[blockHash] = authorityIDToData{ + authorityID: data, + } + return + } + + data, voteExists := voteMessages[authorityID] + if voteExists { + // vote already exists so override the vote for the authority ID; + // do not move the list element in the linked list to avoid + // someone re-sending an equivocatory vote message and going at the + // front of the list, hence erasing other possible valid vote messages + // in the tracker. + data.peerID = peerID + data.message = voteMessage + voteMessages[authorityID] = data + return + } + + // Add new authority ID in existing block hash map + vt.cleanup() + elementData := newBlockHashAuthID(blockHash, authorityID) + element := vt.linkedList.PushFront(elementData) + data = voteMessageMapData{ + peerID: peerID, + message: voteMessage, + element: element, + } + voteMessages[authorityID] = data +} + +// cleanup removes the oldest vote message from the tracker +// if the number of vote messages is at the tracker capacity. +// This method is designed to be called automatically from the +// add method and should not be called elsewhere. +func (vt *votesTracker) cleanup() { + if vt.linkedList.Len() < vt.capacity { + return + } + + oldestElement := vt.linkedList.Back() + vt.linkedList.Remove(oldestElement) + + oldestData := oldestElement.Value.(blockHashAuthID) + authIDToData := vt.mapping[oldestData.blockHash] + + delete(authIDToData, oldestData.authorityID) + if len(authIDToData) == 0 { + delete(vt.mapping, oldestData.blockHash) + } +} + +// delete deletes all the vote messages for a particular +// block hash from the vote messages tracker. +func (vt *votesTracker) delete(blockHash common.Hash) { + authIDToData, has := vt.mapping[blockHash] + if !has { + return + } + + for _, data := range authIDToData { + vt.linkedList.Remove(data.element) + } + + delete(vt.mapping, blockHash) +} + +// getMessagesForBlockHash returns all the vote messages +// for a particular block hash from the tracker as a slice +// of networkVoteMessage. +// It returns nil if the block hash does not exist. +func (vt *votesTracker) getMessagesForBlockHash( + blockHash common.Hash) (messages []networkVoteMessage) { + authIDToData, ok := vt.mapping[blockHash] + if !ok { + // Note authIDToData cannot be empty + return nil + } + + messages = make([]networkVoteMessage, 0, len(authIDToData)) + for _, data := range authIDToData { + message := networkVoteMessage{ + from: data.peerID, + msg: data.message, + } + messages = append(messages, message) + } + return messages +} + +// forEach runs the function `f` on each +// peer id + message stored in the tracker. +func (vt *votesTracker) forEach( + f func(peerID peer.ID, message *VoteMessage)) { + for _, authorityIDToData := range vt.mapping { + for _, data := range authorityIDToData { + f(data.peerID, data.message) + } + } +} diff --git a/lib/grandpa/tracker_votes_test.go b/lib/grandpa/tracker_votes_test.go new file mode 100644 index 0000000000..1225165060 --- /dev/null +++ b/lib/grandpa/tracker_votes_test.go @@ -0,0 +1,372 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package grandpa + +import ( + "bytes" + "container/list" + "sort" + "testing" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/crypto/ed25519" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// buildVoteMessage creates a test vote message using the +// given block hash and authority ID only. +func buildVoteMessage(blockHash common.Hash, + authorityID ed25519.PublicKeyBytes) *VoteMessage { + return &VoteMessage{ + Message: SignedMessage{ + BlockHash: blockHash, + AuthorityID: authorityID, + }, + } +} + +func assertVotesMapping(t *testing.T, + mapping map[common.Hash]authorityIDToData, + expected map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage) { + t.Helper() + + require.Len(t, mapping, len(expected), "mapping does not have the expected length") + for expectedBlockHash, expectedAuthIDToMessage := range expected { + submap, ok := mapping[expectedBlockHash] + require.Truef(t, ok, "block hash %s not found in mapping", expectedBlockHash) + require.Lenf(t, submap, len(expectedAuthIDToMessage), + "submapping for block hash %s does not have the expected length", expectedBlockHash) + for expectedAuthorityID, expectedMessage := range expectedAuthIDToMessage { + data, ok := submap[expectedAuthorityID] + assert.Truef(t, ok, + "submapping for block hash %s does not have expected authority id %s", + expectedBlockHash, expectedAuthorityID) + assert.Equalf(t, expectedMessage, data.message, + "message for block hash %s and authority id %s is not as expected", + expectedBlockHash, expectedAuthorityID) + } + } +} + +func Test_newVotesTracker(t *testing.T) { + t.Parallel() + + const capacity = 1 + expected := votesTracker{ + mapping: make(map[common.Hash]authorityIDToData, capacity), + linkedList: list.New(), + capacity: capacity, + } + vt := newVotesTracker(capacity) + + assert.Equal(t, expected, vt) +} + +// We cannot really unit test each method independently +// due to the dependency on the double linked list from +// the standard package `list` which has private fields +// which cannot be set. +// For example we cannot assert the votes tracker mapping +// entirely due to the linked list elements unexported fields. + +func Test_votesTracker_cleanup(t *testing.T) { + t.Parallel() + + t.Run("in same block", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + authIDC := ed25519.PublicKeyBytes{0xc} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockAAuthB := buildVoteMessage(blockHashA, authIDB) + messageBlockAAuthC := buildVoteMessage(blockHashA, authIDC) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.add(somePeer, messageBlockAAuthB) + // Add third message for block A and authority id C. + // This triggers a cleanup removing the oldest message + // which is for block A and authority id A. + tracker.add(somePeer, messageBlockAAuthC) + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + blockHashA: { + authIDB: messageBlockAAuthB, + authIDC: messageBlockAAuthC, + }, + }) + }) + + t.Run("remove entire block", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + blockHashB := common.Hash{0xb} + + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockBAuthA := buildVoteMessage(blockHashB, authIDA) + messageBlockBAuthB := buildVoteMessage(blockHashB, authIDB) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.add(somePeer, messageBlockBAuthA) + // Add third message for block B and authority id B. + // This triggers a cleanup removing the oldest message + // which is for block A and authority id A. The block A + // is also completely removed since it does not contain + // any authority ID (vote message) anymore. + tracker.add(somePeer, messageBlockBAuthB) + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + blockHashB: { + authIDA: messageBlockBAuthA, + authIDB: messageBlockBAuthB, + }, + }) + }) +} + +// This test verifies overidding a value does not affect the +// input order for which each message was added. +func Test_votesTracker_overriding(t *testing.T) { + t.Parallel() + + t.Run("override oldest", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + blockHashB := common.Hash{0xb} + + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockBAuthA := buildVoteMessage(blockHashB, authIDA) + messageBlockBAuthB := buildVoteMessage(blockHashB, authIDB) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.add(somePeer, messageBlockBAuthA) + tracker.add(somePeer, messageBlockAAuthA) // override oldest + tracker.add(somePeer, messageBlockBAuthB) + + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + blockHashB: { + authIDA: messageBlockBAuthA, + authIDB: messageBlockBAuthB, + }, + }) + }) + + t.Run("override newest", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + blockHashB := common.Hash{0xb} + + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockBAuthA := buildVoteMessage(blockHashB, authIDA) + messageBlockBAuthB := buildVoteMessage(blockHashB, authIDB) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.add(somePeer, messageBlockBAuthA) + tracker.add(somePeer, messageBlockBAuthA) // override newest + tracker.add(somePeer, messageBlockBAuthB) + + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + blockHashB: { + authIDA: messageBlockBAuthA, + authIDB: messageBlockBAuthB, + }, + }) + }) +} + +func Test_votesTracker_delete(t *testing.T) { + t.Parallel() + + t.Run("non existing block hash", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + blockHashB := common.Hash{0xb} + + authIDA := ed25519.PublicKeyBytes{0xa} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.delete(blockHashB) + + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + blockHashA: { + authIDA: messageBlockAAuthA, + }, + }) + }) + + t.Run("existing block hash", func(t *testing.T) { + t.Parallel() + + const capacity = 2 + tracker := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockAAuthB := buildVoteMessage(blockHashA, authIDB) + + const somePeer = peer.ID("abc") + + tracker.add(somePeer, messageBlockAAuthA) + tracker.add(somePeer, messageBlockAAuthB) + tracker.delete(blockHashA) + + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{}) + }) +} + +func Test_votesTracker_getMessagesForBlockHash(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + votesTracker *votesTracker + blockHash common.Hash + messages []networkVoteMessage + }{ + "non existing block hash": { + votesTracker: &votesTracker{ + mapping: map[common.Hash]authorityIDToData{ + {1}: {}, + }, + linkedList: list.New(), + }, + blockHash: common.Hash{2}, + }, + "existing block hash": { + votesTracker: &votesTracker{ + mapping: map[common.Hash]authorityIDToData{ + {1}: { + ed25519.PublicKeyBytes{1}: { + peerID: "a", + message: &VoteMessage{Round: 1}, + }, + ed25519.PublicKeyBytes{2}: { + peerID: "a", + message: &VoteMessage{Round: 2}, + }, + }, + }, + }, + blockHash: common.Hash{1}, + messages: []networkVoteMessage{ + {from: peer.ID("a"), msg: &VoteMessage{Round: 1}}, + {from: peer.ID("a"), msg: &VoteMessage{Round: 2}}, + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + vt := testCase.votesTracker + messages := vt.getMessagesForBlockHash(testCase.blockHash) + + assert.Equal(t, testCase.messages, messages) + }) + } +} + +func Test_votesTracker_forEach(t *testing.T) { + t.Parallel() + + const capacity = 10 + vt := newVotesTracker(capacity) + + blockHashA := common.Hash{0xa} + blockHashB := common.Hash{0xb} + + authIDA := ed25519.PublicKeyBytes{0xa} + authIDB := ed25519.PublicKeyBytes{0xb} + + messageBlockAAuthA := buildVoteMessage(blockHashA, authIDA) + messageBlockAAuthB := buildVoteMessage(blockHashA, authIDB) + messageBlockBAuthA := buildVoteMessage(blockHashB, authIDA) + + vt.add("a", messageBlockAAuthA) + vt.add("b", messageBlockAAuthB) + vt.add("b", messageBlockBAuthA) + + type result struct { + peerID peer.ID + message *VoteMessage + } + var results []result + + vt.forEach(func(peerID peer.ID, message *VoteMessage) { + results = append(results, result{ + peerID: peerID, + message: message, + }) + }) + + // Predictable messages order for assertion. + // Sort by block hash then authority id then peer ID. + sort.Slice(results, func(i, j int) bool { + blockHashFirst := results[i].message.Message.BlockHash + blockHashSecond := results[j].message.Message.BlockHash + if blockHashFirst == blockHashSecond { + authIDFirst := results[i].message.Message.AuthorityID + authIDSecond := results[j].message.Message.AuthorityID + if authIDFirst == authIDSecond { + return results[i].peerID < results[j].peerID + } + return bytes.Compare(authIDFirst[:], authIDSecond[:]) < 0 + } + return bytes.Compare(blockHashFirst[:], blockHashSecond[:]) < 0 + }) + + expectedResults := []result{ + {peerID: "a", message: messageBlockAAuthA}, + {peerID: "b", message: messageBlockAAuthB}, + {peerID: "b", message: messageBlockBAuthA}, + } + + assert.Equal(t, expectedResults, results) +} diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index a2908e80f9..b22385768a 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -169,10 +169,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro if m.Round < s.state.round+maxFutureRoundsDiff { // We ensure the message round is not too far away // from our state round, to avoid abuses of the tracker storage. - s.tracker.addVote(&networkVoteMessage{ - from: from, - msg: m, - }) + s.tracker.addVote(from, m) } } @@ -199,10 +196,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro errors.Is(err, blocktree.ErrDescendantNotFound) || errors.Is(err, blocktree.ErrEndNodeNotFound) || errors.Is(err, blocktree.ErrStartNodeNotFound) { - s.tracker.addVote(&networkVoteMessage{ - from: from, - msg: m, - }) + s.tracker.addVote(from, m) } if err != nil { return nil, err From d7501f47413fb2862df218db213d513fd1ee7bac Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 20 Apr 2022 11:22:42 +0000 Subject: [PATCH 04/12] Discard messages +- 1 round from our state --- lib/grandpa/vote_message.go | 72 +++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index b22385768a..1098aa7362 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -126,11 +126,13 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro // check for message signature pk, err := ed25519.NewPublicKey(m.Message.AuthorityID[:]) if err != nil { + // TODO: affect peer reputation return nil, err } err = validateMessageSignature(pk, m) if err != nil { + // TODO: affect peer reputation return nil, err } @@ -138,43 +140,53 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro return nil, ErrSetIDMismatch } - // check that vote is for current round - if m.Round != s.state.round { - if m.Round < s.state.round { - // peer doesn't know round was finalised, send out another commit message - header, err := s.blockState.GetFinalisedHeader(m.Round, m.SetID) - if err != nil { - return nil, err - } + const maxRoundsLag = 1 + minRoundAccepted := s.state.round - maxRoundsLag + if minRoundAccepted > s.state.round { + // we overflowed below 0 so set the minimum to 0. + minRoundAccepted = 0 + } - cm, err := s.newCommitMessage(header, m.Round) - if err != nil { - return nil, err - } + const maxRoundsAhead = 1 + maxRoundAccepted := s.state.round + maxRoundsAhead - // send finalised block from previous round to network - msg, err := cm.ToConsensusMessage() - if err != nil { - return nil, err - } + if m.Round < minRoundAccepted || m.Round > maxRoundAccepted { + // Discard message + // TODO: affect peer reputation, this is shameful impolite behaviour + return nil, nil //nolint:nilnil + } - if err = s.network.SendMessage(from, msg); err != nil { - logger.Warnf("failed to send CommitMessage: %s", err) - } - } else { - // Message round is higher than the round of our state, - // we may be lagging behind, so store the message in the tracker - // for processing later. - const maxFutureRoundsDiff = 5 - if m.Round < s.state.round+maxFutureRoundsDiff { - // We ensure the message round is not too far away - // from our state round, to avoid abuses of the tracker storage. - s.tracker.addVote(from, m) - } + if m.Round < s.state.round { + // message round is lagging by 1 + // peer doesn't know round was finalised, send out another commit message + header, err := s.blockState.GetFinalisedHeader(m.Round, m.SetID) + if err != nil { + return nil, err + } + + cm, err := s.newCommitMessage(header, m.Round) + if err != nil { + return nil, err + } + + // send finalised block from previous round to network + msg, err := cm.ToConsensusMessage() + if err != nil { + return nil, err + } + + if err = s.network.SendMessage(from, msg); err != nil { + logger.Warnf("failed to send CommitMessage: %s", err) } // TODO: get justification if your round is lower, or just do catch-up? (#1815) return nil, errRoundMismatch(m.Round, s.state.round) + } else if m.Round > s.state.round { + // Message round is higher by 1 than the round of our state, + // we may be lagging behind, so store the message in the tracker + // for processing later in the coming few milliseconds. + s.tracker.addVote(from, m) + return nil, errRoundMismatch(m.Round, s.state.round) } // check for equivocation ie. multiple votes within one subround From dec5bce875a1b99eb1edbcdbf0627686f9c478a4 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Mon, 25 Apr 2022 10:34:17 +0000 Subject: [PATCH 05/12] Fix `Test_votesTracker_getMessagesForBlockHash` --- lib/grandpa/tracker_votes.go | 2 +- lib/grandpa/tracker_votes_test.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/grandpa/tracker_votes.go b/lib/grandpa/tracker_votes.go index 843b4337a7..67e1de2e36 100644 --- a/lib/grandpa/tracker_votes.go +++ b/lib/grandpa/tracker_votes.go @@ -146,7 +146,7 @@ func (vt *votesTracker) delete(blockHash common.Hash) { // getMessagesForBlockHash returns all the vote messages // for a particular block hash from the tracker as a slice -// of networkVoteMessage. +// of networkVoteMessage. There is no order in the slice. // It returns nil if the block hash does not exist. func (vt *votesTracker) getMessagesForBlockHash( blockHash common.Hash) (messages []networkVoteMessage) { diff --git a/lib/grandpa/tracker_votes_test.go b/lib/grandpa/tracker_votes_test.go index 1225165060..4094af3a17 100644 --- a/lib/grandpa/tracker_votes_test.go +++ b/lib/grandpa/tracker_votes_test.go @@ -308,6 +308,13 @@ func Test_votesTracker_getMessagesForBlockHash(t *testing.T) { vt := testCase.votesTracker messages := vt.getMessagesForBlockHash(testCase.blockHash) + sort.Slice(messages, func(i, j int) bool { + if messages[i].from == messages[j].from { + return messages[i].msg.Round < messages[j].msg.Round + } + return messages[i].from < messages[j].from + }) + assert.Equal(t, testCase.messages, messages) }) } From 9475ab0501c0481b91f8ecd45f593e4a95b4e906 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 4 May 2022 15:17:43 +0000 Subject: [PATCH 06/12] Rename `tracker_votes.go` to `votes_tracker.go` --- lib/grandpa/{tracker_votes.go => votes_tracker.go} | 0 lib/grandpa/{tracker_votes_test.go => votes_tracker_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename lib/grandpa/{tracker_votes.go => votes_tracker.go} (100%) rename lib/grandpa/{tracker_votes_test.go => votes_tracker_test.go} (100%) diff --git a/lib/grandpa/tracker_votes.go b/lib/grandpa/votes_tracker.go similarity index 100% rename from lib/grandpa/tracker_votes.go rename to lib/grandpa/votes_tracker.go diff --git a/lib/grandpa/tracker_votes_test.go b/lib/grandpa/votes_tracker_test.go similarity index 100% rename from lib/grandpa/tracker_votes_test.go rename to lib/grandpa/votes_tracker_test.go From 2235b132b1864856a103364e46847535efa77116 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Wed, 4 May 2022 15:20:25 +0000 Subject: [PATCH 07/12] Rename `getMessagesForBlockHash` to `messages` --- lib/grandpa/message_tracker.go | 2 +- lib/grandpa/message_tracker_test.go | 6 +++--- lib/grandpa/votes_tracker.go | 6 +++--- lib/grandpa/votes_tracker_test.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 395a1ef0df..474a175b6d 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -104,7 +104,7 @@ func (t *tracker) handleBlock(b *types.Block) { defer t.mapLock.Unlock() h := b.Header.Hash() - vms := t.votes.getMessagesForBlockHash(h) + vms := t.votes.messages(h) for _, v := range vms { // handleMessage would never error for vote message _, err := t.handler.handleMessage(v.from, v.msg) diff --git a/lib/grandpa/message_tracker_test.go b/lib/grandpa/message_tracker_test.go index 3500638c33..4b64b15b05 100644 --- a/lib/grandpa/message_tracker_test.go +++ b/lib/grandpa/message_tracker_test.go @@ -175,7 +175,7 @@ func TestMessageTracker_MapInsideMap(t *testing.T) { } hash := header.Hash() - messages := gs.tracker.votes.getMessagesForBlockHash(hash) + messages := gs.tracker.votes.messages(hash) require.Empty(t, messages) gs.keypair = kr.Alice().(*ed25519.Keypair) @@ -217,7 +217,7 @@ func TestMessageTracker_handleTick(t *testing.T) { } // shouldn't be deleted as round in message >= grandpa round - require.Len(t, gs.tracker.votes.getMessagesForBlockHash(testHash), 1) + require.Len(t, gs.tracker.votes.messages(testHash), 1) gs.state.round = 1 msg = &VoteMessage{ @@ -238,5 +238,5 @@ func TestMessageTracker_handleTick(t *testing.T) { } // should be deleted as round in message < grandpa round - require.Empty(t, gs.tracker.votes.getMessagesForBlockHash(testHash)) + require.Empty(t, gs.tracker.votes.messages(testHash)) } diff --git a/lib/grandpa/votes_tracker.go b/lib/grandpa/votes_tracker.go index 67e1de2e36..5c64e1abbb 100644 --- a/lib/grandpa/votes_tracker.go +++ b/lib/grandpa/votes_tracker.go @@ -144,12 +144,12 @@ func (vt *votesTracker) delete(blockHash common.Hash) { delete(vt.mapping, blockHash) } -// getMessagesForBlockHash returns all the vote messages +// messages returns all the vote messages // for a particular block hash from the tracker as a slice // of networkVoteMessage. There is no order in the slice. // It returns nil if the block hash does not exist. -func (vt *votesTracker) getMessagesForBlockHash( - blockHash common.Hash) (messages []networkVoteMessage) { +func (vt *votesTracker) messages(blockHash common.Hash) ( + messages []networkVoteMessage) { authIDToData, ok := vt.mapping[blockHash] if !ok { // Note authIDToData cannot be empty diff --git a/lib/grandpa/votes_tracker_test.go b/lib/grandpa/votes_tracker_test.go index 4094af3a17..086cacb492 100644 --- a/lib/grandpa/votes_tracker_test.go +++ b/lib/grandpa/votes_tracker_test.go @@ -260,7 +260,7 @@ func Test_votesTracker_delete(t *testing.T) { }) } -func Test_votesTracker_getMessagesForBlockHash(t *testing.T) { +func Test_votesTracker_messages(t *testing.T) { t.Parallel() testCases := map[string]struct { @@ -306,7 +306,7 @@ func Test_votesTracker_getMessagesForBlockHash(t *testing.T) { t.Parallel() vt := testCase.votesTracker - messages := vt.getMessagesForBlockHash(testCase.blockHash) + messages := vt.messages(testCase.blockHash) sort.Slice(messages, func(i, j int) bool { if messages[i].from == messages[j].from { From 5a388f15ae79e36f7fb6a0b715e1558da5d89925 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 13 May 2022 11:40:29 +0000 Subject: [PATCH 08/12] Link to Grandpa polite issue --- lib/grandpa/vote_message.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 1098aa7362..d54e605fa1 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -126,13 +126,15 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro // check for message signature pk, err := ed25519.NewPublicKey(m.Message.AuthorityID[:]) if err != nil { - // TODO: affect peer reputation + // TODO Affect peer reputation + // https://github.com/ChainSafe/gossamer/issues/2505 return nil, err } err = validateMessageSignature(pk, m) if err != nil { - // TODO: affect peer reputation + // TODO Affect peer reputation + // https://github.com/ChainSafe/gossamer/issues/2505 return nil, err } @@ -153,6 +155,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro if m.Round < minRoundAccepted || m.Round > maxRoundAccepted { // Discard message // TODO: affect peer reputation, this is shameful impolite behaviour + // https://github.com/ChainSafe/gossamer/issues/2505 return nil, nil //nolint:nilnil } From 5b117cca88acb01b82ab3f848ff80bd0c1f2d488 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 13 May 2022 11:47:55 +0000 Subject: [PATCH 09/12] Less repetition in `add` --- lib/grandpa/votes_tracker.go | 49 +++++++++++++++--------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/lib/grandpa/votes_tracker.go b/lib/grandpa/votes_tracker.go index 5c64e1abbb..de380ac7bc 100644 --- a/lib/grandpa/votes_tracker.go +++ b/lib/grandpa/votes_tracker.go @@ -66,41 +66,32 @@ func (vt *votesTracker) add(peerID peer.ID, voteMessage *VoteMessage) { blockHash := signedMessage.BlockHash authorityID := signedMessage.AuthorityID - voteMessages, has := vt.mapping[blockHash] - if !has { - // add new block hash in tracker - vt.cleanup() - elementData := newBlockHashAuthID(blockHash, authorityID) - element := vt.linkedList.PushFront(elementData) - data := voteMessageMapData{ - peerID: peerID, - message: voteMessage, - element: element, + voteMessages, blockHashExists := vt.mapping[blockHash] + if blockHashExists { + data, voteExists := voteMessages[authorityID] + if voteExists { + // vote already exists so override the vote for the authority ID; + // do not move the list element in the linked list to avoid + // someone re-sending an equivocatory vote message and going at the + // front of the list, hence erasing other possible valid vote messages + // in the tracker. + data.peerID = peerID + data.message = voteMessage + voteMessages[authorityID] = data + return } - vt.mapping[blockHash] = authorityIDToData{ - authorityID: data, - } - return - } - - data, voteExists := voteMessages[authorityID] - if voteExists { - // vote already exists so override the vote for the authority ID; - // do not move the list element in the linked list to avoid - // someone re-sending an equivocatory vote message and going at the - // front of the list, hence erasing other possible valid vote messages - // in the tracker. - data.peerID = peerID - data.message = voteMessage - voteMessages[authorityID] = data - return + // continue below and add the authority ID and data to the tracker. + } else { + // add new block hash in tracker + voteMessages = make(authorityIDToData) + vt.mapping[blockHash] = voteMessages + // continue below and add the authority ID and data to the tracker. } - // Add new authority ID in existing block hash map vt.cleanup() elementData := newBlockHashAuthID(blockHash, authorityID) element := vt.linkedList.PushFront(elementData) - data = voteMessageMapData{ + data := voteMessageMapData{ peerID: peerID, message: voteMessage, element: element, From 5e9ba1e2b76876e889d6f99780ea36d99a3d62d4 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 13 May 2022 14:56:30 +0000 Subject: [PATCH 10/12] Replace `forEach` by `networkVoteMessages` --- lib/grandpa/message_tracker.go | 10 +++----- lib/grandpa/votes_tracker.go | 17 +++++++++---- lib/grandpa/votes_tracker_test.go | 42 ++++++------------------------- 3 files changed, 23 insertions(+), 46 deletions(-) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 474a175b6d..26bb1b42c2 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -129,8 +129,9 @@ func (t *tracker) handleTick() { t.mapLock.Lock() defer t.mapLock.Unlock() - var blockHashesDone []common.Hash - t.votes.forEach(func(peerID peer.ID, message *VoteMessage) { + for _, networkVoteMessage := range t.votes.networkVoteMessages() { + peerID := networkVoteMessage.from + message := networkVoteMessage.msg _, err := t.handler.handleMessage(peerID, message) if err != nil { // handleMessage would never error for vote message @@ -138,11 +139,8 @@ func (t *tracker) handleTick() { } if message.Round < t.handler.grandpa.state.round && message.SetID == t.handler.grandpa.state.setID { - blockHashesDone = append(blockHashesDone, message.Message.BlockHash) + t.votes.delete(message.Message.BlockHash) } - }) - for _, blockHashDone := range blockHashesDone { - t.votes.delete(blockHashDone) } for _, cm := range t.commitMessages { diff --git a/lib/grandpa/votes_tracker.go b/lib/grandpa/votes_tracker.go index de380ac7bc..a90ef0968e 100644 --- a/lib/grandpa/votes_tracker.go +++ b/lib/grandpa/votes_tracker.go @@ -158,13 +158,20 @@ func (vt *votesTracker) messages(blockHash common.Hash) ( return messages } -// forEach runs the function `f` on each -// peer id + message stored in the tracker. -func (vt *votesTracker) forEach( - f func(peerID peer.ID, message *VoteMessage)) { +// networkVoteMessages returns all pairs of +// peer id + message stored in the tracker +// as a slice of networkVoteMessages. +func (vt *votesTracker) networkVoteMessages() ( + messages []networkVoteMessage) { + messages = make([]networkVoteMessage, 0, vt.linkedList.Len()) for _, authorityIDToData := range vt.mapping { for _, data := range authorityIDToData { - f(data.peerID, data.message) + message := networkVoteMessage{ + from: data.peerID, + msg: data.message, + } + messages = append(messages, message) } } + return messages } diff --git a/lib/grandpa/votes_tracker_test.go b/lib/grandpa/votes_tracker_test.go index 086cacb492..eb1e2e4734 100644 --- a/lib/grandpa/votes_tracker_test.go +++ b/lib/grandpa/votes_tracker_test.go @@ -4,7 +4,6 @@ package grandpa import ( - "bytes" "container/list" "sort" "testing" @@ -320,7 +319,7 @@ func Test_votesTracker_messages(t *testing.T) { } } -func Test_votesTracker_forEach(t *testing.T) { +func Test_votesTracker_networkVoteMessages(t *testing.T) { t.Parallel() const capacity = 10 @@ -340,40 +339,13 @@ func Test_votesTracker_forEach(t *testing.T) { vt.add("b", messageBlockAAuthB) vt.add("b", messageBlockBAuthA) - type result struct { - peerID peer.ID - message *VoteMessage - } - var results []result - - vt.forEach(func(peerID peer.ID, message *VoteMessage) { - results = append(results, result{ - peerID: peerID, - message: message, - }) - }) - - // Predictable messages order for assertion. - // Sort by block hash then authority id then peer ID. - sort.Slice(results, func(i, j int) bool { - blockHashFirst := results[i].message.Message.BlockHash - blockHashSecond := results[j].message.Message.BlockHash - if blockHashFirst == blockHashSecond { - authIDFirst := results[i].message.Message.AuthorityID - authIDSecond := results[j].message.Message.AuthorityID - if authIDFirst == authIDSecond { - return results[i].peerID < results[j].peerID - } - return bytes.Compare(authIDFirst[:], authIDSecond[:]) < 0 - } - return bytes.Compare(blockHashFirst[:], blockHashSecond[:]) < 0 - }) + networkVoteMessages := vt.networkVoteMessages() - expectedResults := []result{ - {peerID: "a", message: messageBlockAAuthA}, - {peerID: "b", message: messageBlockAAuthB}, - {peerID: "b", message: messageBlockBAuthA}, + expectedNetworkVoteMessages := []networkVoteMessage{ + {from: "a", msg: messageBlockAAuthA}, + {from: "b", msg: messageBlockAAuthB}, + {from: "b", msg: messageBlockBAuthA}, } - assert.Equal(t, expectedResults, results) + assert.ElementsMatch(t, expectedNetworkVoteMessages, networkVoteMessages) } From c6ccd6aa11959e70b537157e324ca5eadfeae359 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 13 May 2022 14:59:02 +0000 Subject: [PATCH 11/12] Change to have list element pointer as map value --- lib/grandpa/message_tracker_test.go | 6 +- lib/grandpa/votes_tracker.go | 100 ++++++++++------------------ lib/grandpa/votes_tracker_test.go | 69 +++++++++++-------- 3 files changed, 80 insertions(+), 95 deletions(-) diff --git a/lib/grandpa/message_tracker_test.go b/lib/grandpa/message_tracker_test.go index 4b64b15b05..56a43adeef 100644 --- a/lib/grandpa/message_tracker_test.go +++ b/lib/grandpa/message_tracker_test.go @@ -21,17 +21,17 @@ import ( func getMessageFromVotesTracker(votes votesTracker, blockHash common.Hash, authorityID ed25519.PublicKeyBytes) ( message *VoteMessage) { - authIDToData, has := votes.mapping[blockHash] + authorityIDToElement, has := votes.mapping[blockHash] if !has { return nil } - data, ok := authIDToData[authorityID] + element, ok := authorityIDToElement[authorityID] if !ok { return nil } - return data.message + return element.Value.(networkVoteMessage).msg } func TestMessageTracker_ValidateMessage(t *testing.T) { diff --git a/lib/grandpa/votes_tracker.go b/lib/grandpa/votes_tracker.go index a90ef0968e..ed69088e5c 100644 --- a/lib/grandpa/votes_tracker.go +++ b/lib/grandpa/votes_tracker.go @@ -17,47 +17,23 @@ import ( // It is NOT THREAD SAFE to use. type votesTracker struct { // map of vote block hash to authority ID (ed25519 public Key) - // to data (peer id + message + tracking linked list element pointer) - mapping map[common.Hash]authorityIDToData - // double linked list of block hash + authority ID - // to track the order vote messages were added in. + // to linked list element pointer + mapping map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element + // double linked list of voteMessageData (peer ID + Vote Message) linkedList *list.List capacity int } -type authorityIDToData map[ed25519.PublicKeyBytes]voteMessageMapData - -type voteMessageMapData struct { - peerID peer.ID - message *VoteMessage - // element contains a blockHashAuthID value which - // itself contains a block hash and an authority ID. - element *list.Element -} - -type blockHashAuthID struct { - blockHash common.Hash - authorityID ed25519.PublicKeyBytes -} - // newVotesTracker creates a new vote message tracker // with the capacity specified. func newVotesTracker(capacity int) votesTracker { return votesTracker{ - mapping: make(map[common.Hash]authorityIDToData, capacity), + mapping: make(map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element, capacity), linkedList: list.New(), capacity: capacity, } } -func newBlockHashAuthID(blockHash common.Hash, - authorityID ed25519.PublicKeyBytes) blockHashAuthID { - return blockHashAuthID{ - blockHash: blockHash, - authorityID: authorityID, - } -} - // add adds a vote message to the vote message tracker. // If the vote message tracker capacity is reached, // the oldest vote message is removed. @@ -66,37 +42,36 @@ func (vt *votesTracker) add(peerID peer.ID, voteMessage *VoteMessage) { blockHash := signedMessage.BlockHash authorityID := signedMessage.AuthorityID - voteMessages, blockHashExists := vt.mapping[blockHash] + authorityIDToElement, blockHashExists := vt.mapping[blockHash] if blockHashExists { - data, voteExists := voteMessages[authorityID] + element, voteExists := authorityIDToElement[authorityID] if voteExists { // vote already exists so override the vote for the authority ID; // do not move the list element in the linked list to avoid // someone re-sending an equivocatory vote message and going at the // front of the list, hence erasing other possible valid vote messages // in the tracker. - data.peerID = peerID - data.message = voteMessage - voteMessages[authorityID] = data + element.Value = networkVoteMessage{ + from: peerID, + msg: voteMessage, + } return } // continue below and add the authority ID and data to the tracker. } else { // add new block hash in tracker - voteMessages = make(authorityIDToData) - vt.mapping[blockHash] = voteMessages + authorityIDToElement = make(map[ed25519.PublicKeyBytes]*list.Element) + vt.mapping[blockHash] = authorityIDToElement // continue below and add the authority ID and data to the tracker. } vt.cleanup() - elementData := newBlockHashAuthID(blockHash, authorityID) - element := vt.linkedList.PushFront(elementData) - data := voteMessageMapData{ - peerID: peerID, - message: voteMessage, - element: element, + elementData := networkVoteMessage{ + from: peerID, + msg: voteMessage, } - voteMessages[authorityID] = data + element := vt.linkedList.PushFront(elementData) + authorityIDToElement[authorityID] = element } // cleanup removes the oldest vote message from the tracker @@ -111,25 +86,28 @@ func (vt *votesTracker) cleanup() { oldestElement := vt.linkedList.Back() vt.linkedList.Remove(oldestElement) - oldestData := oldestElement.Value.(blockHashAuthID) - authIDToData := vt.mapping[oldestData.blockHash] + oldestData := oldestElement.Value.(networkVoteMessage) + oldestBlockHash := oldestData.msg.Message.BlockHash + oldestAuthorityID := oldestData.msg.Message.AuthorityID - delete(authIDToData, oldestData.authorityID) - if len(authIDToData) == 0 { - delete(vt.mapping, oldestData.blockHash) + authIDToElement := vt.mapping[oldestBlockHash] + + delete(authIDToElement, oldestAuthorityID) + if len(authIDToElement) == 0 { + delete(vt.mapping, oldestBlockHash) } } // delete deletes all the vote messages for a particular // block hash from the vote messages tracker. func (vt *votesTracker) delete(blockHash common.Hash) { - authIDToData, has := vt.mapping[blockHash] + authIDToElement, has := vt.mapping[blockHash] if !has { return } - for _, data := range authIDToData { - vt.linkedList.Remove(data.element) + for _, element := range authIDToElement { + vt.linkedList.Remove(element) } delete(vt.mapping, blockHash) @@ -141,18 +119,15 @@ func (vt *votesTracker) delete(blockHash common.Hash) { // It returns nil if the block hash does not exist. func (vt *votesTracker) messages(blockHash common.Hash) ( messages []networkVoteMessage) { - authIDToData, ok := vt.mapping[blockHash] + authIDToElement, ok := vt.mapping[blockHash] if !ok { - // Note authIDToData cannot be empty + // Note authIDToElement cannot be empty return nil } - messages = make([]networkVoteMessage, 0, len(authIDToData)) - for _, data := range authIDToData { - message := networkVoteMessage{ - from: data.peerID, - msg: data.message, - } + messages = make([]networkVoteMessage, 0, len(authIDToElement)) + for _, element := range authIDToElement { + message := element.Value.(networkVoteMessage) messages = append(messages, message) } return messages @@ -164,12 +139,9 @@ func (vt *votesTracker) messages(blockHash common.Hash) ( func (vt *votesTracker) networkVoteMessages() ( messages []networkVoteMessage) { messages = make([]networkVoteMessage, 0, vt.linkedList.Len()) - for _, authorityIDToData := range vt.mapping { - for _, data := range authorityIDToData { - message := networkVoteMessage{ - from: data.peerID, - msg: data.message, - } + for _, authorityIDToElement := range vt.mapping { + for _, element := range authorityIDToElement { + message := element.Value.(networkVoteMessage) messages = append(messages, message) } } diff --git a/lib/grandpa/votes_tracker_test.go b/lib/grandpa/votes_tracker_test.go index eb1e2e4734..a7a8c2c066 100644 --- a/lib/grandpa/votes_tracker_test.go +++ b/lib/grandpa/votes_tracker_test.go @@ -27,9 +27,17 @@ func buildVoteMessage(blockHash common.Hash, } } +func wrapVoteMessageWithPeerID(voteMessage *VoteMessage, + peerID peer.ID) networkVoteMessage { + return networkVoteMessage{ + from: peerID, + msg: voteMessage, + } +} + func assertVotesMapping(t *testing.T, - mapping map[common.Hash]authorityIDToData, - expected map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage) { + mapping map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element, + expected map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage) { t.Helper() require.Len(t, mapping, len(expected), "mapping does not have the expected length") @@ -38,13 +46,14 @@ func assertVotesMapping(t *testing.T, require.Truef(t, ok, "block hash %s not found in mapping", expectedBlockHash) require.Lenf(t, submap, len(expectedAuthIDToMessage), "submapping for block hash %s does not have the expected length", expectedBlockHash) - for expectedAuthorityID, expectedMessage := range expectedAuthIDToMessage { - data, ok := submap[expectedAuthorityID] + for expectedAuthorityID, expectedNetworkVoteMessage := range expectedAuthIDToMessage { + element, ok := submap[expectedAuthorityID] assert.Truef(t, ok, "submapping for block hash %s does not have expected authority id %s", expectedBlockHash, expectedAuthorityID) - assert.Equalf(t, expectedMessage, data.message, - "message for block hash %s and authority id %s is not as expected", + actualNetworkVoteMessage := element.Value.(networkVoteMessage) + assert.Equalf(t, expectedNetworkVoteMessage, actualNetworkVoteMessage, + "network vote message for block hash %s and authority id %s is not as expected", expectedBlockHash, expectedAuthorityID) } } @@ -55,7 +64,7 @@ func Test_newVotesTracker(t *testing.T) { const capacity = 1 expected := votesTracker{ - mapping: make(map[common.Hash]authorityIDToData, capacity), + mapping: make(map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element, capacity), linkedList: list.New(), capacity: capacity, } @@ -98,10 +107,10 @@ func Test_votesTracker_cleanup(t *testing.T) { // This triggers a cleanup removing the oldest message // which is for block A and authority id A. tracker.add(somePeer, messageBlockAAuthC) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{ blockHashA: { - authIDB: messageBlockAAuthB, - authIDC: messageBlockAAuthC, + authIDB: wrapVoteMessageWithPeerID(messageBlockAAuthB, somePeer), + authIDC: wrapVoteMessageWithPeerID(messageBlockAAuthC, somePeer), }, }) }) @@ -132,10 +141,10 @@ func Test_votesTracker_cleanup(t *testing.T) { // is also completely removed since it does not contain // any authority ID (vote message) anymore. tracker.add(somePeer, messageBlockBAuthB) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{ blockHashB: { - authIDA: messageBlockBAuthA, - authIDB: messageBlockBAuthB, + authIDA: wrapVoteMessageWithPeerID(messageBlockBAuthA, somePeer), + authIDB: wrapVoteMessageWithPeerID(messageBlockBAuthB, somePeer), }, }) }) @@ -169,10 +178,10 @@ func Test_votesTracker_overriding(t *testing.T) { tracker.add(somePeer, messageBlockAAuthA) // override oldest tracker.add(somePeer, messageBlockBAuthB) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{ blockHashB: { - authIDA: messageBlockBAuthA, - authIDB: messageBlockBAuthB, + authIDA: wrapVoteMessageWithPeerID(messageBlockBAuthA, somePeer), + authIDB: wrapVoteMessageWithPeerID(messageBlockBAuthB, somePeer), }, }) }) @@ -200,10 +209,10 @@ func Test_votesTracker_overriding(t *testing.T) { tracker.add(somePeer, messageBlockBAuthA) // override newest tracker.add(somePeer, messageBlockBAuthB) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{ blockHashB: { - authIDA: messageBlockBAuthA, - authIDB: messageBlockBAuthB, + authIDA: wrapVoteMessageWithPeerID(messageBlockBAuthA, somePeer), + authIDB: wrapVoteMessageWithPeerID(messageBlockBAuthB, somePeer), }, }) }) @@ -230,9 +239,9 @@ func Test_votesTracker_delete(t *testing.T) { tracker.add(somePeer, messageBlockAAuthA) tracker.delete(blockHashB) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{ + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{ blockHashA: { - authIDA: messageBlockAAuthA, + authIDA: wrapVoteMessageWithPeerID(messageBlockAAuthA, somePeer), }, }) }) @@ -255,7 +264,7 @@ func Test_votesTracker_delete(t *testing.T) { tracker.add(somePeer, messageBlockAAuthB) tracker.delete(blockHashA) - assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]*VoteMessage{}) + assertVotesMapping(t, tracker.mapping, map[common.Hash]map[ed25519.PublicKeyBytes]networkVoteMessage{}) }) } @@ -269,7 +278,7 @@ func Test_votesTracker_messages(t *testing.T) { }{ "non existing block hash": { votesTracker: &votesTracker{ - mapping: map[common.Hash]authorityIDToData{ + mapping: map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element{ {1}: {}, }, linkedList: list.New(), @@ -278,15 +287,19 @@ func Test_votesTracker_messages(t *testing.T) { }, "existing block hash": { votesTracker: &votesTracker{ - mapping: map[common.Hash]authorityIDToData{ + mapping: map[common.Hash]map[ed25519.PublicKeyBytes]*list.Element{ {1}: { ed25519.PublicKeyBytes{1}: { - peerID: "a", - message: &VoteMessage{Round: 1}, + Value: networkVoteMessage{ + from: "a", + msg: &VoteMessage{Round: 1}, + }, }, ed25519.PublicKeyBytes{2}: { - peerID: "a", - message: &VoteMessage{Round: 2}, + Value: networkVoteMessage{ + from: "a", + msg: &VoteMessage{Round: 2}, + }, }, }, }, From e67e6c42f08b78dc70d7ee8b4183b2e94c360bec Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 13 May 2022 12:24:52 -0400 Subject: [PATCH 12/12] Update lib/grandpa/message_tracker.go Co-authored-by: Timothy Wu --- lib/grandpa/message_tracker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 26bb1b42c2..00e7ef801a 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -113,6 +113,7 @@ func (t *tracker) handleBlock(b *types.Block) { } } + // delete block hash that may or may not be in the tracker. t.votes.delete(h) if cm, has := t.commitMessages[h]; has {