diff --git a/consensus/spos/bls/proxy/subroundsHandler_test.go b/consensus/spos/bls/proxy/subroundsHandler_test.go
index 97567771e00..8367968260e 100644
--- a/consensus/spos/bls/proxy/subroundsHandler_test.go
+++ b/consensus/spos/bls/proxy/subroundsHandler_test.go
@@ -81,6 +81,7 @@ func getDefaultArgumentsSubroundHandler() (*SubroundsHandlerArgs, *spos.Consensu
 	consensusCore.SetEnableEpochsHandler(epochsEnable)
 	consensusCore.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{})
 	consensusCore.SetEpochNotifier(epochNotifier)
+	consensusCore.SetInvalidSignersCache(&consensus.InvalidSignersCacheMock{})
 
 	handlerArgs.ConsensusCoreHandler = consensusCore
 	return handlerArgs, consensusCore
diff --git a/consensus/spos/bls/v2/export_test.go b/consensus/spos/bls/v2/export_test.go
index d792eceb7bb..b5623388488 100644
--- a/consensus/spos/bls/v2/export_test.go
+++ b/consensus/spos/bls/v2/export_test.go
@@ -302,7 +302,7 @@ func (sr *subroundEndRound) ReceivedInvalidSignersInfo(cnsDta *consensus.Message
 }
 
 // VerifyInvalidSigners calls the unexported verifyInvalidSigners function
-func (sr *subroundEndRound) VerifyInvalidSigners(invalidSigners []byte) error {
+func (sr *subroundEndRound) VerifyInvalidSigners(invalidSigners []byte) ([]string, error) {
 	return sr.verifyInvalidSigners(invalidSigners)
 }
 
@@ -313,7 +313,7 @@ func (sr *subroundEndRound) GetMinConsensusGroupIndexOfManagedKeys() int {
 
 // CreateAndBroadcastInvalidSigners calls the unexported createAndBroadcastInvalidSigners function
 func (sr *subroundEndRound) CreateAndBroadcastInvalidSigners(invalidSigners []byte) {
-	sr.createAndBroadcastInvalidSigners(invalidSigners)
+	sr.createAndBroadcastInvalidSigners(invalidSigners, nil)
 }
 
 // GetFullMessagesForInvalidSigners calls the unexported getFullMessagesForInvalidSigners function
diff --git a/consensus/spos/bls/v2/subroundEndRound.go b/consensus/spos/bls/v2/subroundEndRound.go
index 2e9d6e5f590..7c05335b505 100644
--- a/consensus/spos/bls/v2/subroundEndRound.go
+++ b/consensus/spos/bls/v2/subroundEndRound.go
@@ -150,7 +150,12 @@ func (sr *subroundEndRound) receivedInvalidSignersInfo(_ context.Context, cnsDta
 		return false
 	}
 
-	err := sr.verifyInvalidSigners(cnsDta.InvalidSigners)
+	invalidSignersCache := sr.InvalidSignersCache()
+	if invalidSignersCache.CheckKnownInvalidSigners(cnsDta.BlockHeaderHash, cnsDta.InvalidSigners) {
+		return false
+	}
+
+	invalidSignersPubKeys, err := sr.verifyInvalidSigners(cnsDta.InvalidSigners)
 	if err != nil {
 		log.Trace("receivedInvalidSignersInfo.verifyInvalidSigners", "error", err.Error())
 		return false
@@ -158,6 +163,8 @@ func (sr *subroundEndRound) receivedInvalidSignersInfo(_ context.Context, cnsDta
 
 	log.Debug("step 3: invalid signers info has been evaluated")
 
+	invalidSignersCache.AddInvalidSigners(cnsDta.BlockHeaderHash, cnsDta.InvalidSigners, invalidSignersPubKeys)
+
 	sr.PeerHonestyHandler().ChangeScore(
 		messageSender,
 		spos.GetConsensusTopicID(sr.ShardCoordinator()),
@@ -167,32 +174,37 @@ func (sr *subroundEndRound) receivedInvalidSignersInfo(_ context.Context, cnsDta
 	return true
 }
 
-func (sr *subroundEndRound) verifyInvalidSigners(invalidSigners []byte) error {
+func (sr *subroundEndRound) verifyInvalidSigners(invalidSigners []byte) ([]string, error) {
 	messages, err := sr.MessageSigningHandler().Deserialize(invalidSigners)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	pubKeys := make([]string, 0, len(messages))
 	for _, msg := range messages {
-		err = sr.verifyInvalidSigner(msg)
-		if err != nil {
-			return err
+		pubKey, errVerify := sr.verifyInvalidSigner(msg)
+		if errVerify != nil {
+			return nil, errVerify
+		}
+
+		if len(pubKey) > 0 {
+			pubKeys = append(pubKeys, pubKey)
 		}
 	}
 
-	return nil
+	return pubKeys, nil
 }
 
-func (sr *subroundEndRound) verifyInvalidSigner(msg p2p.MessageP2P) error {
+func (sr *subroundEndRound) verifyInvalidSigner(msg p2p.MessageP2P) (string, error) {
 	err := sr.MessageSigningHandler().Verify(msg)
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	cnsMsg := &consensus.Message{}
 	err = sr.Marshalizer().Unmarshal(cnsMsg, msg.Data())
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	err = sr.SigningHandler().VerifySingleSignature(cnsMsg.PubKey, cnsMsg.BlockHeaderHash, cnsMsg.SignatureShare)
@@ -203,9 +215,11 @@ func (sr *subroundEndRound) verifyInvalidSigner(msg p2p.MessageP2P) error {
 			"error", err.Error(),
 		)
 		sr.applyBlacklistOnNode(msg.Peer())
+
+		return string(cnsMsg.PubKey), nil
 	}
 
-	return nil
+	return "", nil
 }
 
 func (sr *subroundEndRound) applyBlacklistOnNode(peer core.PeerID) {
@@ -501,24 +515,23 @@ func (sr *subroundEndRound) handleInvalidSignersOnAggSigFail() ([]byte, []byte,
 	invalidPubKeys, err := sr.verifyNodesOnAggSigFail(ctx)
 	cancel()
 	if err != nil {
-		log.Debug("doEndRoundJobByNode.verifyNodesOnAggSigFail", "error", err.Error())
+		log.Debug("handleInvalidSignersOnAggSigFail.verifyNodesOnAggSigFail", "error", err.Error())
 		return nil, nil, err
 	}
 
-	_, err = sr.getFullMessagesForInvalidSigners(invalidPubKeys)
+	invalidSigners, err := sr.getFullMessagesForInvalidSigners(invalidPubKeys)
 	if err != nil {
-		log.Debug("doEndRoundJobByNode.getFullMessagesForInvalidSigners", "error", err.Error())
+		log.Debug("handleInvalidSignersOnAggSigFail.getFullMessagesForInvalidSigners", "error", err.Error())
 		return nil, nil, err
 	}
 
-	// TODO: handle invalid signers broadcast without flooding the network
-	// if len(invalidSigners) > 0 {
-	// 	sr.createAndBroadcastInvalidSigners(invalidSigners)
-	// }
+	if len(invalidSigners) > 0 {
+		sr.createAndBroadcastInvalidSigners(invalidSigners, invalidPubKeys)
+	}
 
 	bitmap, sig, err := sr.computeAggSigOnValidNodes()
 	if err != nil {
-		log.Debug("doEndRoundJobByNode.computeAggSigOnValidNodes", "error", err.Error())
+		log.Debug("handleInvalidSignersOnAggSigFail.computeAggSigOnValidNodes", "error", err.Error())
 		return nil, nil, err
 	}
 
@@ -618,7 +631,7 @@ func (sr *subroundEndRound) getRandomManagedKeyProofSender() string {
 	return randManagedKey
 }
 
-func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []byte) {
+func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []byte, invalidSignersPubKeys []string) {
 	if !sr.ShouldConsiderSelfKeyInConsensus() {
 		return
 	}
@@ -646,6 +659,8 @@ func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []by
 		invalidSigners,
 	)
 
+	sr.InvalidSignersCache().AddInvalidSigners(sr.GetData(), invalidSigners, invalidSignersPubKeys)
+
 	err = sr.BroadcastMessenger().BroadcastConsensusMessage(cnsMsg)
 	if err != nil {
 		log.Debug("doEndRoundJob.BroadcastConsensusMessage", "error", err.Error())
diff --git a/consensus/spos/bls/v2/subroundEndRound_test.go b/consensus/spos/bls/v2/subroundEndRound_test.go
index 76474809273..d11f9500154 100644
--- a/consensus/spos/bls/v2/subroundEndRound_test.go
+++ b/consensus/spos/bls/v2/subroundEndRound_test.go
@@ -1502,6 +1502,27 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {
 		res := sr.ReceivedInvalidSignersInfo(&cnsData)
 		assert.False(t, res)
 	})
+	t.Run("invalid signers cache already has this message", func(t *testing.T) {
+		t.Parallel()
+
+		container := consensusMocks.InitConsensusCore()
+		invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
+			CheckKnownInvalidSignersCalled: func(headerHash []byte, invalidSigners []byte) bool {
+				return true
+			},
+		}
+		container.SetInvalidSignersCache(invalidSignersCache)
+
+		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
+		cnsData := consensus.Message{
+			BlockHeaderHash: []byte("X"),
+			PubKey:          []byte("A"),
+			InvalidSigners:  []byte("invalidSignersData"),
+		}
+
+		res := sr.ReceivedInvalidSignersInfo(&cnsData)
+		assert.False(t, res)
+	})
 	t.Run("invalid signers data", func(t *testing.T) {
 		t.Parallel()
 
@@ -1528,6 +1549,13 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {
 		t.Parallel()
 
 		container := consensusMocks.InitConsensusCore()
+		wasAddInvalidSignersCalled := false
+		invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
+			AddInvalidSignersCalled: func(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string) {
+				wasAddInvalidSignersCalled = true
+			},
+		}
+		container.SetInvalidSignersCache(invalidSignersCache)
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 		sr.SetHeader(&block.HeaderV2{
@@ -1541,6 +1569,7 @@ func TestSubroundEndRound_ReceivedInvalidSignersInfo(t *testing.T) {
 
 		res := sr.ReceivedInvalidSignersInfo(&cnsData)
 		assert.True(t, res)
+		require.True(t, wasAddInvalidSignersCalled)
 	})
 }
 
@@ -1552,7 +1581,6 @@ func TestVerifyInvalidSigners(t *testing.T) {
 
 		container := consensusMocks.InitConsensusCore()
 
-		expectedErr := errors.New("expected err")
 		messageSigningHandler := &mock.MessageSigningHandlerStub{
 			DeserializeCalled: func(messagesBytes []byte) ([]p2p.MessageP2P, error) {
 				return nil, expectedErr
@@ -1563,7 +1591,7 @@ func TestVerifyInvalidSigners(t *testing.T) {
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 
-		err := sr.VerifyInvalidSigners([]byte{})
+		_, err := sr.VerifyInvalidSigners([]byte{})
 		require.Equal(t, expectedErr, err)
 	})
 
@@ -1577,7 +1605,6 @@ func TestVerifyInvalidSigners(t *testing.T) {
 		}}
 		invalidSignersBytes, _ := container.Marshalizer().Marshal(invalidSigners)
 
-		expectedErr := errors.New("expected err")
 		messageSigningHandler := &mock.MessageSigningHandlerStub{
 			DeserializeCalled: func(messagesBytes []byte) ([]p2p.MessageP2P, error) {
 				require.Equal(t, invalidSignersBytes, messagesBytes)
@@ -1592,7 +1619,7 @@ func TestVerifyInvalidSigners(t *testing.T) {
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 
-		err := sr.VerifyInvalidSigners(invalidSignersBytes)
+		_, err := sr.VerifyInvalidSigners(invalidSignersBytes)
 		require.Equal(t, expectedErr, err)
 	})
 
@@ -1634,7 +1661,7 @@ func TestVerifyInvalidSigners(t *testing.T) {
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 
-		err := sr.VerifyInvalidSigners(invalidSignersBytes)
+		_, err := sr.VerifyInvalidSigners(invalidSignersBytes)
 		require.Nil(t, err)
 		require.True(t, wasCalled)
 	})
@@ -1662,7 +1689,7 @@ func TestVerifyInvalidSigners(t *testing.T) {
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 
-		err := sr.VerifyInvalidSigners(invalidSignersBytes)
+		_, err := sr.VerifyInvalidSigners(invalidSignersBytes)
 		require.Nil(t, err)
 	})
 }
@@ -1704,17 +1731,25 @@ func TestSubroundEndRound_CreateAndBroadcastInvalidSigners(t *testing.T) {
 
 		expectedInvalidSigners := []byte("invalid signers")
 
-		wasCalled := false
+		wasBroadcastConsensusMessageCalled := false
 		container := consensusMocks.InitConsensusCore()
 		messenger := &consensusMocks.BroadcastMessengerMock{
 			BroadcastConsensusMessageCalled: func(message *consensus.Message) error {
 				assert.Equal(t, expectedInvalidSigners, message.InvalidSigners)
-				wasCalled = true
+				wasBroadcastConsensusMessageCalled = true
 				wg.Done()
 				return nil
 			},
 		}
 		container.SetBroadcastMessenger(messenger)
+
+		wasAddInvalidSignersCalled := false
+		invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{
+			AddInvalidSignersCalled: func(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string) {
+				wasAddInvalidSignersCalled = true
+			},
+		}
+		container.SetInvalidSignersCache(invalidSignersCache)
 
 		sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
 		sr.SetSelfPubKey("A")
@@ -1722,7 +1757,8 @@ func TestSubroundEndRound_CreateAndBroadcastInvalidSigners(t *testing.T) {
 
 		wg.Wait()
 
-		require.True(t, wasCalled)
+		require.True(t, wasBroadcastConsensusMessageCalled)
+		require.True(t, wasAddInvalidSignersCalled)
 	})
 }
 
diff --git a/consensus/spos/bls/v2/subroundStartRound.go b/consensus/spos/bls/v2/subroundStartRound.go
index e8e20319a0a..4e3be13f5cd 100644
--- a/consensus/spos/bls/v2/subroundStartRound.go
+++ b/consensus/spos/bls/v2/subroundStartRound.go
@@ -101,6 +101,7 @@ func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool {
 	topic := spos.GetConsensusTopicID(sr.ShardCoordinator())
 	sr.GetAntiFloodHandler().ResetForTopic(topic)
 	sr.worker.ResetConsensusMessages()
+	sr.worker.ResetInvalidSignersCache()
 
 	return true
 }
diff --git a/consensus/spos/consensusCore.go b/consensus/spos/consensusCore.go
index a25a071eb64..c255d704822 100644
--- a/consensus/spos/consensusCore.go
+++ b/consensus/spos/consensusCore.go
@@ -43,6 +43,7 @@ type ConsensusCore struct {
 	enableEpochsHandler        common.EnableEpochsHandler
 	equivalentProofsPool       consensus.EquivalentProofsPool
 	epochNotifier              process.EpochNotifier
+	invalidSignersCache        InvalidSignersCache
 }
 
 // ConsensusCoreArgs store all arguments that are needed to create a ConsensusCore object
@@ -72,6 +73,7 @@ type ConsensusCoreArgs struct {
 	EnableEpochsHandler        common.EnableEpochsHandler
 	EquivalentProofsPool       consensus.EquivalentProofsPool
 	EpochNotifier              process.EpochNotifier
+	InvalidSignersCache        InvalidSignersCache
 }
 
 // NewConsensusCore creates a new ConsensusCore instance
@@ -104,6 +106,7 @@ func NewConsensusCore(
 		enableEpochsHandler:        args.EnableEpochsHandler,
 		equivalentProofsPool:       args.EquivalentProofsPool,
 		epochNotifier:              args.EpochNotifier,
+		invalidSignersCache:        args.InvalidSignersCache,
 	}
 
 	err := ValidateConsensusCore(consensusCore)
@@ -239,6 +242,11 @@ func (cc *ConsensusCore) EquivalentProofsPool() consensus.EquivalentProofsPool {
 	return cc.equivalentProofsPool
 }
 
+// InvalidSignersCache returns the invalid signers cache component
+func (cc *ConsensusCore) InvalidSignersCache() InvalidSignersCache {
+	return cc.invalidSignersCache
+}
+
 // SetBlockchain sets blockchain handler
 func (cc *ConsensusCore) SetBlockchain(blockChain data.ChainHandler) {
 	cc.blockChain = blockChain
@@ -364,6 +372,11 @@ func (cc *ConsensusCore) SetEpochNotifier(epochNotifier process.EpochNotifier) {
 	cc.epochNotifier = epochNotifier
 }
 
+// SetInvalidSignersCache sets the invalid signers cache
+func (cc *ConsensusCore) SetInvalidSignersCache(cache InvalidSignersCache) {
+	cc.invalidSignersCache = cache
+}
+
 // IsInterfaceNil returns true if there is no value under the interface
 func (cc *ConsensusCore) IsInterfaceNil() bool {
 	return cc == nil
diff --git a/consensus/spos/consensusCoreValidator.go b/consensus/spos/consensusCoreValidator.go
index cfd6b5e63ba..e3033fa24a9 100644
--- a/consensus/spos/consensusCoreValidator.go
+++ b/consensus/spos/consensusCoreValidator.go
@@ -88,6 +88,9 @@ func ValidateConsensusCore(container ConsensusCoreHandler) error {
 	if check.IfNil(container.EpochStartRegistrationHandler()) {
 		return ErrNilEpochStartNotifier
 	}
+	if check.IfNil(container.InvalidSignersCache()) {
+		return ErrNilInvalidSignersCache
+	}
 
 	return nil
 }
diff --git a/consensus/spos/consensusCoreValidator_test.go b/consensus/spos/consensusCoreValidator_test.go
index 128d651fc45..f199cd0b7e5 100644
--- a/consensus/spos/consensusCoreValidator_test.go
+++ b/consensus/spos/consensusCoreValidator_test.go
@@ -46,6 +46,7 @@ func initConsensusDataContainer() *spos.ConsensusCore {
 	enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{}
 	proofsPool := &dataRetriever.ProofsPoolMock{}
 	epochNotifier := &epochNotifierMock.EpochNotifierStub{}
+	invalidSignersCache := &consensusMocks.InvalidSignersCacheMock{}
 
 	consensusCore, _ := spos.NewConsensusCore(&spos.ConsensusCoreArgs{
 		BlockChain:                 blockChain,
@@ -73,6 +74,7 @@ func initConsensusDataContainer() *spos.ConsensusCore {
 		EnableEpochsHandler:        enableEpochsHandler,
 		EquivalentProofsPool:       proofsPool,
 		EpochNotifier:              epochNotifier,
+		InvalidSignersCache:        invalidSignersCache,
 	})
 
 	return consensusCore
@@ -372,6 +374,17 @@ func TestConsensusContainerValidator_ValidateNilEpochStartRegistrationHandlerSho
 	assert.Equal(t, spos.ErrNilEpochStartNotifier, err)
 }
 
+func TestConsensusContainerValidator_ValidateNilInvalidSignersCacheShouldFail(t *testing.T) {
+	t.Parallel()
+
+	container := initConsensusDataContainer()
+	container.SetInvalidSignersCache(nil)
+
+	err := spos.ValidateConsensusCore(container)
+
+	assert.Equal(t, spos.ErrNilInvalidSignersCache, err)
+}
+
 func TestConsensusContainerValidator_ShouldWork(t *testing.T) {
 	t.Parallel()
 
diff --git a/consensus/spos/consensusCore_test.go b/consensus/spos/consensusCore_test.go
index bff893b6e00..1f44827f857 100644
--- a/consensus/spos/consensusCore_test.go
+++ b/consensus/spos/consensusCore_test.go
@@ -41,6 +41,7 @@ func createDefaultConsensusCoreArgs() *spos.ConsensusCoreArgs {
 		EnableEpochsHandler:        consensusCoreMock.EnableEpochsHandler(),
 		EquivalentProofsPool:       consensusCoreMock.EquivalentProofsPool(),
 		EpochNotifier:              consensusCoreMock.EpochNotifier(),
+		InvalidSignersCache:        &consensus.InvalidSignersCacheMock{},
 	}
 	return args
 }
diff --git a/consensus/spos/errors.go b/consensus/spos/errors.go
index 279b2ffbd8b..d89a58865f3 100644
--- a/consensus/spos/errors.go
+++ b/consensus/spos/errors.go
@@ -276,3 +276,9 @@ var ErrNilEpochNotifier = errors.New("nil epoch notifier")
 
 // ErrNilEpochStartNotifier signals that nil epoch start notifier has been provided
 var ErrNilEpochStartNotifier = errors.New("nil epoch start notifier")
+
+// ErrInvalidSignersAlreadyReceived signals that an invalid signers message has already been received
+var ErrInvalidSignersAlreadyReceived = errors.New("invalid signers already received")
+
+// ErrNilInvalidSignersCache signals that a nil invalid signers cache has been provided
+var ErrNilInvalidSignersCache = errors.New("nil invalid signers cache")
diff --git a/consensus/spos/interface.go b/consensus/spos/interface.go
index cbbe4c3236e..f32db829b29 100644
--- a/consensus/spos/interface.go
+++ b/consensus/spos/interface.go
@@ -48,6 +48,7 @@ type ConsensusCoreHandler interface {
 	EnableEpochsHandler() common.EnableEpochsHandler
 	EquivalentProofsPool() consensus.EquivalentProofsPool
 	EpochNotifier() process.EpochNotifier
+	InvalidSignersCache() InvalidSignersCache
 	IsInterfaceNil() bool
 }
 
@@ -127,6 +128,8 @@ type WorkerHandler interface {
 	ResetConsensusMessages()
 	// ResetConsensusRoundState resets the consensus round state when transitioning to a different consensus version
 	ResetConsensusRoundState()
+	// ResetInvalidSignersCache resets the invalid signers cache
+	ResetInvalidSignersCache()
 	// IsInterfaceNil returns true if there is no value under the interface
 	IsInterfaceNil() bool
 }
@@ -265,3 +268,11 @@ type RoundThresholdHandler interface {
 	FallbackThreshold(subroundId int) int
 	SetFallbackThreshold(subroundId int, threshold int)
 }
+
+// InvalidSignersCache encapsulates the methods needed for an invalid signers cache
+type InvalidSignersCache interface {
+	AddInvalidSigners(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string)
+	CheckKnownInvalidSigners(headerHash []byte, invalidSigners []byte) bool
+	Reset()
+	IsInterfaceNil() bool
+}
diff --git a/consensus/spos/invalidSignersCache.go b/consensus/spos/invalidSignersCache.go
new file mode 100644
index 00000000000..ca26867f49d
--- /dev/null
+++ b/consensus/spos/invalidSignersCache.go
@@ -0,0 +1,130 @@
+package spos
+
+import (
+	"sync"
+
+	"github.com/multiversx/mx-chain-core-go/core/check"
+	"github.com/multiversx/mx-chain-core-go/hashing"
+	"github.com/multiversx/mx-chain-core-go/marshal"
+
+	"github.com/multiversx/mx-chain-go/consensus"
+	"github.com/multiversx/mx-chain-go/p2p"
+)
+
+// ArgInvalidSignersCache defines the DTO used to create a new instance of invalidSignersCache
+type ArgInvalidSignersCache struct {
+	Hasher         hashing.Hasher
+	SigningHandler p2p.P2PSigningHandler
+	Marshaller     marshal.Marshalizer
+}
+
+type invalidSignersCache struct {
+	sync.RWMutex
+	invalidSignersHashesMap    map[string]struct{}
+	invalidSignersForHeaderMap map[string]map[string]struct{}
+	hasher                     hashing.Hasher
+	signingHandler             p2p.P2PSigningHandler
+	marshaller                 marshal.Marshalizer
+}
+
+// NewInvalidSignersCache returns a new instance of invalidSignersCache
+func NewInvalidSignersCache(args ArgInvalidSignersCache) (*invalidSignersCache, error) {
+	err := checkArgs(args)
+	if err != nil {
+		return nil, err
+	}
+
+	return &invalidSignersCache{
+		invalidSignersHashesMap:    make(map[string]struct{}),
+		invalidSignersForHeaderMap: make(map[string]map[string]struct{}),
+		hasher:                     args.Hasher,
+		signingHandler:             args.SigningHandler,
+		marshaller:                 args.Marshaller,
+	}, nil
+}
+
+func checkArgs(args ArgInvalidSignersCache) error {
+	if check.IfNil(args.Hasher) {
+		return ErrNilHasher
+	}
+	if check.IfNil(args.SigningHandler) {
+		return ErrNilSigningHandler
+	}
+	if check.IfNil(args.Marshaller) {
+		return ErrNilMarshalizer
+	}
+
+	return nil
+}
+
+// AddInvalidSigners adds the hash of the provided invalid signers, along with their public keys, into the internal maps
+func (cache *invalidSignersCache) AddInvalidSigners(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string) {
+	if len(invalidPublicKeys) == 0 || len(invalidSigners) == 0 {
+		return
+	}
+
+	cache.Lock()
+	defer cache.Unlock()
+
+	invalidSignersHash := cache.hasher.Compute(string(invalidSigners))
+	cache.invalidSignersHashesMap[string(invalidSignersHash)] = struct{}{}
+
+	_, ok := cache.invalidSignersForHeaderMap[string(headerHash)]
+	if !ok {
+		cache.invalidSignersForHeaderMap[string(headerHash)] = make(map[string]struct{})
+	}
+
+	for _, pk := range invalidPublicKeys {
+		cache.invalidSignersForHeaderMap[string(headerHash)][pk] = struct{}{}
+	}
+}
+
+// CheckKnownInvalidSigners checks whether all the provided invalid signers are known for the header hash
+func (cache *invalidSignersCache) CheckKnownInvalidSigners(headerHash []byte, serializedInvalidSigners []byte) bool {
+	cache.RLock()
+	defer cache.RUnlock()
+
+	invalidSignersHash := cache.hasher.Compute(string(serializedInvalidSigners))
+	_, hasSameInvalidSigners := cache.invalidSignersHashesMap[string(invalidSignersHash)]
+	if hasSameInvalidSigners {
+		return true
+	}
+
+	_, isHeaderKnown := cache.invalidSignersForHeaderMap[string(headerHash)]
+	if !isHeaderKnown {
+		return false
+	}
+
+	invalidSignersP2PMessages, err := cache.signingHandler.Deserialize(serializedInvalidSigners)
+	if err != nil {
+		return false
+	}
+
+	for _, msg := range invalidSignersP2PMessages {
+		cnsMsg := &consensus.Message{}
+		err = cache.marshaller.Unmarshal(cnsMsg, msg.Data())
+		if err != nil {
+			return false
+		}
+
+		_, isKnownInvalidSigner := cache.invalidSignersForHeaderMap[string(headerHash)][string(cnsMsg.PubKey)]
+		if !isKnownInvalidSigner {
+			return false
+		}
+	}
+
+	return true
+}
+
+// Reset clears the internal maps
+func (cache *invalidSignersCache) Reset() {
+	cache.Lock()
+	defer cache.Unlock()
+
+	cache.invalidSignersHashesMap = make(map[string]struct{})
+	cache.invalidSignersForHeaderMap = make(map[string]map[string]struct{})
+}
+
+// IsInterfaceNil returns true if there is no value under the interface
+func (cache *invalidSignersCache) IsInterfaceNil() bool {
+	return cache == nil
+}
diff --git a/consensus/spos/invalidSignersCache_test.go b/consensus/spos/invalidSignersCache_test.go
new file mode 100644
index 00000000000..354a5b9306c
--- /dev/null
+++ b/consensus/spos/invalidSignersCache_test.go
@@ -0,0 +1,201 @@
+package spos
+
+import (
+	"crypto/rand"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiversx/mx-chain-communication-go/p2p/data"
+	"github.com/multiversx/mx-chain-communication-go/p2p/libp2p"
+	"github.com/multiversx/mx-chain-core-go/core"
+	"github.com/multiversx/mx-chain-go/consensus"
+	consensusMock "github.com/multiversx/mx-chain-go/consensus/mock"
+	"github.com/multiversx/mx-chain-go/p2p"
+	"github.com/multiversx/mx-chain-go/testscommon"
+	"github.com/multiversx/mx-chain-go/testscommon/marshallerMock"
+	"github.com/stretchr/testify/require"
+)
+
+func createMockArgs() ArgInvalidSignersCache {
+	return ArgInvalidSignersCache{
+		Hasher:         &testscommon.HasherStub{},
+		SigningHandler: &consensusMock.MessageSigningHandlerStub{},
+		Marshaller:     &marshallerMock.MarshalizerStub{},
+	}
+}
+
+func TestInvalidSignersCache_IsInterfaceNil(t *testing.T) {
+	t.Parallel()
+
+	var cache *invalidSignersCache
+	require.True(t, cache.IsInterfaceNil())
+
+	cache, _ = NewInvalidSignersCache(createMockArgs())
+	require.False(t, cache.IsInterfaceNil())
+}
+
+func TestNewInvalidSignersCache(t *testing.T) {
+	t.Parallel()
+
+	t.Run("nil Hasher should error", func(t *testing.T) {
+		t.Parallel()
+
+		args := createMockArgs()
+		args.Hasher = nil
+
+		cache, err := NewInvalidSignersCache(args)
+		require.Equal(t, ErrNilHasher, err)
+		require.Nil(t, cache)
+	})
+	t.Run("nil SigningHandler should error", func(t *testing.T) {
+		t.Parallel()
+
+		args := createMockArgs()
+		args.SigningHandler = nil
+
+		cache, err := NewInvalidSignersCache(args)
+		require.Equal(t, ErrNilSigningHandler, err)
+		require.Nil(t, cache)
+	})
+	t.Run("nil Marshaller should error", func(t *testing.T) {
+		t.Parallel()
+
+		args := createMockArgs()
+		args.Marshaller = nil
+
+		cache, err := NewInvalidSignersCache(args)
+		require.Equal(t, ErrNilMarshalizer, err)
+		require.Nil(t, cache)
+	})
+	t.Run("should work", func(t *testing.T) {
+		t.Parallel()
+
+		cache, err := NewInvalidSignersCache(createMockArgs())
+		require.NoError(t, err)
+		require.NotNil(t, cache)
+	})
+}
+
+func TestInvalidSignersCache(t *testing.T) {
+	t.Parallel()
+
+	t.Run("all ops should work", func(t *testing.T) {
+		t.Parallel()
+
+		headerHash1 := []byte("headerHash11")
+		invalidSigners1 := []byte("invalidSigners1")
+		pubKeys1 := []string{"pk0", "pk1"}
+		invalidSigners2 := []byte("invalidSigners2")
+
+		args := createMockArgs()
+		args.Hasher = &testscommon.HasherStub{
+			ComputeCalled: func(s string) []byte {
+				return []byte(s)
+			},
+		}
+		args.SigningHandler = &consensusMock.MessageSigningHandlerStub{
+			DeserializeCalled: func(messagesBytes []byte) ([]p2p.MessageP2P, error) {
+				if string(messagesBytes) == string(invalidSigners1) {
+					m1, _ := libp2p.NewMessage(createDummyP2PMessage(), &testscommon.ProtoMarshalizerMock{}, "")
+					m2, _ := libp2p.NewMessage(createDummyP2PMessage(), &testscommon.ProtoMarshalizerMock{}, "")
+					return []p2p.MessageP2P{m1, m2}, nil
+				}
+
+				m1, _ := libp2p.NewMessage(createDummyP2PMessage(), &testscommon.ProtoMarshalizerMock{}, "")
+				return []p2p.MessageP2P{m1}, nil
+			},
+		}
+		cnt := 0
+		args.Marshaller = &marshallerMock.MarshalizerStub{
+			UnmarshalCalled: func(obj interface{}, buff []byte) error {
+				message := obj.(*consensus.Message)
+				message.PubKey = []byte(fmt.Sprintf("pk%d", cnt))
+				cnt++
+
+				return nil
+			},
+		}
+		cache, _ := NewInvalidSignersCache(args)
+		require.NotNil(t, cache)
+
+		cache.AddInvalidSigners(nil, nil, nil) // early return, for coverage only
+
+		require.False(t, cache.CheckKnownInvalidSigners(headerHash1, invalidSigners1))
+
+		cache.AddInvalidSigners(headerHash1, invalidSigners1, pubKeys1)
+		require.True(t, cache.CheckKnownInvalidSigners(headerHash1, invalidSigners1)) // should be found via the invalid signers hashes map
+
+		require.True(t, cache.CheckKnownInvalidSigners(headerHash1, invalidSigners2)) // different payload hash, but all signers are already known for this header
+
+		cache.Reset()
+		require.False(t, cache.CheckKnownInvalidSigners(headerHash1, invalidSigners1))
+	})
+	t.Run("concurrent ops should work", func(t *testing.T) {
+		t.Parallel()
+
+		defer func() {
+			r := recover()
+			if r != nil {
+				require.Fail(t, "should have not panicked")
+			}
+		}()
+
+		args := createMockArgs()
+		cache, _ := NewInvalidSignersCache(args)
+		require.NotNil(t, cache)
+
+		numCalls := 1000
+		wg := sync.WaitGroup{}
+		wg.Add(numCalls)
+
+		for i := 0; i < numCalls; i++ {
+			go func(idx int) {
+				switch idx % 3 {
+				case 0:
+					cache.AddInvalidSigners([]byte("hash"), []byte("invalidSigners"), []string{"pk0", "pk1"})
+				case 1:
+					cache.CheckKnownInvalidSigners([]byte("hash"), []byte("invalidSigners"))
+				case 2:
+					cache.Reset()
+				default:
+					require.Fail(t, "should not happen")
+				}
+
+				wg.Done()
+			}(i)
+		}
+
+		wg.Wait()
+	})
+}
+
+func createDummyP2PMessage() *pubsub.Message {
+	marshaller := &testscommon.ProtoMarshalizerMock{}
+	topicMessage := &data.TopicMessage{
+		Timestamp: time.Now().Unix(),
+		Payload:   []byte("data"),
+		Version:   1,
+	}
+	buff, _ := marshaller.Marshal(topicMessage)
+	topic := "topic"
+	mes := &pb.Message{
+		From:  getRandomID().Bytes(),
+		Data:  buff,
+		Topic: &topic,
+	}
+
+	return &pubsub.Message{Message: mes}
+}
+
+func getRandomID() core.PeerID {
+	prvKey, _, _ := crypto.GenerateSecp256k1Key(rand.Reader)
+	id, _ := peer.IDFromPublicKey(prvKey.GetPublic())
+
+	return core.PeerID(id)
+}
diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go
index 825e46eb231..3f1d6d9ef69 100644
--- a/consensus/spos/worker.go
+++ b/consensus/spos/worker.go
@@ -18,7 +18,6 @@ import (
 	"github.com/multiversx/mx-chain-core-go/hashing"
 	"github.com/multiversx/mx-chain-core-go/marshal"
 	crypto "github.com/multiversx/mx-chain-crypto-go"
-
 	"github.com/multiversx/mx-chain-go/common"
 	"github.com/multiversx/mx-chain-go/consensus"
 	errorsErd "github.com/multiversx/mx-chain-go/errors"
@@ -84,6 +83,8 @@ type Worker struct {
 	nodeRedundancyHandler consensus.NodeRedundancyHandler
 	peerBlacklistHandler  consensus.PeerBlacklistHandler
 	closer                core.SafeCloser
+
+	invalidSignersCache InvalidSignersCache
 }
 
 // WorkerArgs holds the consensus worker arguments
@@ -114,6 +115,7 @@ type WorkerArgs struct {
 	NodeRedundancyHandler    consensus.NodeRedundancyHandler
 	PeerBlacklistHandler     consensus.PeerBlacklistHandler
 	EnableEpochsHandler      common.EnableEpochsHandler
+	InvalidSignersCache      InvalidSignersCache
 }
 
 // NewWorker creates a new Worker object
@@ -166,6 +168,7 @@ func NewWorker(args *WorkerArgs) (*Worker, error) {
 		peerBlacklistHandler:  args.PeerBlacklistHandler,
 		closer:                closing.NewSafeChanCloser(),
 		enableEpochsHandler:   args.EnableEpochsHandler,
+		invalidSignersCache:   args.InvalidSignersCache,
 	}
 
 	wrk.consensusMessageValidator = consensusMessageValidatorObj
@@ -269,6 +272,9 @@ func checkNewWorkerParams(args *WorkerArgs) error {
 	if check.IfNil(args.EnableEpochsHandler) {
 		return ErrNilEnableEpochsHandler
 	}
+	if check.IfNil(args.InvalidSignersCache) {
+		return ErrNilInvalidSignersCache
+	}
 
 	return nil
 }
@@ -447,6 +453,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
 	isMessageWithBlockBody := wrk.consensusService.IsMessageWithBlockBody(msgType)
 	isMessageWithBlockHeader := wrk.consensusService.IsMessageWithBlockHeader(msgType)
 	isMessageWithBlockBodyAndHeader := wrk.consensusService.IsMessageWithBlockBodyAndHeader(msgType)
+	isMessageWithInvalidSigners := wrk.consensusService.IsMessageWithInvalidSigners(msgType)
 
 	if isMessageWithBlockBody || isMessageWithBlockBodyAndHeader {
 		wrk.doJobOnMessageWithBlockBody(cnsMsg)
@@ -463,6 +470,13 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
 		wrk.doJobOnMessageWithSignature(cnsMsg, message)
 	}
 
+	if isMessageWithInvalidSigners {
+		err = wrk.verifyMessageWithInvalidSigners(cnsMsg)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	errNotCritical := wrk.checkSelfState(cnsMsg)
 	if errNotCritical != nil {
 		log.Trace("checkSelfState", "error", errNotCritical.Error())
@@ -485,7 +499,8 @@ func (wrk *Worker) shouldBlacklistPeer(err error) bool {
 		errors.Is(err, errorsErd.ErrSignatureMismatch) ||
 		errors.Is(err, nodesCoordinator.ErrEpochNodesConfigDoesNotExist) ||
 		errors.Is(err, ErrMessageTypeLimitReached) ||
-		errors.Is(err, ErrEquivalentMessageAlreadyReceived) {
+		errors.Is(err, ErrEquivalentMessageAlreadyReceived) ||
+		errors.Is(err, ErrInvalidSignersAlreadyReceived) {
 		return false
 	}
 
@@ -560,6 +575,16 @@ func (wrk *Worker) doJobOnMessageWithHeader(cnsMsg *consensus.Message) error {
 	return nil
 }
 
+func (wrk *Worker) verifyMessageWithInvalidSigners(cnsMsg *consensus.Message) error {
+	// no need to guard this method with a common.EquivalentMessagesFlag check, as invalidSignersCache will only hold entries for consensus v2
+	if wrk.invalidSignersCache.CheckKnownInvalidSigners(cnsMsg.BlockHeaderHash, cnsMsg.InvalidSigners) {
+		// return an error here to avoid the further broadcast of this message
+		return ErrInvalidSignersAlreadyReceived
+	}
+
+	return nil
+}
+
 func (wrk *Worker) checkHeaderPreviousProof(header data.HeaderHandler) error {
 	if wrk.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) {
 		return fmt.Errorf("%w : received header on consensus topic after equivalent messages activation", ErrConsensusMessageNotExpected)
@@ -805,6 +830,11 @@ func (wrk *Worker) ResetConsensusRoundState() {
 	wrk.consensusState.ResetConsensusRoundState()
 }
 
+// ResetInvalidSignersCache resets the invalid signers cache
+func (wrk *Worker) ResetInvalidSignersCache() {
+	wrk.invalidSignersCache.Reset()
+}
+
 func (wrk *Worker) checkValidityAndProcessFinalInfo(cnsMsg *consensus.Message, p2pMessage p2p.MessageP2P) error {
 	msgType := consensus.MessageType(cnsMsg.MsgType)
 
diff --git a/consensus/spos/worker_test.go b/consensus/spos/worker_test.go
index 5860fc020c8..a617899a66d 100644
--- a/consensus/spos/worker_test.go
+++ b/consensus/spos/worker_test.go
@@ -123,6 +123,7 @@ func createDefaultWorkerArgs(appStatusHandler core.AppStatusHandler) *spos.Worke
 		NodeRedundancyHandler:    &mock.NodeRedundancyHandlerStub{},
 		PeerBlacklistHandler:     &mock.PeerBlacklistHandlerStub{},
 		EnableEpochsHandler:      &enableEpochsHandlerMock.EnableEpochsHandlerStub{},
+		InvalidSignersCache:      &consensusMocks.InvalidSignersCacheMock{},
 	}
 
 	return workerArgs
@@ -390,6 +391,17 @@ func TestWorker_NewWorkerPoolEnableEpochsHandlerNilShouldFail(t *testing.T) {
 	assert.Equal(t, spos.ErrNilEnableEpochsHandler, err)
 }
 
+func TestWorker_NewWorkerPoolInvalidSignersCacheNilShouldFail(t *testing.T) {
+	t.Parallel()
+
+	workerArgs := createDefaultWorkerArgs(&statusHandlerMock.AppStatusHandlerStub{})
+	workerArgs.InvalidSignersCache = nil
+	wrk, err := spos.NewWorker(workerArgs)
+
+	assert.Nil(t, wrk)
+	assert.Equal(t, spos.ErrNilInvalidSignersCache, err)
+}
+
 func TestWorker_NewWorkerShouldWork(t *testing.T) {
 	t.Parallel()
 
@@ -1999,6 +2011,83 @@ func TestWorker_ProcessReceivedMessageWithSignature(t *testing.T) {
 	})
 }
 
+func TestWorker_ProcessReceivedMessageWithInvalidSigners(t *testing.T) {
+	t.Parallel()
+
+	workerArgs := createDefaultWorkerArgs(&statusHandlerMock.AppStatusHandlerStub{})
+	cntCheckKnownInvalidSignersCalled := 0
+	workerArgs.InvalidSignersCache = &consensusMocks.InvalidSignersCacheMock{
+		CheckKnownInvalidSignersCalled: func(headerHash []byte, invalidSigners []byte) bool {
+			cntCheckKnownInvalidSignersCalled++
+			return cntCheckKnownInvalidSignersCalled > 1
+		},
+	}
+	workerArgs.AntifloodHandler = &mock.P2PAntifloodHandlerStub{
+		CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
+			return nil
+		},
+		CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
+			return nil
+		},
+		BlacklistPeerCalled: func(peer core.PeerID, reason string, duration time.Duration) {
+			require.Fail(t, "should have not been called")
+		},
+	}
+	workerArgs.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{
+		IsFlagEnabledCalled: func(flag core.EnableEpochFlag) bool {
+			return true
+		},
+	}
+	wrk, _ := spos.NewWorker(workerArgs)
+	wrk.ConsensusState().SetHeader(&block.HeaderV2{})
+
+	hdr := &block.Header{}
+	hdr.Nonce = 1
+	hdr.TimeStamp = uint64(wrk.RoundHandler().TimeStamp().Unix())
+	hdrStr, _ := mock.MarshalizerMock{}.Marshal(hdr)
+	hdrHash := (&hashingMocks.HasherMock{}).Compute(string(hdrStr))
+	pubKey := []byte(wrk.ConsensusState().ConsensusGroup()[0])
+
+	invalidSigners := []byte("invalid signers")
+	cnsMsg := consensus.NewConsensusMessage(
+		hdrHash,
+		nil,
+		nil,
+		nil,
+		pubKey,
+		bytes.Repeat([]byte("a"), SignatureSize),
+		int(bls.MtInvalidSigners),
+		0,
+		chainID,
+		nil,
+		nil,
+		nil,
+		currentPid,
+		invalidSigners,
+	)
+	buff, err := wrk.Marshalizer().Marshal(cnsMsg)
+	require.Nil(t, err)
+
+	msg := &p2pmocks.P2PMessageMock{
+		DataField:      buff,
+		PeerField:      currentPid,
+		SignatureField: []byte("signature"),
+	}
+
+	// first call should be ok
+	msgID, err := wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})
+	require.Nil(t, err)
+	require.Nil(t, msgID)
+
+	// reset the received messages to allow a second one of the same type
+	wrk.ResetConsensusMessages()
+
+	// second call should see this message as already received and return error
+	msgID, err = wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})
+	require.Equal(t, spos.ErrInvalidSignersAlreadyReceived, err)
+	require.Nil(t, msgID)
+}
+
 func TestWorker_ReceivedHeader(t *testing.T) {
 	t.Parallel()
 
diff --git a/factory/consensus/consensusComponents.go b/factory/consensus/consensusComponents.go
index cad39348aa2..d469016921b 100644
--- a/factory/consensus/consensusComponents.go
+++ b/factory/consensus/consensusComponents.go
@@ -180,6 +180,21 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
 		return nil, err
 	}
 
+	p2pSigningHandler, err := ccf.createP2pSigningHandler()
+	if err != nil {
+		return nil, err
+	}
+
+	argsInvalidSignersCacher := spos.ArgInvalidSignersCache{
+		Hasher:         ccf.coreComponents.Hasher(),
+		SigningHandler: p2pSigningHandler,
+		Marshaller:     ccf.coreComponents.InternalMarshalizer(),
+	}
+	invalidSignersCache, err := spos.NewInvalidSignersCache(argsInvalidSignersCacher)
+	if err != nil {
+		return nil, err
+	}
+
 	workerArgs := &spos.WorkerArgs{
 		ConsensusService:         consensusService,
 		BlockChain:               ccf.dataComponents.Blockchain(),
@@ -207,6 +222,7 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
 		NodeRedundancyHandler:    ccf.processComponents.NodeRedundancyHandler(),
 		PeerBlacklistHandler:     cc.peerBlacklistHandler,
 		EnableEpochsHandler:      ccf.coreComponents.EnableEpochsHandler(),
+		InvalidSignersCache:      invalidSignersCache,
 	}
 
 	cc.worker, err = spos.NewWorker(workerArgs)
@@ -226,11 +242,6 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
 		return nil, err
 	}
 
-	p2pSigningHandler, err := ccf.createP2pSigningHandler()
-	if err != nil {
-		return nil, err
-	}
-
 	consensusArgs := &spos.ConsensusCoreArgs{
 		BlockChain:                 ccf.dataComponents.Blockchain(),
 		BlockProcessor:             ccf.processComponents.BlockProcessor(),
@@ -257,6 +268,7 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) {
 		EnableEpochsHandler:        ccf.coreComponents.EnableEpochsHandler(),
 		EquivalentProofsPool:       ccf.dataComponents.Datapool().Proofs(),
 		EpochNotifier:              ccf.coreComponents.EpochNotifier(),
+		InvalidSignersCache:        invalidSignersCache,
 	}
 
 	consensusDataContainer, err := spos.NewConsensusCore(
diff --git a/factory/consensus/consensusComponents_test.go b/factory/consensus/consensusComponents_test.go
index d13318ba2b5..243cf11b04a 100644
--- a/factory/consensus/consensusComponents_test.go
+++ b/factory/consensus/consensusComponents_test.go
@@ -770,7 +770,7 @@ func TestConsensusComponentsFactory_Create(t *testing.T) {
 		cnt := 0
 		netwCompStub.MessengerCalled = func() p2p.Messenger {
 			cnt++
-			if cnt > 3 {
+			if cnt > 4 {
 				return nil
 			}
 			return &p2pmocks.MessengerStub{}
diff --git a/factory/interface.go b/factory/interface.go
index 698c9046d30..1b3b0665e4f 100644
--- a/factory/interface.go
+++ b/factory/interface.go
@@ -406,6 +406,8 @@ type ConsensusWorker interface {
 	ResetConsensusMessages()
 	// ResetConsensusRoundState resets the state of the consensus round
 	ResetConsensusRoundState()
+	// ResetInvalidSignersCache resets the invalid signers cache
+	ResetInvalidSignersCache()
 	// ReceivedHeader method is a wired method through which worker will receive headers from network
 	ReceivedHeader(headerHandler data.HeaderHandler, headerHash []byte)
 	// ReceivedProof will handle a received proof in consensus worker
diff --git a/testscommon/consensus/invalidSignersCacheMock.go b/testscommon/consensus/invalidSignersCacheMock.go
new file mode 100644
index 00000000000..f8c51387b70
--- /dev/null
+++ b/testscommon/consensus/invalidSignersCacheMock.go
@@ -0,0 +1,35 @@
+package consensus
+
+// InvalidSignersCacheMock -
+type InvalidSignersCacheMock struct {
+	AddInvalidSignersCalled        func(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string)
+	CheckKnownInvalidSignersCalled func(headerHash []byte, invalidSigners []byte) bool
+	ResetCalled                    func()
+}
+
+// AddInvalidSigners -
+func (mock *InvalidSignersCacheMock) AddInvalidSigners(headerHash []byte, invalidSigners []byte, invalidPublicKeys []string) {
+	if mock.AddInvalidSignersCalled != nil {
+		mock.AddInvalidSignersCalled(headerHash, invalidSigners, invalidPublicKeys)
+	}
+}
+
+// CheckKnownInvalidSigners -
+func (mock *InvalidSignersCacheMock) CheckKnownInvalidSigners(headerHash []byte, invalidSigners []byte) bool {
+	if mock.CheckKnownInvalidSignersCalled != nil {
+		return mock.CheckKnownInvalidSignersCalled(headerHash, invalidSigners)
+	}
+
+	return false
+}
+
+// Reset -
+func (mock *InvalidSignersCacheMock) Reset() {
+	if mock.ResetCalled != nil {
+		mock.ResetCalled()
+	}
+}
+
+// IsInterfaceNil -
+func (mock *InvalidSignersCacheMock) IsInterfaceNil() bool {
+	return mock == nil
+}
diff --git a/testscommon/consensus/mockTestInitializer.go b/testscommon/consensus/mockTestInitializer.go
index 050a6e2538b..85b946c13df 100644
--- a/testscommon/consensus/mockTestInitializer.go
+++ b/testscommon/consensus/mockTestInitializer.go
@@ -245,6 +245,7 @@ func InitConsensusCoreWithMultiSigner(multiSigner crypto.MultiSigner) *spos.Cons
 		EnableEpochsHandler:        enableEpochsHandler,
 		EquivalentProofsPool:       equivalentProofsPool,
 		EpochNotifier:              epochNotifier,
+		InvalidSignersCache:        &InvalidSignersCacheMock{},
 	})
 
 	return container
diff --git a/testscommon/consensus/sposWorkerMock.go b/testscommon/consensus/sposWorkerMock.go
index 533809fda4f..657f01ca7ca 100644
--- a/testscommon/consensus/sposWorkerMock.go
+++ b/testscommon/consensus/sposWorkerMock.go
@@ -34,6 +34,7 @@ type SposWorkerMock struct {
 	ResetConsensusStateCalled      func()
 	ReceivedProofCalled            func(proofHandler consensus.ProofHandler)
 	ResetConsensusRoundStateCalled func()
+	ResetInvalidSignersCacheCalled func()
 }
 
 // ResetConsensusRoundState -
@@ -173,3 +174,10 @@ func (sposWorkerMock *SposWorkerMock) ReceivedProof(proofHandler consensus.Proof
 func (sposWorkerMock *SposWorkerMock) IsInterfaceNil() bool {
 	return sposWorkerMock == nil
}
+
+// ResetInvalidSignersCache -
+func (sposWorkerMock *SposWorkerMock) ResetInvalidSignersCache() {
+	if sposWorkerMock.ResetInvalidSignersCacheCalled != nil {
+		sposWorkerMock.ResetInvalidSignersCacheCalled()
+	}
+}
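
Reviewer note (not part of the patch): the dedup scheme introduced by invalidSignersCache is two-level — an exact-match set keyed by the hash of the serialized invalid signers payload, plus a per-header set of public keys already reported as invalid. The following is a minimal, dependency-free Go sketch of that logic under stated assumptions: all names are illustrative, sha256 stands in for the injected hashing.Hasher, and the public keys are passed in directly, whereas the real component recovers them by deserializing the p2p messages via the P2P signing handler and unmarshalling each consensus.Message.

package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

// dedupCache mirrors the two maps kept by invalidSignersCache: the hashes of
// payloads already seen, and the invalid public keys already reported per header.
type dedupCache struct {
	mu                sync.RWMutex
	seenPayloadHashes map[[32]byte]struct{}
	knownKeysByHeader map[string]map[string]struct{}
}

func newDedupCache() *dedupCache {
	return &dedupCache{
		seenPayloadHashes: make(map[[32]byte]struct{}),
		knownKeysByHeader: make(map[string]map[string]struct{}),
	}
}

// Add records both the payload hash and the offending public keys, matching
// AddInvalidSigners (including its early return on empty input).
func (c *dedupCache) Add(headerHash, payload []byte, pubKeys []string) {
	if len(pubKeys) == 0 || len(payload) == 0 {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.seenPayloadHashes[sha256.Sum256(payload)] = struct{}{}
	keys, ok := c.knownKeysByHeader[string(headerHash)]
	if !ok {
		keys = make(map[string]struct{})
		c.knownKeysByHeader[string(headerHash)] = keys
	}
	for _, pk := range pubKeys {
		keys[pk] = struct{}{}
	}
}

// Known reports true when the exact payload was seen before, or when every
// public key carried by the payload is already known for this header.
func (c *dedupCache) Known(headerHash, payload []byte, pubKeys []string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if _, ok := c.seenPayloadHashes[sha256.Sum256(payload)]; ok {
		return true
	}
	keys, ok := c.knownKeysByHeader[string(headerHash)]
	if !ok {
		return false
	}
	for _, pk := range pubKeys {
		if _, known := keys[pk]; !known {
			return false
		}
	}
	return true
}

func main() {
	c := newDedupCache()
	header := []byte("header-1")

	c.Add(header, []byte("payload-A"), []string{"pk0", "pk1"})

	fmt.Println(c.Known(header, []byte("payload-A"), nil))             // true: exact payload already seen
	fmt.Println(c.Known(header, []byte("payload-B"), []string{"pk0"})) // true: pk0 already reported for this header
	fmt.Println(c.Known(header, []byte("payload-B"), []string{"pk2"})) // false: pk2 is new
}

Because the check falls back to the per-header public-key set, a re-serialized message naming the same misbehaving validators is still treated as a duplicate; this is what allows the previously TODO-disabled createAndBroadcastInvalidSigners call to be re-enabled without flooding the network. Reset is wired into doStartRoundJob, so both maps are cleared at the start of every round.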