diff --git a/cmd/node/config/enableEpochs.toml b/cmd/node/config/enableEpochs.toml index 819108b99eb..1717a8a6c8b 100644 --- a/cmd/node/config/enableEpochs.toml +++ b/cmd/node/config/enableEpochs.toml @@ -287,6 +287,9 @@ # FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch represents the epoch when the fix for the remaining gas in the SaveKeyValue builtin function is enabled FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch = 3 + # EquivalentMessagesEnableEpoch represents the epoch when the equivalent messages are enabled + EquivalentMessagesEnableEpoch = 3 + # BLSMultiSignerEnableEpoch represents the activation epoch for different types of BLS multi-signers BLSMultiSignerEnableEpoch = [ { EnableEpoch = 0, Type = "no-KOSK" }, diff --git a/common/constants.go b/common/constants.go index 54b1f5eb70e..eb7e10a72ce 100644 --- a/common/constants.go +++ b/common/constants.go @@ -1001,5 +1001,6 @@ const ( NFTStopCreateFlag core.EnableEpochFlag = "NFTStopCreateFlag" FixGasRemainingForSaveKeyValueFlag core.EnableEpochFlag = "FixGasRemainingForSaveKeyValueFlag" IsChangeOwnerAddressCrossShardThroughSCFlag core.EnableEpochFlag = "IsChangeOwnerAddressCrossShardThroughSCFlag" + EquivalentMessagesFlag core.EnableEpochFlag = "EquivalentMessagesFlag" // all new flags must be added to createAllFlagsMap method, as part of enableEpochsHandler allFlagsDefined ) diff --git a/common/enablers/enableEpochsHandler.go b/common/enablers/enableEpochsHandler.go index e5ab0f06100..517287b1f8e 100644 --- a/common/enablers/enableEpochsHandler.go +++ b/common/enablers/enableEpochsHandler.go @@ -695,6 +695,12 @@ func (handler *enableEpochsHandler) createAllFlagsMap() { }, activationEpoch: handler.enableEpochsConfig.ChangeOwnerAddressCrossShardThroughSCEnableEpoch, }, + common.EquivalentMessagesFlag: { + isActiveInEpoch: func(epoch uint32) bool { + return epoch >= handler.enableEpochsConfig.EquivalentMessagesEnableEpoch + }, + activationEpoch: handler.enableEpochsConfig.EquivalentMessagesEnableEpoch, + }, } } diff --git a/common/enablers/enableEpochsHandler_test.go b/common/enablers/enableEpochsHandler_test.go index 75b97c35460..90cf9dfac1e 100644 --- a/common/enablers/enableEpochsHandler_test.go +++ b/common/enablers/enableEpochsHandler_test.go @@ -110,6 +110,7 @@ func createEnableEpochsConfig() config.EnableEpochs { NFTStopCreateEnableEpoch: 92, FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch: 93, ChangeOwnerAddressCrossShardThroughSCEnableEpoch: 94, + EquivalentMessagesEnableEpoch: 95, } } @@ -297,6 +298,7 @@ func TestEnableEpochsHandler_IsFlagEnabled(t *testing.T) { require.True(t, handler.IsFlagEnabled(common.NFTStopCreateFlag)) require.True(t, handler.IsFlagEnabled(common.FixGasRemainingForSaveKeyValueFlag)) require.True(t, handler.IsFlagEnabled(common.IsChangeOwnerAddressCrossShardThroughSCFlag)) + require.True(t, handler.IsFlagEnabled(common.EquivalentMessagesFlag)) } func TestEnableEpochsHandler_GetActivationEpoch(t *testing.T) { @@ -407,6 +409,7 @@ func TestEnableEpochsHandler_GetActivationEpoch(t *testing.T) { require.Equal(t, cfg.NFTStopCreateEnableEpoch, handler.GetActivationEpoch(common.NFTStopCreateFlag)) require.Equal(t, cfg.ChangeOwnerAddressCrossShardThroughSCEnableEpoch, handler.GetActivationEpoch(common.IsChangeOwnerAddressCrossShardThroughSCFlag)) require.Equal(t, cfg.FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch, handler.GetActivationEpoch(common.FixGasRemainingForSaveKeyValueFlag)) + require.Equal(t, cfg.EquivalentMessagesEnableEpoch, handler.GetActivationEpoch(common.EquivalentMessagesFlag)) } func TestEnableEpochsHandler_IsInterfaceNil(t *testing.T) { diff --git a/config/epochConfig.go b/config/epochConfig.go index c591b17c97b..566845e00c6 100644 --- a/config/epochConfig.go +++ b/config/epochConfig.go @@ -108,6 +108,7 @@ type EnableEpochs struct { NFTStopCreateEnableEpoch uint32 ChangeOwnerAddressCrossShardThroughSCEnableEpoch uint32 FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch uint32 + EquivalentMessagesEnableEpoch uint32 BLSMultiSignerEnableEpoch []MultiSignerConfig } diff --git a/config/tomlConfig_test.go b/config/tomlConfig_test.go index 9fe70361280..1068719a4d4 100644 --- a/config/tomlConfig_test.go +++ b/config/tomlConfig_test.go @@ -857,6 +857,9 @@ func TestEnableEpochConfig(t *testing.T) { # FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch represents the epoch when the fix for the remaining gas in the SaveKeyValue builtin function is enabled FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch = 91 + # EquivalentMessagesEnableEpoch represents the epoch when the equivalent messages are enabled + EquivalentMessagesEnableEpoch = 92 + # MaxNodesChangeEnableEpoch holds configuration for changing the maximum number of nodes and the enabling epoch MaxNodesChangeEnableEpoch = [ { EpochEnable = 44, MaxNumNodes = 2169, NodesToShufflePerShard = 80 }, @@ -968,6 +971,7 @@ func TestEnableEpochConfig(t *testing.T) { NFTStopCreateEnableEpoch: 89, ChangeOwnerAddressCrossShardThroughSCEnableEpoch: 90, FixGasRemainingForSaveKeyValueBuiltinFunctionEnableEpoch: 91, + EquivalentMessagesEnableEpoch: 92, MaxNodesChangeEnableEpoch: []MaxNodesChangeConfig{ { EpochEnable: 44, diff --git a/consensus/message.go b/consensus/message.go index f4396c05076..6cdb2522d98 100644 --- a/consensus/message.go +++ b/consensus/message.go @@ -40,3 +40,9 @@ func NewConsensusMessage( InvalidSigners: invalidSigners, } } + +// EquivalentMessageInfo holds information about an equivalent message +type EquivalentMessageInfo struct { + NumMessages uint64 + Validated bool +} diff --git a/consensus/mock/equivalentMessagesDebuggerStub.go b/consensus/mock/equivalentMessagesDebuggerStub.go new file mode 100644 index 00000000000..b0bfba7acde --- /dev/null +++ b/consensus/mock/equivalentMessagesDebuggerStub.go @@ -0,0 +1,22 @@ +package mock + +import ( + "github.com/multiversx/mx-chain-go/consensus" +) + +// EquivalentMessagesDebuggerStub - +type EquivalentMessagesDebuggerStub struct { + DisplayEquivalentMessagesStatisticsCalled func(getDataHandler func() map[string]*consensus.EquivalentMessageInfo) +} + +// DisplayEquivalentMessagesStatistics - +func (stub *EquivalentMessagesDebuggerStub) DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]*consensus.EquivalentMessageInfo) { + if stub.DisplayEquivalentMessagesStatisticsCalled != nil { + stub.DisplayEquivalentMessagesStatisticsCalled(getDataHandler) + } +} + +// IsInterfaceNil - +func (stub *EquivalentMessagesDebuggerStub) IsInterfaceNil() bool { + return stub == nil +} diff --git a/consensus/spos/debug/equivalentMessagesDebugger.go b/consensus/spos/debug/equivalentMessagesDebugger.go new file mode 100644 index 00000000000..6a5b7df028a --- /dev/null +++ b/consensus/spos/debug/equivalentMessagesDebugger.go @@ -0,0 +1,72 @@ +package debug + +import ( + "fmt" + + "github.com/multiversx/mx-chain-core-go/display" + "github.com/multiversx/mx-chain-go/consensus" + logger "github.com/multiversx/mx-chain-logger-go" +) + +var log = logger.GetOrCreate("debug/equivalentmessages") + +type equivalentMessagesDebugger struct { + shouldProcessDataFunc func() bool +} + +// NewEquivalentMessagesDebugger returns a new instance of equivalentMessagesDebugger +func NewEquivalentMessagesDebugger() *equivalentMessagesDebugger { + debugger := &equivalentMessagesDebugger{ + shouldProcessDataFunc: isLogTrace, + } + + return debugger +} + +// DisplayEquivalentMessagesStatistics prints all the equivalent messages +func (debugger *equivalentMessagesDebugger) DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]*consensus.EquivalentMessageInfo) { + if !debugger.shouldProcessDataFunc() { + return + } + if getDataHandler == nil { + return + } + + dataMap := getDataHandler() + log.Trace(fmt.Sprintf("Equivalent messages statistics for current round\n%s", dataToString(dataMap))) +} + +func dataToString(data map[string]*consensus.EquivalentMessageInfo) string { + header := []string{ + "Block header hash", + "Equivalent messages received", + } + + lines := make([]*display.LineData, 0, len(data)) + idx := 0 + for hash, info := range data { + horizontalLineAfter := idx == len(data) + line := []string{ + hash, + fmt.Sprintf("%d", info.NumMessages), + } + lines = append(lines, display.NewLineData(horizontalLineAfter, line)) + idx++ + } + + table, err := display.CreateTableString(header, lines) + if err != nil { + return "error creating p2p stats table: " + err.Error() + } + + return table +} + +func isLogTrace() bool { + return log.GetLevel() == logger.LogTrace +} + +// IsInterfaceNil returns true if there is no value under the interface +func (debugger *equivalentMessagesDebugger) IsInterfaceNil() bool { + return debugger == nil +} diff --git a/consensus/spos/debug/equivalentMessagesDebugger_test.go b/consensus/spos/debug/equivalentMessagesDebugger_test.go new file mode 100644 index 00000000000..af6dc256ed2 --- /dev/null +++ b/consensus/spos/debug/equivalentMessagesDebugger_test.go @@ -0,0 +1,80 @@ +package debug + +import ( + "testing" + + "github.com/multiversx/mx-chain-go/consensus" + "github.com/stretchr/testify/require" +) + +func TestNewEquivalentMessagesDebugger_IsInterfaceNil(t *testing.T) { + t.Parallel() + + var debugger *equivalentMessagesDebugger + require.True(t, debugger.IsInterfaceNil()) + + debugger = NewEquivalentMessagesDebugger() + require.False(t, debugger.IsInterfaceNil()) +} + +func TestEquivalentMessagesDebugger_DisplayEquivalentMessagesStatistics(t *testing.T) { + t.Parallel() + + t.Run("log level not trace should early exit", func(t *testing.T) { + t.Parallel() + + defer func() { + r := recover() + if r != nil { + require.Fail(t, "should have not panicked") + } + }() + + debugger := NewEquivalentMessagesDebugger() + debugger.DisplayEquivalentMessagesStatistics(func() map[string]*consensus.EquivalentMessageInfo { + return make(map[string]*consensus.EquivalentMessageInfo) + }) + }) + t.Run("nil get data handler should early exit", func(t *testing.T) { + t.Parallel() + + defer func() { + r := recover() + if r != nil { + require.Fail(t, "should have not panicked") + } + }() + + debugger := NewEquivalentMessagesDebugger() + debugger.shouldProcessDataFunc = func() bool { + return true + } + + debugger.DisplayEquivalentMessagesStatistics(nil) + }) + t.Run("should work", func(t *testing.T) { + t.Parallel() + + defer func() { + r := recover() + if r != nil { + require.Fail(t, "should have not panicked") + } + }() + + debugger := NewEquivalentMessagesDebugger() + debugger.shouldProcessDataFunc = func() bool { + return true + } + + debugger.DisplayEquivalentMessagesStatistics(func() map[string]*consensus.EquivalentMessageInfo { + return map[string]*consensus.EquivalentMessageInfo{ + "hash1": {NumMessages: 1, Validated: true}, + "hash2": {NumMessages: 2, Validated: true}, + "hash3": {NumMessages: 3, Validated: true}, + "hash4": {NumMessages: 4, Validated: true}, + } + }) + + }) +} diff --git a/consensus/spos/errors.go b/consensus/spos/errors.go index c8b5cede565..7f5ad3a24d0 100644 --- a/consensus/spos/errors.go +++ b/consensus/spos/errors.go @@ -243,3 +243,12 @@ var ErrNilSentSignatureTracker = errors.New("nil sent signature tracker") // ErrNilFunctionHandler signals that a nil function handler was provided var ErrNilFunctionHandler = errors.New("nil function handler") + +// ErrEquivalentMessageAlreadyReceived signals that an equivalent message has been already received +var ErrEquivalentMessageAlreadyReceived = errors.New("equivalent message already received") + +// ErrNilEquivalentMessagesDebugger signals that a nil equivalent messages debugger has been provided +var ErrNilEquivalentMessagesDebugger = errors.New("nil equivalent messages debugger") + +// ErrNilEnableEpochsHandler signals that a nil enable epochs handler has been provided +var ErrNilEnableEpochsHandler = errors.New("nil enable epochs handler") diff --git a/consensus/spos/export_test.go b/consensus/spos/export_test.go index 3a02e7b27fb..e09f0fbda06 100644 --- a/consensus/spos/export_test.go +++ b/consensus/spos/export_test.go @@ -183,6 +183,11 @@ func (wrk *Worker) AppStatusHandler() core.AppStatusHandler { return wrk.appStatusHandler } +// GetEquivalentMessages - +func (wrk *Worker) GetEquivalentMessages() map[string]*consensus.EquivalentMessageInfo { + return wrk.getEquivalentMessages() +} + // CheckConsensusMessageValidity - func (cmv *consensusMessageValidator) CheckConsensusMessageValidity(cnsMsg *consensus.Message, originator core.PeerID) error { return cmv.checkConsensusMessageValidity(cnsMsg, originator) diff --git a/consensus/spos/interface.go b/consensus/spos/interface.go index 235c139d2fb..ce9482630e6 100644 --- a/consensus/spos/interface.go +++ b/consensus/spos/interface.go @@ -178,3 +178,9 @@ type SentSignaturesTracker interface { ReceivedActualSigners(signersPks []string) IsInterfaceNil() bool } + +// EquivalentMessagesDebugger defines the specific debugger for equivalent messages +type EquivalentMessagesDebugger interface { + DisplayEquivalentMessagesStatistics(getDataHandler func() map[string]*consensus.EquivalentMessageInfo) + IsInterfaceNil() bool +} diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 8fdcca4686f..4517578b1c5 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -51,6 +51,7 @@ type Worker struct { headerSigVerifier HeaderSigVerifier headerIntegrityVerifier process.HeaderIntegrityVerifier appStatusHandler core.AppStatusHandler + enableEpochsHandler common.EnableEpochsHandler networkShardingCollector consensus.NetworkShardingCollector @@ -77,35 +78,41 @@ type Worker struct { nodeRedundancyHandler consensus.NodeRedundancyHandler peerBlacklistHandler consensus.PeerBlacklistHandler closer core.SafeCloser + + mutEquivalentMessages sync.RWMutex + equivalentMessages map[string]*consensus.EquivalentMessageInfo + equivalentMessagesDebugger EquivalentMessagesDebugger } // WorkerArgs holds the consensus worker arguments type WorkerArgs struct { - ConsensusService ConsensusService - BlockChain data.ChainHandler - BlockProcessor process.BlockProcessor - ScheduledProcessor consensus.ScheduledProcessor - Bootstrapper process.Bootstrapper - BroadcastMessenger consensus.BroadcastMessenger - ConsensusState *ConsensusState - ForkDetector process.ForkDetector - Marshalizer marshal.Marshalizer - Hasher hashing.Hasher - RoundHandler consensus.RoundHandler - ShardCoordinator sharding.Coordinator - PeerSignatureHandler crypto.PeerSignatureHandler - SyncTimer ntp.SyncTimer - HeaderSigVerifier HeaderSigVerifier - HeaderIntegrityVerifier process.HeaderIntegrityVerifier - ChainID []byte - NetworkShardingCollector consensus.NetworkShardingCollector - AntifloodHandler consensus.P2PAntifloodHandler - PoolAdder PoolAdder - SignatureSize int - PublicKeySize int - AppStatusHandler core.AppStatusHandler - NodeRedundancyHandler consensus.NodeRedundancyHandler - PeerBlacklistHandler consensus.PeerBlacklistHandler + ConsensusService ConsensusService + BlockChain data.ChainHandler + BlockProcessor process.BlockProcessor + ScheduledProcessor consensus.ScheduledProcessor + Bootstrapper process.Bootstrapper + BroadcastMessenger consensus.BroadcastMessenger + ConsensusState *ConsensusState + ForkDetector process.ForkDetector + Marshalizer marshal.Marshalizer + Hasher hashing.Hasher + RoundHandler consensus.RoundHandler + ShardCoordinator sharding.Coordinator + PeerSignatureHandler crypto.PeerSignatureHandler + SyncTimer ntp.SyncTimer + HeaderSigVerifier HeaderSigVerifier + HeaderIntegrityVerifier process.HeaderIntegrityVerifier + ChainID []byte + NetworkShardingCollector consensus.NetworkShardingCollector + AntifloodHandler consensus.P2PAntifloodHandler + PoolAdder PoolAdder + SignatureSize int + PublicKeySize int + AppStatusHandler core.AppStatusHandler + NodeRedundancyHandler consensus.NodeRedundancyHandler + PeerBlacklistHandler consensus.PeerBlacklistHandler + EquivalentMessagesDebugger EquivalentMessagesDebugger + EnableEpochsHandler common.EnableEpochsHandler } // NewWorker creates a new Worker object @@ -131,29 +138,32 @@ func NewWorker(args *WorkerArgs) (*Worker, error) { } wrk := Worker{ - consensusService: args.ConsensusService, - blockChain: args.BlockChain, - blockProcessor: args.BlockProcessor, - scheduledProcessor: args.ScheduledProcessor, - bootstrapper: args.Bootstrapper, - broadcastMessenger: args.BroadcastMessenger, - consensusState: args.ConsensusState, - forkDetector: args.ForkDetector, - marshalizer: args.Marshalizer, - hasher: args.Hasher, - roundHandler: args.RoundHandler, - shardCoordinator: args.ShardCoordinator, - peerSignatureHandler: args.PeerSignatureHandler, - syncTimer: args.SyncTimer, - headerSigVerifier: args.HeaderSigVerifier, - headerIntegrityVerifier: args.HeaderIntegrityVerifier, - appStatusHandler: args.AppStatusHandler, - networkShardingCollector: args.NetworkShardingCollector, - antifloodHandler: args.AntifloodHandler, - poolAdder: args.PoolAdder, - nodeRedundancyHandler: args.NodeRedundancyHandler, - peerBlacklistHandler: args.PeerBlacklistHandler, - closer: closing.NewSafeChanCloser(), + consensusService: args.ConsensusService, + blockChain: args.BlockChain, + blockProcessor: args.BlockProcessor, + scheduledProcessor: args.ScheduledProcessor, + bootstrapper: args.Bootstrapper, + broadcastMessenger: args.BroadcastMessenger, + consensusState: args.ConsensusState, + forkDetector: args.ForkDetector, + marshalizer: args.Marshalizer, + hasher: args.Hasher, + roundHandler: args.RoundHandler, + shardCoordinator: args.ShardCoordinator, + peerSignatureHandler: args.PeerSignatureHandler, + syncTimer: args.SyncTimer, + headerSigVerifier: args.HeaderSigVerifier, + headerIntegrityVerifier: args.HeaderIntegrityVerifier, + appStatusHandler: args.AppStatusHandler, + networkShardingCollector: args.NetworkShardingCollector, + antifloodHandler: args.AntifloodHandler, + poolAdder: args.PoolAdder, + nodeRedundancyHandler: args.NodeRedundancyHandler, + peerBlacklistHandler: args.PeerBlacklistHandler, + closer: closing.NewSafeChanCloser(), + equivalentMessages: make(map[string]*consensus.EquivalentMessageInfo), + equivalentMessagesDebugger: args.EquivalentMessagesDebugger, + enableEpochsHandler: args.EnableEpochsHandler, } wrk.consensusMessageValidator = consensusMessageValidatorObj @@ -254,6 +264,12 @@ func checkNewWorkerParams(args *WorkerArgs) error { if check.IfNil(args.PeerBlacklistHandler) { return ErrNilPeerBlacklistHandler } + if check.IfNil(args.EquivalentMessagesDebugger) { + return ErrNilEquivalentMessagesDebugger + } + if check.IfNil(args.EnableEpochsHandler) { + return ErrNilEnableEpochsHandler + } return nil } @@ -386,23 +402,14 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP ) } - msgType := consensus.MessageType(cnsMsg.MsgType) - - log.Trace("received message from consensus topic", - "msg type", wrk.consensusService.GetStringValue(msgType), - "from", cnsMsg.PubKey, - "header hash", cnsMsg.BlockHeaderHash, - "round", cnsMsg.RoundIndex, - "size", len(message.Data()), - ) - - err = wrk.consensusMessageValidator.checkConsensusMessageValidity(cnsMsg, message.Peer()) + err = wrk.checkValidityAndProcessEquivalentMessages(cnsMsg, message) if err != nil { return err } wrk.networkShardingCollector.UpdatePeerIDInfo(message.Peer(), cnsMsg.PubKey, wrk.shardCoordinator.SelfId()) + msgType := consensus.MessageType(cnsMsg.MsgType) isMessageWithBlockBody := wrk.consensusService.IsMessageWithBlockBody(msgType) isMessageWithBlockHeader := wrk.consensusService.IsMessageWithBlockHeader(msgType) isMessageWithBlockBodyAndHeader := wrk.consensusService.IsMessageWithBlockBodyAndHeader(msgType) @@ -443,7 +450,8 @@ func (wrk *Worker) shouldBlacklistPeer(err error) bool { errors.Is(err, errorsErd.ErrPIDMismatch) || errors.Is(err, errorsErd.ErrSignatureMismatch) || errors.Is(err, nodesCoordinator.ErrEpochNodesConfigDoesNotExist) || - errors.Is(err, ErrMessageTypeLimitReached) { + errors.Is(err, ErrMessageTypeLimitReached) || + errors.Is(err, crypto.ErrAggSigNotValid) { return false } @@ -677,6 +685,8 @@ func (wrk *Worker) DisplayStatistics() { wrk.mapDisplayHashConsensusMessage = make(map[string][]*consensus.Message) wrk.mutDisplayHashConsensusMessage.Unlock() + + wrk.equivalentMessagesDebugger.DisplayEquivalentMessagesStatistics(wrk.getEquivalentMessages) } // GetConsensusStateChangedChannel gets the channel for the consensusStateChanged @@ -709,6 +719,104 @@ func (wrk *Worker) Close() error { // ResetConsensusMessages resets at the start of each round all the previous consensus messages received func (wrk *Worker) ResetConsensusMessages() { wrk.consensusMessageValidator.resetConsensusMessages() + + wrk.mutEquivalentMessages.Lock() + wrk.equivalentMessages = make(map[string]*consensus.EquivalentMessageInfo) + wrk.mutEquivalentMessages.Unlock() +} + +func (wrk *Worker) checkValidityAndProcessEquivalentMessages(cnsMsg *consensus.Message, p2pMessage p2p.MessageP2P) error { + msgType := consensus.MessageType(cnsMsg.MsgType) + + log.Trace("received message from consensus topic", + "msg type", wrk.consensusService.GetStringValue(msgType), + "from", cnsMsg.PubKey, + "header hash", cnsMsg.BlockHeaderHash, + "round", cnsMsg.RoundIndex, + "size", len(p2pMessage.Data()), + ) + + if !wrk.shouldVerifyEquivalentMessages(msgType) { + return wrk.consensusMessageValidator.checkConsensusMessageValidity(cnsMsg, p2pMessage.Peer()) + } + + wrk.mutEquivalentMessages.Lock() + defer wrk.mutEquivalentMessages.Unlock() + + err := wrk.processEquivalentMessageUnprotected(cnsMsg) + if err != nil { + return err + } + + err = wrk.consensusMessageValidator.checkConsensusMessageValidity(cnsMsg, p2pMessage.Peer()) + if err != nil { + wrk.processInvalidEquivalentMessageUnprotected(cnsMsg.BlockHeaderHash) + return err + } + + return nil +} + +func (wrk *Worker) shouldVerifyEquivalentMessages(msgType consensus.MessageType) bool { + if !wrk.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + return false + } + + return wrk.consensusService.IsMessageWithFinalInfo(msgType) +} + +func (wrk *Worker) processEquivalentMessageUnprotected(cnsMsg *consensus.Message) error { + hdrHash := string(cnsMsg.BlockHeaderHash) + equivalentMsgInfo, ok := wrk.equivalentMessages[hdrHash] + if !ok { + equivalentMsgInfo = &consensus.EquivalentMessageInfo{} + wrk.equivalentMessages[hdrHash] = equivalentMsgInfo + } + equivalentMsgInfo.NumMessages++ + + if equivalentMsgInfo.Validated { + return ErrEquivalentMessageAlreadyReceived + } + + err := wrk.verifyEquivalentMessageSignature(cnsMsg) + if err != nil { + return err + } + + // TODO[Sorin next PR]: update EquivalentMessageInfo structure to hold also the proof(bitmap+signature) that was received + // then on commit block store this data on blockchain in order to use it on the next block creation + equivalentMsgInfo.Validated = true + + return nil +} + +func (wrk *Worker) verifyEquivalentMessageSignature(_ *consensus.Message) error { + if check.IfNil(wrk.consensusState.Header) { + return ErrNilHeader + } + + header := wrk.consensusState.Header.ShallowClone() + + // TODO[Sorin]: after flag enabled, VerifySignature on previous hash, with the signature and bitmap from the proof on cnsMsg + return wrk.headerSigVerifier.VerifySignature(header) +} + +func (wrk *Worker) processInvalidEquivalentMessageUnprotected(blockHeaderHash []byte) { + hdrHash := string(blockHeaderHash) + delete(wrk.equivalentMessages, hdrHash) +} + +// getEquivalentMessages returns a copy of the equivalent messages +func (wrk *Worker) getEquivalentMessages() map[string]*consensus.EquivalentMessageInfo { + wrk.mutEquivalentMessages.RLock() + defer wrk.mutEquivalentMessages.RUnlock() + + equivalentMessagesCopy := make(map[string]*consensus.EquivalentMessageInfo, len(wrk.equivalentMessages)) + for hash, cnt := range wrk.equivalentMessages { + equivalentMessagesCopy[hash] = cnt + } + + return equivalentMessagesCopy } // IsInterfaceNil returns true if there is no value under the interface diff --git a/consensus/spos/worker_test.go b/consensus/spos/worker_test.go index 37cc36f33c1..9cb95e22d56 100644 --- a/consensus/spos/worker_test.go +++ b/consensus/spos/worker_test.go @@ -3,6 +3,7 @@ package spos_test import ( "bytes" "context" + "crypto/rand" "errors" "fmt" "math/big" @@ -23,6 +24,7 @@ import ( "github.com/multiversx/mx-chain-go/p2p" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/testscommon" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" "github.com/multiversx/mx-chain-go/testscommon/p2pmocks" statusHandlerMock "github.com/multiversx/mx-chain-go/testscommon/statusHandler" @@ -89,31 +91,33 @@ func createDefaultWorkerArgs(appStatusHandler core.AppStatusHandler) *spos.Worke peerSigHandler := &mock.PeerSignatureHandler{Signer: singleSignerMock, KeyGen: keyGeneratorMock} workerArgs := &spos.WorkerArgs{ - ConsensusService: blsService, - BlockChain: blockchainMock, - BlockProcessor: blockProcessor, - ScheduledProcessor: scheduledProcessor, - Bootstrapper: bootstrapperMock, - BroadcastMessenger: broadcastMessengerMock, - ConsensusState: consensusState, - ForkDetector: forkDetectorMock, - Marshalizer: marshalizerMock, - Hasher: hasher, - RoundHandler: roundHandlerMock, - ShardCoordinator: shardCoordinatorMock, - PeerSignatureHandler: peerSigHandler, - SyncTimer: syncTimerMock, - HeaderSigVerifier: &mock.HeaderSigVerifierStub{}, - HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, - ChainID: chainID, - NetworkShardingCollector: &p2pmocks.NetworkShardingCollectorStub{}, - AntifloodHandler: createMockP2PAntifloodHandler(), - PoolAdder: poolAdder, - SignatureSize: SignatureSize, - PublicKeySize: PublicKeySize, - AppStatusHandler: appStatusHandler, - NodeRedundancyHandler: &mock.NodeRedundancyHandlerStub{}, - PeerBlacklistHandler: &mock.PeerBlacklistHandlerStub{}, + ConsensusService: blsService, + BlockChain: blockchainMock, + BlockProcessor: blockProcessor, + ScheduledProcessor: scheduledProcessor, + Bootstrapper: bootstrapperMock, + BroadcastMessenger: broadcastMessengerMock, + ConsensusState: consensusState, + ForkDetector: forkDetectorMock, + Marshalizer: marshalizerMock, + Hasher: hasher, + RoundHandler: roundHandlerMock, + ShardCoordinator: shardCoordinatorMock, + PeerSignatureHandler: peerSigHandler, + SyncTimer: syncTimerMock, + HeaderSigVerifier: &mock.HeaderSigVerifierStub{}, + HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, + ChainID: chainID, + NetworkShardingCollector: &p2pmocks.NetworkShardingCollectorStub{}, + AntifloodHandler: createMockP2PAntifloodHandler(), + PoolAdder: poolAdder, + SignatureSize: SignatureSize, + PublicKeySize: PublicKeySize, + AppStatusHandler: appStatusHandler, + NodeRedundancyHandler: &mock.NodeRedundancyHandlerStub{}, + PeerBlacklistHandler: &mock.PeerBlacklistHandlerStub{}, + EquivalentMessagesDebugger: &mock.EquivalentMessagesDebuggerStub{}, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, } return workerArgs @@ -368,6 +372,28 @@ func TestWorker_NewWorkerNodeRedundancyHandlerShouldFail(t *testing.T) { assert.Equal(t, spos.ErrNilNodeRedundancyHandler, err) } +func TestWorker_NewWorkerPoolEquivalentMessagesDebuggerNilShouldFail(t *testing.T) { + t.Parallel() + + workerArgs := createDefaultWorkerArgs(&statusHandlerMock.AppStatusHandlerStub{}) + workerArgs.EquivalentMessagesDebugger = nil + wrk, err := spos.NewWorker(workerArgs) + + assert.Nil(t, wrk) + assert.Equal(t, spos.ErrNilEquivalentMessagesDebugger, err) +} + +func TestWorker_NewWorkerPoolEnableEpochsHandlerNilShouldFail(t *testing.T) { + t.Parallel() + + workerArgs := createDefaultWorkerArgs(&statusHandlerMock.AppStatusHandlerStub{}) + workerArgs.EnableEpochsHandler = nil + wrk, err := spos.NewWorker(workerArgs) + + assert.Nil(t, wrk) + assert.Equal(t, spos.ErrNilEnableEpochsHandler, err) +} + func TestWorker_NewWorkerShouldWork(t *testing.T) { t.Parallel() @@ -582,6 +608,148 @@ func TestWorker_ProcessReceivedMessageRedundancyNodeShouldResetInactivityIfNeede assert.True(t, wasCalled) } +func TestWorker_ProcessReceivedMessageEquivalentMessage(t *testing.T) { + t.Parallel() + + workerArgs := createDefaultWorkerArgs(&statusHandlerMock.AppStatusHandlerStub{}) + workerArgs.EnableEpochsHandler = enableEpochsHandlerMock.NewEnableEpochsHandlerStub(common.EquivalentMessagesFlag) + wrk, _ := spos.NewWorker(workerArgs) + + equivalentBlockHeaderHash := workerArgs.Hasher.Compute("equivalent block header hash") + pubKey := []byte(wrk.ConsensusState().ConsensusGroup()[0]) + headerBytes := make([]byte, 100) + _, _ = rand.Read(headerBytes) + + bodyBytes := make([]byte, 100) + _, _ = rand.Read(bodyBytes) + + cnsMsg := consensus.NewConsensusMessage( + equivalentBlockHeaderHash, + nil, + nil, + nil, + pubKey, + bytes.Repeat([]byte("a"), SignatureSize), + int(bls.MtBlockHeaderFinalInfo), + 0, + chainID, + []byte("01"), + signature, + signature, + currentPid, + nil, + ) + buff, _ := wrk.Marshalizer().Marshal(cnsMsg) + + cnsMsgEquiv := consensus.NewConsensusMessage( + equivalentBlockHeaderHash, + nil, + nil, + nil, + pubKey, + bytes.Repeat([]byte("b"), SignatureSize), + int(bls.MtBlockHeaderFinalInfo), + 0, + chainID, + []byte("01"), + signature, + signature, + currentPid, + nil, + ) + buffEquiv, _ := wrk.Marshalizer().Marshal(cnsMsgEquiv) + + invalidCnsMsg := consensus.NewConsensusMessage( + []byte("other block header hash"), + nil, + nil, + nil, + pubKey, + bytes.Repeat([]byte("a"), SignatureSize), + int(bls.MtBlockHeaderFinalInfo), + 0, + []byte("invalid chain id"), + []byte("01"), + signature, + signature, + currentPid, + nil, + ) + buffInvalidCnsMsg, _ := wrk.Marshalizer().Marshal(invalidCnsMsg) + + err := wrk.ProcessReceivedMessage( + &p2pmocks.P2PMessageMock{ + DataField: buff, + PeerField: currentPid, + SignatureField: []byte("signature"), + }, + fromConnectedPeerId, + &p2pmocks.MessengerStub{}, + ) + assert.Equal(t, spos.ErrNilHeader, err) + + wrk.ConsensusState().Header = &block.Header{ + ChainID: chainID, + PrevHash: []byte("prev hash"), + PrevRandSeed: []byte("prev rand seed"), + RandSeed: []byte("rand seed"), + RootHash: []byte("roothash"), + SoftwareVersion: []byte("software version"), + AccumulatedFees: big.NewInt(0), + DeveloperFees: big.NewInt(0), + } + err = wrk.ProcessReceivedMessage( + &p2pmocks.P2PMessageMock{ + DataField: buff, + PeerField: currentPid, + SignatureField: []byte("signature"), + }, + fromConnectedPeerId, + &p2pmocks.MessengerStub{}, + ) + assert.NoError(t, err) + + equivalentMessages := wrk.GetEquivalentMessages() + assert.Equal(t, 1, len(equivalentMessages)) + assert.Equal(t, uint64(2), equivalentMessages[string(equivalentBlockHeaderHash)].NumMessages) + + equivMsgFrom := core.PeerID("from other peer id") + err = wrk.ProcessReceivedMessage( + &p2pmocks.P2PMessageMock{ + DataField: buffEquiv, + PeerField: currentPid, + SignatureField: []byte("signatureEquiv"), + }, + equivMsgFrom, + &p2pmocks.MessengerStub{}, + ) + assert.Equal(t, spos.ErrEquivalentMessageAlreadyReceived, err) + + equivalentMessages = wrk.GetEquivalentMessages() + assert.Equal(t, 1, len(equivalentMessages)) + assert.Equal(t, uint64(3), equivalentMessages[string(equivalentBlockHeaderHash)].NumMessages) + + err = wrk.ProcessReceivedMessage( + &p2pmocks.P2PMessageMock{ + DataField: buffInvalidCnsMsg, + PeerField: currentPid, + SignatureField: []byte("signatureEquiv"), + }, + equivMsgFrom, + &p2pmocks.MessengerStub{}, + ) + assert.Error(t, err) + + // same state as before, invalid message should have been dropped + equivalentMessages = wrk.GetEquivalentMessages() + assert.Equal(t, 1, len(equivalentMessages)) + assert.Equal(t, uint64(3), equivalentMessages[string(equivalentBlockHeaderHash)].NumMessages) + + wrk.ResetConsensusMessages() + equivalentMessages = wrk.GetEquivalentMessages() + assert.Equal(t, 0, len(equivalentMessages)) +} + func TestWorker_ProcessReceivedMessageNodeNotInEligibleListShouldErr(t *testing.T) { t.Parallel() wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{}) diff --git a/factory/consensus/consensusComponents.go b/factory/consensus/consensusComponents.go index f74262b4e43..2a7d0610862 100644 --- a/factory/consensus/consensusComponents.go +++ b/factory/consensus/consensusComponents.go @@ -16,6 +16,7 @@ import ( "github.com/multiversx/mx-chain-go/consensus/blacklist" "github.com/multiversx/mx-chain-go/consensus/chronology" "github.com/multiversx/mx-chain-go/consensus/spos" + "github.com/multiversx/mx-chain-go/consensus/spos/debug" "github.com/multiversx/mx-chain-go/consensus/spos/sposFactory" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/errors" @@ -177,31 +178,33 @@ func (ccf *consensusComponentsFactory) Create() (*consensusComponents, error) { } workerArgs := &spos.WorkerArgs{ - ConsensusService: consensusService, - BlockChain: ccf.dataComponents.Blockchain(), - BlockProcessor: ccf.processComponents.BlockProcessor(), - ScheduledProcessor: ccf.scheduledProcessor, - Bootstrapper: cc.bootstrapper, - BroadcastMessenger: cc.broadcastMessenger, - ConsensusState: consensusState, - ForkDetector: ccf.processComponents.ForkDetector(), - PeerSignatureHandler: ccf.cryptoComponents.PeerSignatureHandler(), - Marshalizer: marshalizer, - Hasher: ccf.coreComponents.Hasher(), - RoundHandler: ccf.processComponents.RoundHandler(), - ShardCoordinator: ccf.processComponents.ShardCoordinator(), - SyncTimer: ccf.coreComponents.SyncTimer(), - HeaderSigVerifier: ccf.processComponents.HeaderSigVerifier(), - HeaderIntegrityVerifier: ccf.processComponents.HeaderIntegrityVerifier(), - ChainID: []byte(ccf.coreComponents.ChainID()), - NetworkShardingCollector: ccf.processComponents.PeerShardMapper(), - AntifloodHandler: ccf.networkComponents.InputAntiFloodHandler(), - PoolAdder: ccf.dataComponents.Datapool().MiniBlocks(), - SignatureSize: ccf.config.ValidatorPubkeyConverter.SignatureLength, - PublicKeySize: ccf.config.ValidatorPubkeyConverter.Length, - AppStatusHandler: ccf.statusCoreComponents.AppStatusHandler(), - NodeRedundancyHandler: ccf.processComponents.NodeRedundancyHandler(), - PeerBlacklistHandler: cc.peerBlacklistHandler, + ConsensusService: consensusService, + BlockChain: ccf.dataComponents.Blockchain(), + BlockProcessor: ccf.processComponents.BlockProcessor(), + ScheduledProcessor: ccf.scheduledProcessor, + Bootstrapper: cc.bootstrapper, + BroadcastMessenger: cc.broadcastMessenger, + ConsensusState: consensusState, + ForkDetector: ccf.processComponents.ForkDetector(), + PeerSignatureHandler: ccf.cryptoComponents.PeerSignatureHandler(), + Marshalizer: marshalizer, + Hasher: ccf.coreComponents.Hasher(), + RoundHandler: ccf.processComponents.RoundHandler(), + ShardCoordinator: ccf.processComponents.ShardCoordinator(), + SyncTimer: ccf.coreComponents.SyncTimer(), + HeaderSigVerifier: ccf.processComponents.HeaderSigVerifier(), + HeaderIntegrityVerifier: ccf.processComponents.HeaderIntegrityVerifier(), + ChainID: []byte(ccf.coreComponents.ChainID()), + NetworkShardingCollector: ccf.processComponents.PeerShardMapper(), + AntifloodHandler: ccf.networkComponents.InputAntiFloodHandler(), + PoolAdder: ccf.dataComponents.Datapool().MiniBlocks(), + SignatureSize: ccf.config.ValidatorPubkeyConverter.SignatureLength, + PublicKeySize: ccf.config.ValidatorPubkeyConverter.Length, + AppStatusHandler: ccf.statusCoreComponents.AppStatusHandler(), + NodeRedundancyHandler: ccf.processComponents.NodeRedundancyHandler(), + PeerBlacklistHandler: cc.peerBlacklistHandler, + EquivalentMessagesDebugger: debug.NewEquivalentMessagesDebugger(), + EnableEpochsHandler: ccf.coreComponents.EnableEpochsHandler(), } cc.worker, err = spos.NewWorker(workerArgs) diff --git a/integrationTests/testProcessorNode.go b/integrationTests/testProcessorNode.go index 87651c2adbb..aab052767b7 100644 --- a/integrationTests/testProcessorNode.go +++ b/integrationTests/testProcessorNode.go @@ -3523,6 +3523,7 @@ func GetDefaultEnableEpochsConfig() *config.EnableEpochs { MiniBlockPartialExecutionEnableEpoch: UnreachableEpoch, FailExecutionOnEveryAPIErrorEnableEpoch: UnreachableEpoch, DynamicGasCostForDataTrieStorageLoadEnableEpoch: UnreachableEpoch, + EquivalentMessagesEnableEpoch: UnreachableEpoch, } } diff --git a/integrationTests/testProcessorNodeWithMultisigner.go b/integrationTests/testProcessorNodeWithMultisigner.go index 35c3cf059ae..a2c63e83a7c 100644 --- a/integrationTests/testProcessorNodeWithMultisigner.go +++ b/integrationTests/testProcessorNodeWithMultisigner.go @@ -237,6 +237,7 @@ func CreateNodesWithNodesCoordinatorFactory( MiniBlockPartialExecutionEnableEpoch: UnreachableEpoch, RefactorPeersMiniBlocksEnableEpoch: UnreachableEpoch, DynamicGasCostForDataTrieStorageLoadEnableEpoch: UnreachableEpoch, + EquivalentMessagesEnableEpoch: UnreachableEpoch, } nodesMap := make(map[uint32][]*TestProcessorNode) diff --git a/process/block/interceptedBlocks/interceptedBlockHeader.go b/process/block/interceptedBlocks/interceptedBlockHeader.go index 81d78bef5c0..cf26fa79f00 100644 --- a/process/block/interceptedBlocks/interceptedBlockHeader.go +++ b/process/block/interceptedBlocks/interceptedBlockHeader.go @@ -79,6 +79,7 @@ func (inHdr *InterceptedHeader) CheckValidity() error { return err } + // TODO[Sorin next PR]: after flag enabled, VerifySignature on previous hash with the signature and bitmap from the proof return inHdr.sigVerifier.VerifySignature(inHdr.hdr) } diff --git a/sharding/mock/enableEpochsHandlerMock.go b/sharding/mock/enableEpochsHandlerMock.go index e275c4ea165..00cc6229d73 100644 --- a/sharding/mock/enableEpochsHandlerMock.go +++ b/sharding/mock/enableEpochsHandlerMock.go @@ -46,11 +46,6 @@ func (mock *EnableEpochsHandlerMock) GetCurrentEpoch() uint32 { return mock.CurrentEpoch } -// FixGasRemainingForSaveKeyValueBuiltinFunctionEnabled - -func (mock *EnableEpochsHandlerMock) FixGasRemainingForSaveKeyValueBuiltinFunctionEnabled() bool { - return false -} - // IsInterfaceNil returns true if there is no value under the interface func (mock *EnableEpochsHandlerMock) IsInterfaceNil() bool { return mock == nil