From 450a389ea154b8fd47957eabd3ec89244491c8d8 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 2 Oct 2024 12:31:44 +0300 Subject: [PATCH 1/9] fix data race issues --- consensus/broadcast/delayedBroadcast.go | 5 ++ consensus/spos/bls/blsWorker_test.go | 4 +- consensus/spos/bls/subroundBlock.go | 4 +- consensus/spos/bls/subroundBlock_test.go | 2 +- consensus/spos/bls/subroundEndRound.go | 10 ++-- consensus/spos/bls/subroundEndRound_test.go | 4 +- consensus/spos/bls/subroundSignature.go | 7 +-- consensus/spos/bls/subroundSignature_test.go | 20 +++---- consensus/spos/bls/subroundStartRound.go | 14 ++--- consensus/spos/bls/subroundStartRound_test.go | 2 +- consensus/spos/consensusMessageValidator.go | 8 +-- .../spos/consensusMessageValidator_test.go | 10 ++-- consensus/spos/consensusState.go | 54 ++++++++++++++++++- consensus/spos/consensusState_test.go | 8 +-- consensus/spos/roundConsensus.go | 22 +++++++- consensus/spos/subround.go | 2 +- consensus/spos/subround_test.go | 2 +- consensus/spos/worker.go | 4 +- consensus/spos/worker_test.go | 6 +-- integrationTests/mock/roundHandlerMock.go | 18 ++++++- .../sync/edgeCases/edgeCases_test.go | 2 +- integrationTests/testConsensusNode.go | 11 ++-- integrationTests/testInitializer.go | 2 +- 23 files changed, 158 insertions(+), 63 deletions(-) diff --git a/consensus/broadcast/delayedBroadcast.go b/consensus/broadcast/delayedBroadcast.go index 9b76424c2b9..caead9530bf 100644 --- a/consensus/broadcast/delayedBroadcast.go +++ b/consensus/broadcast/delayedBroadcast.go @@ -174,6 +174,9 @@ func (dbb *delayedBlockBroadcaster) SetHeaderForValidator(vData *shared.Validato return spos.ErrNilHeaderHash } + dbb.mutDataForBroadcast.Lock() + defer dbb.mutDataForBroadcast.Unlock() + log.Trace("delayedBlockBroadcaster.SetHeaderForValidator", "nbDelayedBroadcastData", len(dbb.delayedBroadcastData), "nbValBroadcastData", len(dbb.valBroadcastData), @@ -188,7 +191,9 @@ func (dbb *delayedBlockBroadcaster) SetHeaderForValidator(vData *shared.Validato } duration := validatorDelayPerOrder * time.Duration(vData.Order) + dbb.valHeaderBroadcastData = append(dbb.valHeaderBroadcastData, vData) + alarmID := prefixHeaderAlarm + hex.EncodeToString(vData.HeaderHash) dbb.alarm.Add(dbb.headerAlarmExpired, duration, alarmID) log.Trace("delayedBlockBroadcaster.SetHeaderForValidator: header alarm has been set", diff --git a/consensus/spos/bls/blsWorker_test.go b/consensus/spos/bls/blsWorker_test.go index 75cc8f3b412..33e06535030 100644 --- a/consensus/spos/bls/blsWorker_test.go +++ b/consensus/spos/bls/blsWorker_test.go @@ -91,7 +91,7 @@ func initConsensusStateWithArgsVerifySignature(keysHandler consensus.KeysHandler rstatus, ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } @@ -150,7 +150,7 @@ func createConsensusStateWithNodes(eligibleNodesPubKeys map[string]struct{}, con ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } diff --git a/consensus/spos/bls/subroundBlock.go b/consensus/spos/bls/subroundBlock.go index cec1c657c41..bd567195a9e 100644 --- a/consensus/spos/bls/subroundBlock.go +++ b/consensus/spos/bls/subroundBlock.go @@ -807,7 +807,7 @@ func (sr *subroundBlock) processBlock( if err != nil { sr.printCancelRoundLogMessage(ctx, err) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return false } @@ -849,7 +849,7 @@ func (sr *subroundBlock) computeSubroundProcessingMetric(startTime time.Time, me // doBlockConsensusCheck method checks if the consensus in the subround Block is achieved func (sr *subroundBlock) 
doBlockConsensusCheck() bool { - if sr.RoundCanceled { + if sr.GetRoundCanceled() { return false } diff --git a/consensus/spos/bls/subroundBlock_test.go b/consensus/spos/bls/subroundBlock_test.go index d24713cd413..ee34a1df994 100644 --- a/consensus/spos/bls/subroundBlock_test.go +++ b/consensus/spos/bls/subroundBlock_test.go @@ -1067,7 +1067,7 @@ func TestSubroundBlock_DoBlockConsensusCheckShouldReturnFalseWhenRoundIsCanceled t.Parallel() container := consensusMocks.InitConsensusCore() sr := *initSubroundBlock(nil, container, &statusHandler.AppStatusHandlerStub{}) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) assert.False(t, sr.DoBlockConsensusCheck()) } diff --git a/consensus/spos/bls/subroundEndRound.go b/consensus/spos/bls/subroundEndRound.go index 6bd52cd8adc..ef090a2719e 100644 --- a/consensus/spos/bls/subroundEndRound.go +++ b/consensus/spos/bls/subroundEndRound.go @@ -829,7 +829,7 @@ func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []by } func (sr *subroundEndRound) doEndRoundJobByParticipant(cnsDta *consensus.Message) bool { - if sr.RoundCanceled { + if sr.GetRoundCanceled() { return false } if !sr.IsConsensusDataSet() { @@ -1074,7 +1074,7 @@ func (sr *subroundEndRound) prepareBroadcastBlockDataForValidator() error { // doEndRoundConsensusCheck method checks if the consensus is achieved func (sr *subroundEndRound) doEndRoundConsensusCheck() bool { - if sr.RoundCanceled { + if sr.GetRoundCanceled() { return false } @@ -1119,7 +1119,7 @@ func (sr *subroundEndRound) isOutOfTime() bool { "round", sr.SyncTimer().FormattedCurrentTime(), sr.RoundHandler().Index(), "subround", sr.Name()) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return true } @@ -1238,7 +1238,7 @@ func (sr *subroundEndRound) waitAllSignatures() { return } - sr.WaitingAllSignaturesTimeOut = true + sr.SetWaitAllSignaturesTimeout(true) select { case sr.ConsensusChannel() <- true: @@ -1336,7 +1336,7 @@ func (sr *subroundEndRound) checkReceivedSignatures() bool { areSignaturesCollected, numSigs := sr.areSignaturesCollected(threshold) areAllSignaturesCollected := numSigs == sr.ConsensusGroupSize() - isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.WaitingAllSignaturesTimeOut) + isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.GetWaitAllSignaturesTimeout()) isSelfJobDone := sr.IsSelfJobDone(SrSignature) diff --git a/consensus/spos/bls/subroundEndRound_test.go b/consensus/spos/bls/subroundEndRound_test.go index b435b1e9f9b..0cb71c7128e 100644 --- a/consensus/spos/bls/subroundEndRound_test.go +++ b/consensus/spos/bls/subroundEndRound_test.go @@ -749,7 +749,7 @@ func TestSubroundEndRound_DoEndRoundConsensusCheckShouldReturnFalseWhenRoundIsCa t.Parallel() sr := initSubroundEndRound(&statusHandler.AppStatusHandlerStub{}) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) ok := sr.DoEndRoundConsensusCheck() assert.False(t, ok) @@ -798,7 +798,7 @@ func TestSubroundEndRound_DoEndRoundJobByParticipant_RoundCanceledShouldReturnFa t.Parallel() sr := initSubroundEndRound(&statusHandler.AppStatusHandlerStub{}) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) cnsData := consensus.Message{} res := sr.DoEndRoundJobByParticipant(&cnsData) diff --git a/consensus/spos/bls/subroundSignature.go b/consensus/spos/bls/subroundSignature.go index f08ab7c8e27..5b27f3b45bf 100644 --- a/consensus/spos/bls/subroundSignature.go +++ b/consensus/spos/bls/subroundSignature.go @@ -238,7 +238,7 @@ func (sr *subroundSignature) 
receivedSignature(_ context.Context, cnsDta *consen // doSignatureConsensusCheck method checks if the consensus in the subround Signature is achieved func (sr *subroundSignature) doSignatureConsensusCheck() bool { - if sr.RoundCanceled { + if sr.GetRoundCanceled() { return false } @@ -278,7 +278,7 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { areSignaturesCollected, numSigs := sr.areSignaturesCollected(threshold) areAllSignaturesCollected := numSigs == sr.ConsensusGroupSize() - isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.WaitingAllSignaturesTimeOut) + isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.GetWaitAllSignaturesTimeout()) isJobDoneByLeader := isSelfLeader && isSignatureCollectionDone isSelfJobDone := sr.IsSelfJobDone(sr.Current()) @@ -346,7 +346,7 @@ func (sr *subroundSignature) waitAllSignatures() { return } - sr.WaitingAllSignaturesTimeOut = true + sr.SetWaitAllSignaturesTimeout(true) select { case sr.ConsensusChannel() <- true: @@ -434,6 +434,7 @@ func (sr *subroundSignature) sendSignatureForManagedKey(idx int, pk string) bool } isCurrentManagedKeyLeader := pk == leader + // TODO[cleanup cns finality]: update the check // with the equivalent messages feature on, signatures from all managed keys must be broadcast, as the aggregation is done by any participant shouldBroadcastSignatureShare := (!isCurrentNodeMultiKeyLeader && !isFlagActive) || diff --git a/consensus/spos/bls/subroundSignature_test.go b/consensus/spos/bls/subroundSignature_test.go index bb76513bfc7..99e1cedc7f3 100644 --- a/consensus/spos/bls/subroundSignature_test.go +++ b/consensus/spos/bls/subroundSignature_test.go @@ -469,14 +469,14 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { }, }) _ = sr.SetJobDone(sr.SelfPubKey(), bls.SrSignature, false) - sr.RoundCanceled = false + sr.SetRoundCanceled(false) leader, err := sr.GetLeader() assert.Nil(t, err) sr.SetSelfPubKey(leader) r = sr.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.RoundCanceled) + assert.False(t, sr.GetRoundCanceled()) }) t.Run("with equivalent messages flag active should work", func(t *testing.T) { t.Parallel() @@ -503,7 +503,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { r := sr.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.RoundCanceled) + assert.False(t, sr.GetRoundCanceled()) assert.Nil(t, err) leaderJobDone, err := sr.JobDone(leader, bls.SrSignature) assert.NoError(t, err) @@ -589,13 +589,13 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { assert.True(t, r) _ = sr.SetJobDone(sr.SelfPubKey(), bls.SrSignature, false) - sr.RoundCanceled = false + sr.SetRoundCanceled(false) leader, err := sr.GetLeader() assert.Nil(t, err) sr.SetSelfPubKey(leader) r = srSignature.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.RoundCanceled) + assert.False(t, sr.GetRoundCanceled()) expectedMap := map[string]struct{}{ "A": {}, "B": {}, @@ -683,7 +683,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { r := srSignature.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.RoundCanceled) + assert.False(t, sr.GetRoundCanceled()) assert.True(t, sr.IsSubroundFinished(bls.SrSignature)) for _, pk := range sr.ConsensusGroup() { @@ -1259,7 +1259,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnFalseWhenRoundIs t.Parallel() sr := *initSubroundSignature() - sr.RoundCanceled = true + sr.SetRoundCanceled(true) assert.False(t, sr.DoSignatureConsensusCheck()) 
} @@ -1363,7 +1363,7 @@ func testSubroundSignatureDoSignatureConsensusCheck(args argTestSubroundSignatur }, }) sr := *initSubroundSignatureWithContainer(container) - sr.WaitingAllSignaturesTimeOut = args.waitingAllSignaturesTimeOut + sr.SetWaitAllSignaturesTimeout(args.waitingAllSignaturesTimeOut) if !args.flagActive { leader, err := sr.GetLeader() @@ -1394,7 +1394,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnFalseWhenFallbac }, }) sr := *initSubroundSignatureWithContainer(container) - sr.WaitingAllSignaturesTimeOut = false + sr.SetWaitAllSignaturesTimeout(false) leader, err := sr.GetLeader() assert.Nil(t, err) @@ -1417,7 +1417,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnTrueWhenFallback }, }) sr := *initSubroundSignatureWithContainer(container) - sr.WaitingAllSignaturesTimeOut = true + sr.SetWaitAllSignaturesTimeout(true) leader, err := sr.GetLeader() assert.Nil(t, err) diff --git a/consensus/spos/bls/subroundStartRound.go b/consensus/spos/bls/subroundStartRound.go index 6f8c6d03908..40ea074a424 100644 --- a/consensus/spos/bls/subroundStartRound.go +++ b/consensus/spos/bls/subroundStartRound.go @@ -96,7 +96,7 @@ func (sr *subroundStartRound) SetOutportHandler(outportHandler outport.OutportHa // doStartRoundJob method does the job of the subround StartRound func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool { sr.ResetConsensusState() - sr.RoundIndex = sr.RoundHandler().Index() + sr.SetRoundIndex(sr.RoundHandler().Index()) sr.RoundTimeStamp = sr.RoundHandler().TimeStamp() topic := spos.GetConsensusTopicID(sr.ShardCoordinator()) sr.GetAntiFloodHandler().ResetForTopic(topic) @@ -114,7 +114,7 @@ func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool { // doStartRoundConsensusCheck method checks if the consensus is achieved in the subround StartRound func (sr *subroundStartRound) doStartRoundConsensusCheck() bool { - if sr.RoundCanceled { + if sr.GetRoundCanceled() { return false } @@ -143,7 +143,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { "round index", sr.RoundHandler().Index(), "error", err.Error()) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return false } @@ -162,7 +162,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { if err != nil { log.Debug("initCurrentRound.GetLeader", "error", err.Error()) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return false } @@ -201,7 +201,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { if err != nil { log.Debug("initCurrentRound.Reset", "error", err.Error()) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return false } @@ -213,7 +213,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { "round", sr.SyncTimer().FormattedCurrentTime(), sr.RoundHandler().Index(), "subround", sr.Name()) - sr.RoundCanceled = true + sr.SetRoundCanceled(true) return false } @@ -313,7 +313,7 @@ func (sr *subroundStartRound) generateNextConsensusGroup(roundIndex int64) error leader, nextConsensusGroup, err := sr.GetNextConsensusGroup( randomSeed, - uint64(sr.RoundIndex), + uint64(sr.GetRoundIndex()), shardId, sr.NodesCoordinator(), currentHeader.GetEpoch(), diff --git a/consensus/spos/bls/subroundStartRound_test.go b/consensus/spos/bls/subroundStartRound_test.go index c87a678857d..93ed3e0c82d 100644 --- a/consensus/spos/bls/subroundStartRound_test.go +++ b/consensus/spos/bls/subroundStartRound_test.go @@ -304,7 +304,7 @@ func TestSubroundStartRound_DoStartRoundConsensusCheckShouldReturnFalseWhenRound sr := 
*initSubroundStartRound() - sr.RoundCanceled = true + sr.SetRoundCanceled(true) ok := sr.DoStartRoundConsensusCheck() assert.False(t, ok) diff --git a/consensus/spos/consensusMessageValidator.go b/consensus/spos/consensusMessageValidator.go index 93c6977eed9..cdcf507cbbf 100644 --- a/consensus/spos/consensusMessageValidator.go +++ b/consensus/spos/consensusMessageValidator.go @@ -159,13 +159,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons msgType := consensus.MessageType(cnsMsg.MsgType) - if cmv.consensusState.RoundIndex+1 < cnsMsg.RoundIndex { + if cmv.consensusState.GetRoundIndex()+1 < cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a future round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.RoundIndex, + "round", cmv.consensusState.GetRoundIndex(), ) return fmt.Errorf("%w : received message from consensus topic has a future round: %d", @@ -173,13 +173,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons cnsMsg.RoundIndex) } - if cmv.consensusState.RoundIndex > cnsMsg.RoundIndex { + if cmv.consensusState.GetRoundIndex() > cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a past round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.RoundIndex, + "round", cmv.consensusState.GetRoundIndex(), ) return fmt.Errorf("%w : received message from consensus topic has a past round: %d", diff --git a/consensus/spos/consensusMessageValidator_test.go b/consensus/spos/consensusMessageValidator_test.go index 83dbf12057b..ef46fc9b75e 100644 --- a/consensus/spos/consensusMessageValidator_test.go +++ b/consensus/spos/consensusMessageValidator_test.go @@ -765,7 +765,7 @@ func TestCheckConsensusMessageValidity_ErrMessageForPastRound(t *testing.T) { t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 100 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(100) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -788,7 +788,7 @@ func TestCheckConsensusMessageValidity_ErrMessageTypeLimitReached(t *testing.T) t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) pubKey := []byte(consensusMessageValidatorArgs.ConsensusState.ConsensusGroup()[0]) @@ -834,7 +834,7 @@ func createMockConsensusMessage(args spos.ArgsConsensusMessageValidator, pubKey MsgType: int64(msgType), PubKey: pubKey, Signature: createDummyByteSlice(SignatureSize), - RoundIndex: args.ConsensusState.RoundIndex, + RoundIndex: args.ConsensusState.GetRoundIndex(), BlockHeaderHash: createDummyByteSlice(args.HeaderHashSize), } } @@ -853,7 +853,7 @@ func TestCheckConsensusMessageValidity_InvalidSignature(t *testing.T) { consensusMessageValidatorArgs.PeerSignatureHandler = &mock.PeerSignatureHandler{ Signer: signer, } - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := 
spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -876,7 +876,7 @@ func TestCheckConsensusMessageValidity_Ok(t *testing.T) { t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index fa806d9c840..5c07b1f981e 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -42,6 +42,8 @@ type ConsensusState struct { *roundConsensus *roundThreshold *roundStatus + + mutState sync.RWMutex } // NewConsensusState creates a new ConsensusState object @@ -62,6 +64,54 @@ func NewConsensusState( return &cns } +// GetRoundIndex will return round index +func (cns *ConsensusState) GetRoundIndex() int64 { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + + return cns.RoundIndex +} + +// SetRoundIndex will set round index +func (cns *ConsensusState) SetRoundIndex(index int64) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + + cns.RoundIndex = index +} + +// GetRoundCanceled will return round canceled state +func (cns *ConsensusState) GetRoundCanceled() bool { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + + return cns.RoundCanceled +} + +// SetRoundCanceled will set round canceled +func (cns *ConsensusState) SetRoundCanceled(state bool) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + + cns.RoundCanceled = state +} + +// GetWaitAllSignaturesTimeout will return wait all signatures timeout state +func (cns *ConsensusState) GetWaitAllSignaturesTimeout() bool { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + + return cns.WaitingAllSignaturesTimeOut +} + +// SetWaitAllSignaturesTimeout will set wait all signatures timeout state +func (cns *ConsensusState) SetWaitAllSignaturesTimeout(state bool) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + + cns.WaitingAllSignaturesTimeOut = state +} + // ResetConsensusState method resets all the consensus data func (cns *ConsensusState) ResetConsensusState() { cns.Body = nil @@ -71,9 +121,9 @@ func (cns *ConsensusState) ResetConsensusState() { cns.initReceivedHeaders() cns.initReceivedMessagesWithSig() - cns.RoundCanceled = false + cns.SetRoundCanceled(false) cns.ExtendedCalled = false - cns.WaitingAllSignaturesTimeOut = false + cns.SetWaitAllSignaturesTimeout(false) cns.ResetRoundStatus() cns.ResetRoundState() diff --git a/consensus/spos/consensusState_test.go b/consensus/spos/consensusState_test.go index 1a0a1de6bdd..93b925b1926 100644 --- a/consensus/spos/consensusState_test.go +++ b/consensus/spos/consensusState_test.go @@ -70,13 +70,13 @@ func TestConsensusState_ResetConsensusStateShouldWork(t *testing.T) { t.Parallel() cns := internalInitConsensusState() - cns.RoundCanceled = true + cns.SetRoundCanceled(true) cns.ExtendedCalled = true - cns.WaitingAllSignaturesTimeOut = true + cns.SetWaitAllSignaturesTimeout(true) cns.ResetConsensusState() - assert.False(t, cns.RoundCanceled) + assert.False(t, cns.GetRoundCanceled()) assert.False(t, cns.ExtendedCalled) - assert.False(t, cns.WaitingAllSignaturesTimeOut) + assert.False(t, cns.GetWaitAllSignaturesTimeout()) } func TestConsensusState_IsNodeLeaderInCurrentRoundShouldReturnFalseWhenGetLeaderErr(t *testing.T) { diff --git a/consensus/spos/roundConsensus.go 
b/consensus/spos/roundConsensus.go index cda20e33224..734825a3000 100644 --- a/consensus/spos/roundConsensus.go +++ b/consensus/spos/roundConsensus.go @@ -66,15 +66,18 @@ func (rcns *roundConsensus) SetEligibleList(eligibleList map[string]struct{}) { // ConsensusGroup returns the consensus group ID's func (rcns *roundConsensus) ConsensusGroup() []string { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + return rcns.consensusGroup } // SetConsensusGroup sets the consensus group ID's func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { - rcns.consensusGroup = consensusGroup - rcns.mut.Lock() + rcns.consensusGroup = consensusGroup + rcns.validatorRoundStates = make(map[string]*roundState) for i := 0; i < len(consensusGroup); i++ { @@ -86,11 +89,17 @@ func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { // Leader returns the leader for the current consensus func (rcns *roundConsensus) Leader() string { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + return rcns.leader } // SetLeader sets the leader for the current consensus func (rcns *roundConsensus) SetLeader(leader string) { + rcns.mut.Lock() + defer rcns.mut.Unlock() + rcns.leader = leader } @@ -156,6 +165,9 @@ func (rcns *roundConsensus) SelfJobDone(subroundId int) (bool, error) { // IsNodeInConsensusGroup method checks if the node is part of consensus group of the current round func (rcns *roundConsensus) IsNodeInConsensusGroup(node string) bool { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + for i := 0; i < len(rcns.consensusGroup); i++ { if rcns.consensusGroup[i] == node { return true @@ -177,6 +189,9 @@ func (rcns *roundConsensus) IsNodeInEligibleList(node string) bool { // ComputeSize method returns the number of messages received from the nodes belonging to the current jobDone group // related to this subround func (rcns *roundConsensus) ComputeSize(subroundId int) int { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + n := 0 for i := 0; i < len(rcns.consensusGroup); i++ { @@ -216,6 +231,9 @@ func (rcns *roundConsensus) ResetRoundState() { // IsMultiKeyInConsensusGroup method checks if one of the nodes which are controlled by this instance // is in consensus group in the current round func (rcns *roundConsensus) IsMultiKeyInConsensusGroup() bool { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + for i := 0; i < len(rcns.consensusGroup); i++ { if rcns.IsKeyManagedBySelf([]byte(rcns.consensusGroup[i])) { return true diff --git a/consensus/spos/subround.go b/consensus/spos/subround.go index 1f06191a2c5..b576da913e5 100644 --- a/consensus/spos/subround.go +++ b/consensus/spos/subround.go @@ -150,7 +150,7 @@ func (sr *Subround) DoWork(ctx context.Context, roundHandler consensus.RoundHand } case <-time.After(roundHandler.RemainingTime(startTime, maxTime)): if sr.Extend != nil { - sr.RoundCanceled = true + sr.SetRoundCanceled(true) sr.Extend(sr.current) } diff --git a/consensus/spos/subround_test.go b/consensus/spos/subround_test.go index 2e28b9a0a9d..6e19a259756 100644 --- a/consensus/spos/subround_test.go +++ b/consensus/spos/subround_test.go @@ -89,7 +89,7 @@ func initConsensusState() *spos.ConsensusState { ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index c7ec3124701..9285d617782 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -594,7 +594,7 @@ func (wrk *Worker) checkSelfState(cnsDta *consensus.Message) error { return ErrMessageFromItself } - if 
wrk.consensusState.RoundCanceled && wrk.consensusState.RoundIndex == cnsDta.RoundIndex { + if wrk.consensusState.GetRoundCanceled() && wrk.consensusState.GetRoundIndex() == cnsDta.RoundIndex { return ErrRoundCanceled } @@ -630,7 +630,7 @@ func (wrk *Worker) executeMessage(cnsDtaList []*consensus.Message) { if cnsDta == nil { continue } - if wrk.consensusState.RoundIndex != cnsDta.RoundIndex { + if wrk.consensusState.GetRoundIndex() != cnsDta.RoundIndex { continue } diff --git a/consensus/spos/worker_test.go b/consensus/spos/worker_test.go index b9eada158f8..0ce7d267a41 100644 --- a/consensus/spos/worker_test.go +++ b/consensus/spos/worker_test.go @@ -1146,7 +1146,7 @@ func TestWorker_ProcessReceivedMessageReceivedMessageIsFromSelfShouldRetNilAndNo func TestWorker_ProcessReceivedMessageWhenRoundIsCanceledShouldRetNilAndNotProcess(t *testing.T) { t.Parallel() wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{}) - wrk.ConsensusState().RoundCanceled = true + wrk.ConsensusState().SetRoundCanceled(true) blk := &block.Body{} blkStr, _ := mock.MarshalizerMock{}.Marshal(blk) cnsMsg := consensus.NewConsensusMessage( @@ -1441,7 +1441,7 @@ func TestWorker_CheckSelfStateShouldErrMessageFromItself(t *testing.T) { func TestWorker_CheckSelfStateShouldErrRoundCanceled(t *testing.T) { t.Parallel() wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{}) - wrk.ConsensusState().RoundCanceled = true + wrk.ConsensusState().SetRoundCanceled(true) cnsMsg := consensus.NewConsensusMessage( nil, nil, @@ -1757,7 +1757,7 @@ func TestWorker_ExtendShouldReturnWhenRoundIsCanceled(t *testing.T) { }, } wrk.SetBootstrapper(bootstrapperMock) - wrk.ConsensusState().RoundCanceled = true + wrk.ConsensusState().SetRoundCanceled(true) wrk.Extend(0) assert.False(t, executed) diff --git a/integrationTests/mock/roundHandlerMock.go b/integrationTests/mock/roundHandlerMock.go index 65a7ef5cc10..897ad105610 100644 --- a/integrationTests/mock/roundHandlerMock.go +++ b/integrationTests/mock/roundHandlerMock.go @@ -1,9 +1,14 @@ package mock -import "time" +import ( + "sync" + "time" +) // RoundHandlerMock - type RoundHandlerMock struct { + mut sync.RWMutex + IndexField int64 TimeStampField time.Time TimeDurationField time.Duration @@ -21,9 +26,20 @@ func (mock *RoundHandlerMock) BeforeGenesis() bool { // Index - func (mock *RoundHandlerMock) Index() int64 { + mock.mut.RLock() + defer mock.mut.RUnlock() + return mock.IndexField } +// SetIndex - +func (mock *RoundHandlerMock) SetIndex(index int64) { + mock.mut.Lock() + defer mock.mut.Unlock() + + mock.IndexField = index +} + // UpdateRound - func (mock *RoundHandlerMock) UpdateRound(time.Time, time.Time) { } diff --git a/integrationTests/sync/edgeCases/edgeCases_test.go b/integrationTests/sync/edgeCases/edgeCases_test.go index 285fed4dd8c..2e668a3aca8 100644 --- a/integrationTests/sync/edgeCases/edgeCases_test.go +++ b/integrationTests/sync/edgeCases/edgeCases_test.go @@ -85,7 +85,7 @@ func TestSyncMetaNodeIsSyncingReceivedHigherRoundBlockFromShard(t *testing.T) { WithSync: true, }) nodes = append(nodes, syncMetaNode) - syncMetaNode.RoundHandler.IndexField = int64(round) + syncMetaNode.RoundHandler.SetIndex(int64(round)) syncNodesSlice := []*integrationTests.TestProcessorNode{syncMetaNode} for _, n := range nodes { diff --git a/integrationTests/testConsensusNode.go b/integrationTests/testConsensusNode.go index 8651045eb7e..c34ede6de76 100644 --- a/integrationTests/testConsensusNode.go +++ b/integrationTests/testConsensusNode.go @@ -17,6 +17,8 @@ import ( mclMultiSig 
"github.com/multiversx/mx-chain-crypto-go/signing/mcl/multisig" "github.com/multiversx/mx-chain-crypto-go/signing/multisig" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/enablers" + "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/consensus/round" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -44,7 +46,6 @@ import ( consensusMocks "github.com/multiversx/mx-chain-go/testscommon/consensus" "github.com/multiversx/mx-chain-go/testscommon/cryptoMocks" dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" - "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" testFactory "github.com/multiversx/mx-chain-go/testscommon/factory" "github.com/multiversx/mx-chain-go/testscommon/genesisMocks" "github.com/multiversx/mx-chain-go/testscommon/nodeTypeProviderMock" @@ -189,7 +190,10 @@ func (tcn *TestConsensusNode) initNode(args ArgsTestConsensusNode) { consensusCache, _ := cache.NewLRUCache(10000) pkBytes, _ := tcn.NodeKeys.Pk.ToByteArray() - tcn.initNodesCoordinator(args.ConsensusSize, testHasher, epochStartRegistrationHandler, args.EligibleMap, args.WaitingMap, pkBytes, consensusCache) + genericEpochNotifier := forking.NewGenericEpochNotifier() + enableEpochsHandler, _ := enablers.NewEnableEpochsHandler(args.EnableEpochsConfig, genericEpochNotifier) + + tcn.initNodesCoordinator(args.ConsensusSize, testHasher, epochStartRegistrationHandler, args.EligibleMap, args.WaitingMap, pkBytes, consensusCache, enableEpochsHandler) tcn.MainMessenger = CreateMessengerWithNoDiscovery() tcn.FullArchiveMessenger = &p2pmocks.MessengerStub{} tcn.initBlockChain(testHasher) @@ -371,6 +375,7 @@ func (tcn *TestConsensusNode) initNodesCoordinator( waitingMap map[uint32][]nodesCoordinator.Validator, pkBytes []byte, cache storage.Cacher, + enableEpochsHandler common.EnableEpochsHandler, ) { argumentsNodesCoordinator := nodesCoordinator.ArgNodesCoordinator{ ChainParametersHandler: &chainParameters.ChainParametersHandlerStub{ @@ -395,7 +400,7 @@ func (tcn *TestConsensusNode) initNodesCoordinator( ChanStopNode: endProcess.GetDummyEndProcessChannel(), NodeTypeProvider: &nodeTypeProviderMock.NodeTypeProviderStub{}, IsFullArchive: false, - EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, + EnableEpochsHandler: enableEpochsHandler, ValidatorInfoCacher: &vic.ValidatorInfoCacherStub{}, ShardIDAsObserver: tcn.ShardCoordinator.SelfId(), GenesisNodesSetupHandler: &genesisMocks.NodesSetupStub{}, diff --git a/integrationTests/testInitializer.go b/integrationTests/testInitializer.go index 57af859a8df..2d66889917f 100644 --- a/integrationTests/testInitializer.go +++ b/integrationTests/testInitializer.go @@ -2520,7 +2520,7 @@ func emptyDataPool(sdp dataRetriever.PoolsHolder) { // UpdateRound updates the round for every node func UpdateRound(nodes []*TestProcessorNode, round uint64) { for _, n := range nodes { - n.RoundHandler.IndexField = int64(round) + n.RoundHandler.SetIndex(int64(round)) } // this delay is needed in order for the round to be properly updated in the nodes From 782078fe626d06d0d14e9a059c0e028ff2042b14 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 2 Oct 2024 12:32:21 +0300 Subject: 
[PATCH 2/9] fix proofs pool init --- process/block/baseProcess.go | 24 ++++++++++++++++-------- testscommon/dataRetriever/poolFactory.go | 3 +++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index d17140573c2..2bace1685cd 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -976,10 +976,14 @@ func (bp *baseProcessor) cleanupPools(headerHandler data.HeaderHandler) { highestPrevFinalBlockNonce, ) - err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) - if err != nil { - log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", - err, noncesToPrevFinal, bp.shardCoordinator.SelfId()) + if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) + if err != nil { + log.Warn("failed to cleanup notarized proofs behind nonce", + "nonce", noncesToPrevFinal, + "shardID", bp.shardCoordinator.SelfId(), + "error", err) + } } if bp.shardCoordinator.SelfId() == core.MetachainShardId { @@ -1011,10 +1015,14 @@ func (bp *baseProcessor) cleanupPoolsForCrossShard( crossNotarizedHeader.GetNonce(), ) - err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) - if err != nil { - log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", - err, noncesToPrevFinal, shardID) + if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) + if err != nil { + log.Warn("failed to cleanup notarized proofs behind nonce", + "nonce", noncesToPrevFinal, + "shardID", shardID, + "error", err) + } } } diff --git a/testscommon/dataRetriever/poolFactory.go b/testscommon/dataRetriever/poolFactory.go index df416a9f56a..d47b91c324e 100644 --- a/testscommon/dataRetriever/poolFactory.go +++ b/testscommon/dataRetriever/poolFactory.go @@ -225,6 +225,8 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) heartbeatPool, err := storageunit.NewCache(cacherConfig) panicIfError("CreatePoolsHolderWithTxPool", err) + proofsPool := proofscache.NewProofsPool() + currentBlockTransactions := dataPool.NewCurrentBlockTransactionsPool() currentEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool() dataPoolArgs := dataPool.DataPoolArgs{ @@ -242,6 +244,7 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) PeerAuthentications: peerAuthPool, Heartbeats: heartbeatPool, ValidatorsInfo: validatorsInfo, + Proofs: proofsPool, } holder, err := dataPool.NewDataPool(dataPoolArgs) panicIfError("CreatePoolsHolderWithTxPool", err) From 74789b192612153079388af68f84809c1ce2fa9e Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 3 Oct 2024 13:22:44 +0300 Subject: [PATCH 3/9] added trace logs in proofs pool --- .../dataPool/proofsCache/proofsPool.go | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/dataRetriever/dataPool/proofsCache/proofsPool.go b/dataRetriever/dataPool/proofsCache/proofsPool.go index 2ae8faca4c9..b0de8e005cd 100644 --- a/dataRetriever/dataPool/proofsCache/proofsPool.go +++ b/dataRetriever/dataPool/proofsCache/proofsPool.go @@ -33,7 +33,7 @@ func (pp *proofsPool) AddProof( shardID := headerProof.GetHeaderShardId() headerHash := headerProof.GetHeaderHash() - hasProof := pp.HasProof(shardID, 
headerProof.GetHeaderHash()) + hasProof := pp.HasProof(shardID, headerHash) if hasProof { log.Trace("there was already a valid proof for header, headerHash: %s", headerHash) return nil @@ -48,6 +48,14 @@ func (pp *proofsPool) AddProof( pp.cache[shardID] = proofsPerShard } + log.Trace("added proof to pool", + "header hash", headerProof.GetHeaderHash(), + "epoch", headerProof.GetHeaderEpoch(), + "nonce", headerProof.GetHeaderNonce(), + "shardID", headerProof.GetHeaderShardId(), + "pubKeys bitmap", headerProof.GetPubKeysBitmap(), + ) + proofsPerShard.addProof(headerProof) return nil @@ -67,6 +75,11 @@ func (pp *proofsPool) CleanupProofsBehindNonce(shardID uint32, nonce uint64) err return fmt.Errorf("%w: proofs cache per shard not found, shard ID: %d", ErrMissingProof, shardID) } + log.Trace("cleanup proofs behind nonce", + "nonce", nonce, + "shardID", shardID, + ) + proofsPerShard.cleanupProofsBehindNonce(nonce) return nil @@ -77,9 +90,18 @@ func (pp *proofsPool) GetProof( shardID uint32, headerHash []byte, ) (data.HeaderProofHandler, error) { + if headerHash == nil { + return nil, fmt.Errorf("nil header hash") + } + pp.mutCache.RLock() defer pp.mutCache.RUnlock() + log.Trace("trying to get proof", + "headerHash", headerHash, + "shardID", shardID, + ) + proofsPerShard, ok := pp.cache[shardID] if !ok { return nil, fmt.Errorf("%w: proofs cache per shard not found, shard ID: %d", ErrMissingProof, shardID) From 01455b2da31aaee4ef4136bb74222142fd7fc7a5 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 3 Oct 2024 13:30:41 +0300 Subject: [PATCH 4/9] Revert "fix proofs pool init" This reverts commit 782078fe626d06d0d14e9a059c0e028ff2042b14. --- process/block/baseProcess.go | 24 ++++++++---------------- testscommon/dataRetriever/poolFactory.go | 3 --- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 2bace1685cd..d17140573c2 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -976,14 +976,10 @@ func (bp *baseProcessor) cleanupPools(headerHandler data.HeaderHandler) { highestPrevFinalBlockNonce, ) - if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { - err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) - if err != nil { - log.Warn("failed to cleanup notarized proofs behind nonce", - "nonce", noncesToPrevFinal, - "shardID", bp.shardCoordinator.SelfId(), - "error", err) - } + err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) + if err != nil { + log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", + err, noncesToPrevFinal, bp.shardCoordinator.SelfId()) } if bp.shardCoordinator.SelfId() == core.MetachainShardId { @@ -1015,14 +1011,10 @@ func (bp *baseProcessor) cleanupPoolsForCrossShard( crossNotarizedHeader.GetNonce(), ) - if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { - err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) - if err != nil { - log.Warn("failed to cleanup notarized proofs behind nonce", - "nonce", noncesToPrevFinal, - "shardID", shardID, - "error", err) - } + err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) + if err != nil { + log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", + err, noncesToPrevFinal, shardID) } } diff --git a/testscommon/dataRetriever/poolFactory.go 
b/testscommon/dataRetriever/poolFactory.go index d47b91c324e..df416a9f56a 100644 --- a/testscommon/dataRetriever/poolFactory.go +++ b/testscommon/dataRetriever/poolFactory.go @@ -225,8 +225,6 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) heartbeatPool, err := storageunit.NewCache(cacherConfig) panicIfError("CreatePoolsHolderWithTxPool", err) - proofsPool := proofscache.NewProofsPool() - currentBlockTransactions := dataPool.NewCurrentBlockTransactionsPool() currentEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool() dataPoolArgs := dataPool.DataPoolArgs{ @@ -244,7 +242,6 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) PeerAuthentications: peerAuthPool, Heartbeats: heartbeatPool, ValidatorsInfo: validatorsInfo, - Proofs: proofsPool, } holder, err := dataPool.NewDataPool(dataPoolArgs) panicIfError("CreatePoolsHolderWithTxPool", err) From 9c0fa4e4a338f115ecb6856a957d471b8522be0f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 3 Oct 2024 13:31:17 +0300 Subject: [PATCH 5/9] Revert "Revert "fix proofs pool init"" This reverts commit 01455b2da31aaee4ef4136bb74222142fd7fc7a5. --- process/block/baseProcess.go | 24 ++++++++++++++++-------- testscommon/dataRetriever/poolFactory.go | 3 +++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index d17140573c2..2bace1685cd 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -976,10 +976,14 @@ func (bp *baseProcessor) cleanupPools(headerHandler data.HeaderHandler) { highestPrevFinalBlockNonce, ) - err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) - if err != nil { - log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", - err, noncesToPrevFinal, bp.shardCoordinator.SelfId()) + if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) + if err != nil { + log.Warn("failed to cleanup notarized proofs behind nonce", + "nonce", noncesToPrevFinal, + "shardID", bp.shardCoordinator.SelfId(), + "error", err) + } } if bp.shardCoordinator.SelfId() == core.MetachainShardId { @@ -1011,10 +1015,14 @@ func (bp *baseProcessor) cleanupPoolsForCrossShard( crossNotarizedHeader.GetNonce(), ) - err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) - if err != nil { - log.Warn("%w: failed to cleanup notarized proofs behind nonce %d on shardID %d", - err, noncesToPrevFinal, shardID) + if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + err = bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) + if err != nil { + log.Warn("failed to cleanup notarized proofs behind nonce", + "nonce", noncesToPrevFinal, + "shardID", shardID, + "error", err) + } } } diff --git a/testscommon/dataRetriever/poolFactory.go b/testscommon/dataRetriever/poolFactory.go index df416a9f56a..d47b91c324e 100644 --- a/testscommon/dataRetriever/poolFactory.go +++ b/testscommon/dataRetriever/poolFactory.go @@ -225,6 +225,8 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) heartbeatPool, err := storageunit.NewCache(cacherConfig) panicIfError("CreatePoolsHolderWithTxPool", err) + proofsPool := proofscache.NewProofsPool() + currentBlockTransactions := dataPool.NewCurrentBlockTransactionsPool() currentEpochValidatorInfo 
:= dataPool.NewCurrentEpochValidatorInfoPool() dataPoolArgs := dataPool.DataPoolArgs{ @@ -242,6 +244,7 @@ func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) PeerAuthentications: peerAuthPool, Heartbeats: heartbeatPool, ValidatorsInfo: validatorsInfo, + Proofs: proofsPool, } holder, err := dataPool.NewDataPool(dataPoolArgs) panicIfError("CreatePoolsHolderWithTxPool", err) From 0619c1937809aa16f1f8cea4178ad16bccd0b61a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 3 Oct 2024 13:31:41 +0300 Subject: [PATCH 6/9] Revert "fix data race issues" This reverts commit 450a389ea154b8fd47957eabd3ec89244491c8d8. --- consensus/broadcast/delayedBroadcast.go | 5 -- consensus/spos/bls/blsWorker_test.go | 4 +- consensus/spos/bls/subroundBlock.go | 4 +- consensus/spos/bls/subroundBlock_test.go | 2 +- consensus/spos/bls/subroundEndRound.go | 10 ++-- consensus/spos/bls/subroundEndRound_test.go | 4 +- consensus/spos/bls/subroundSignature.go | 7 ++- consensus/spos/bls/subroundSignature_test.go | 20 +++---- consensus/spos/bls/subroundStartRound.go | 14 ++--- consensus/spos/bls/subroundStartRound_test.go | 2 +- consensus/spos/consensusMessageValidator.go | 8 +-- .../spos/consensusMessageValidator_test.go | 10 ++-- consensus/spos/consensusState.go | 54 +------------------ consensus/spos/consensusState_test.go | 8 +-- consensus/spos/roundConsensus.go | 22 +------- consensus/spos/subround.go | 2 +- consensus/spos/subround_test.go | 2 +- consensus/spos/worker.go | 4 +- consensus/spos/worker_test.go | 6 +-- integrationTests/mock/roundHandlerMock.go | 18 +------ .../sync/edgeCases/edgeCases_test.go | 2 +- integrationTests/testConsensusNode.go | 11 ++-- integrationTests/testInitializer.go | 2 +- 23 files changed, 63 insertions(+), 158 deletions(-) diff --git a/consensus/broadcast/delayedBroadcast.go b/consensus/broadcast/delayedBroadcast.go index caead9530bf..9b76424c2b9 100644 --- a/consensus/broadcast/delayedBroadcast.go +++ b/consensus/broadcast/delayedBroadcast.go @@ -174,9 +174,6 @@ func (dbb *delayedBlockBroadcaster) SetHeaderForValidator(vData *shared.Validato return spos.ErrNilHeaderHash } - dbb.mutDataForBroadcast.Lock() - defer dbb.mutDataForBroadcast.Unlock() - log.Trace("delayedBlockBroadcaster.SetHeaderForValidator", "nbDelayedBroadcastData", len(dbb.delayedBroadcastData), "nbValBroadcastData", len(dbb.valBroadcastData), @@ -191,9 +188,7 @@ func (dbb *delayedBlockBroadcaster) SetHeaderForValidator(vData *shared.Validato } duration := validatorDelayPerOrder * time.Duration(vData.Order) - dbb.valHeaderBroadcastData = append(dbb.valHeaderBroadcastData, vData) - alarmID := prefixHeaderAlarm + hex.EncodeToString(vData.HeaderHash) dbb.alarm.Add(dbb.headerAlarmExpired, duration, alarmID) log.Trace("delayedBlockBroadcaster.SetHeaderForValidator: header alarm has been set", diff --git a/consensus/spos/bls/blsWorker_test.go b/consensus/spos/bls/blsWorker_test.go index 33e06535030..75cc8f3b412 100644 --- a/consensus/spos/bls/blsWorker_test.go +++ b/consensus/spos/bls/blsWorker_test.go @@ -91,7 +91,7 @@ func initConsensusStateWithArgsVerifySignature(keysHandler consensus.KeysHandler rstatus, ) cns.Data = []byte("X") - cns.SetRoundIndex(0) + cns.RoundIndex = 0 return cns } @@ -150,7 +150,7 @@ func createConsensusStateWithNodes(eligibleNodesPubKeys map[string]struct{}, con ) cns.Data = []byte("X") - cns.SetRoundIndex(0) + cns.RoundIndex = 0 return cns } diff --git a/consensus/spos/bls/subroundBlock.go b/consensus/spos/bls/subroundBlock.go index bd567195a9e..cec1c657c41 100644 --- 
a/consensus/spos/bls/subroundBlock.go +++ b/consensus/spos/bls/subroundBlock.go @@ -807,7 +807,7 @@ func (sr *subroundBlock) processBlock( if err != nil { sr.printCancelRoundLogMessage(ctx, err) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return false } @@ -849,7 +849,7 @@ func (sr *subroundBlock) computeSubroundProcessingMetric(startTime time.Time, me // doBlockConsensusCheck method checks if the consensus in the subround Block is achieved func (sr *subroundBlock) doBlockConsensusCheck() bool { - if sr.GetRoundCanceled() { + if sr.RoundCanceled { return false } diff --git a/consensus/spos/bls/subroundBlock_test.go b/consensus/spos/bls/subroundBlock_test.go index ee34a1df994..d24713cd413 100644 --- a/consensus/spos/bls/subroundBlock_test.go +++ b/consensus/spos/bls/subroundBlock_test.go @@ -1067,7 +1067,7 @@ func TestSubroundBlock_DoBlockConsensusCheckShouldReturnFalseWhenRoundIsCanceled t.Parallel() container := consensusMocks.InitConsensusCore() sr := *initSubroundBlock(nil, container, &statusHandler.AppStatusHandlerStub{}) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true assert.False(t, sr.DoBlockConsensusCheck()) } diff --git a/consensus/spos/bls/subroundEndRound.go b/consensus/spos/bls/subroundEndRound.go index ef090a2719e..6bd52cd8adc 100644 --- a/consensus/spos/bls/subroundEndRound.go +++ b/consensus/spos/bls/subroundEndRound.go @@ -829,7 +829,7 @@ func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []by } func (sr *subroundEndRound) doEndRoundJobByParticipant(cnsDta *consensus.Message) bool { - if sr.GetRoundCanceled() { + if sr.RoundCanceled { return false } if !sr.IsConsensusDataSet() { @@ -1074,7 +1074,7 @@ func (sr *subroundEndRound) prepareBroadcastBlockDataForValidator() error { // doEndRoundConsensusCheck method checks if the consensus is achieved func (sr *subroundEndRound) doEndRoundConsensusCheck() bool { - if sr.GetRoundCanceled() { + if sr.RoundCanceled { return false } @@ -1119,7 +1119,7 @@ func (sr *subroundEndRound) isOutOfTime() bool { "round", sr.SyncTimer().FormattedCurrentTime(), sr.RoundHandler().Index(), "subround", sr.Name()) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return true } @@ -1238,7 +1238,7 @@ func (sr *subroundEndRound) waitAllSignatures() { return } - sr.SetWaitAllSignaturesTimeout(true) + sr.WaitingAllSignaturesTimeOut = true select { case sr.ConsensusChannel() <- true: @@ -1336,7 +1336,7 @@ func (sr *subroundEndRound) checkReceivedSignatures() bool { areSignaturesCollected, numSigs := sr.areSignaturesCollected(threshold) areAllSignaturesCollected := numSigs == sr.ConsensusGroupSize() - isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.GetWaitAllSignaturesTimeout()) + isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.WaitingAllSignaturesTimeOut) isSelfJobDone := sr.IsSelfJobDone(SrSignature) diff --git a/consensus/spos/bls/subroundEndRound_test.go b/consensus/spos/bls/subroundEndRound_test.go index 0cb71c7128e..b435b1e9f9b 100644 --- a/consensus/spos/bls/subroundEndRound_test.go +++ b/consensus/spos/bls/subroundEndRound_test.go @@ -749,7 +749,7 @@ func TestSubroundEndRound_DoEndRoundConsensusCheckShouldReturnFalseWhenRoundIsCa t.Parallel() sr := initSubroundEndRound(&statusHandler.AppStatusHandlerStub{}) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true ok := sr.DoEndRoundConsensusCheck() assert.False(t, ok) @@ -798,7 +798,7 @@ func TestSubroundEndRound_DoEndRoundJobByParticipant_RoundCanceledShouldReturnFa t.Parallel() 
sr := initSubroundEndRound(&statusHandler.AppStatusHandlerStub{}) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true cnsData := consensus.Message{} res := sr.DoEndRoundJobByParticipant(&cnsData) diff --git a/consensus/spos/bls/subroundSignature.go b/consensus/spos/bls/subroundSignature.go index 5b27f3b45bf..f08ab7c8e27 100644 --- a/consensus/spos/bls/subroundSignature.go +++ b/consensus/spos/bls/subroundSignature.go @@ -238,7 +238,7 @@ func (sr *subroundSignature) receivedSignature(_ context.Context, cnsDta *consen // doSignatureConsensusCheck method checks if the consensus in the subround Signature is achieved func (sr *subroundSignature) doSignatureConsensusCheck() bool { - if sr.GetRoundCanceled() { + if sr.RoundCanceled { return false } @@ -278,7 +278,7 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { areSignaturesCollected, numSigs := sr.areSignaturesCollected(threshold) areAllSignaturesCollected := numSigs == sr.ConsensusGroupSize() - isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.GetWaitAllSignaturesTimeout()) + isSignatureCollectionDone := areAllSignaturesCollected || (areSignaturesCollected && sr.WaitingAllSignaturesTimeOut) isJobDoneByLeader := isSelfLeader && isSignatureCollectionDone isSelfJobDone := sr.IsSelfJobDone(sr.Current()) @@ -346,7 +346,7 @@ func (sr *subroundSignature) waitAllSignatures() { return } - sr.SetWaitAllSignaturesTimeout(true) + sr.WaitingAllSignaturesTimeOut = true select { case sr.ConsensusChannel() <- true: @@ -434,7 +434,6 @@ func (sr *subroundSignature) sendSignatureForManagedKey(idx int, pk string) bool } isCurrentManagedKeyLeader := pk == leader - // TODO[cleanup cns finality]: update the check // with the equivalent messages feature on, signatures from all managed keys must be broadcast, as the aggregation is done by any participant shouldBroadcastSignatureShare := (!isCurrentNodeMultiKeyLeader && !isFlagActive) || diff --git a/consensus/spos/bls/subroundSignature_test.go b/consensus/spos/bls/subroundSignature_test.go index 99e1cedc7f3..bb76513bfc7 100644 --- a/consensus/spos/bls/subroundSignature_test.go +++ b/consensus/spos/bls/subroundSignature_test.go @@ -469,14 +469,14 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { }, }) _ = sr.SetJobDone(sr.SelfPubKey(), bls.SrSignature, false) - sr.SetRoundCanceled(false) + sr.RoundCanceled = false leader, err := sr.GetLeader() assert.Nil(t, err) sr.SetSelfPubKey(leader) r = sr.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.GetRoundCanceled()) + assert.False(t, sr.RoundCanceled) }) t.Run("with equivalent messages flag active should work", func(t *testing.T) { t.Parallel() @@ -503,7 +503,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { r := sr.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.GetRoundCanceled()) + assert.False(t, sr.RoundCanceled) assert.Nil(t, err) leaderJobDone, err := sr.JobDone(leader, bls.SrSignature) assert.NoError(t, err) @@ -589,13 +589,13 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { assert.True(t, r) _ = sr.SetJobDone(sr.SelfPubKey(), bls.SrSignature, false) - sr.SetRoundCanceled(false) + sr.RoundCanceled = false leader, err := sr.GetLeader() assert.Nil(t, err) sr.SetSelfPubKey(leader) r = srSignature.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.GetRoundCanceled()) + assert.False(t, sr.RoundCanceled) expectedMap := map[string]struct{}{ "A": {}, "B": {}, @@ -683,7 +683,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { r 
:= srSignature.DoSignatureJob() assert.True(t, r) - assert.False(t, sr.GetRoundCanceled()) + assert.False(t, sr.RoundCanceled) assert.True(t, sr.IsSubroundFinished(bls.SrSignature)) for _, pk := range sr.ConsensusGroup() { @@ -1259,7 +1259,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnFalseWhenRoundIs t.Parallel() sr := *initSubroundSignature() - sr.SetRoundCanceled(true) + sr.RoundCanceled = true assert.False(t, sr.DoSignatureConsensusCheck()) } @@ -1363,7 +1363,7 @@ func testSubroundSignatureDoSignatureConsensusCheck(args argTestSubroundSignatur }, }) sr := *initSubroundSignatureWithContainer(container) - sr.SetWaitAllSignaturesTimeout(args.waitingAllSignaturesTimeOut) + sr.WaitingAllSignaturesTimeOut = args.waitingAllSignaturesTimeOut if !args.flagActive { leader, err := sr.GetLeader() @@ -1394,7 +1394,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnFalseWhenFallbac }, }) sr := *initSubroundSignatureWithContainer(container) - sr.SetWaitAllSignaturesTimeout(false) + sr.WaitingAllSignaturesTimeOut = false leader, err := sr.GetLeader() assert.Nil(t, err) @@ -1417,7 +1417,7 @@ func TestSubroundSignature_DoSignatureConsensusCheckShouldReturnTrueWhenFallback }, }) sr := *initSubroundSignatureWithContainer(container) - sr.SetWaitAllSignaturesTimeout(true) + sr.WaitingAllSignaturesTimeOut = true leader, err := sr.GetLeader() assert.Nil(t, err) diff --git a/consensus/spos/bls/subroundStartRound.go b/consensus/spos/bls/subroundStartRound.go index 40ea074a424..6f8c6d03908 100644 --- a/consensus/spos/bls/subroundStartRound.go +++ b/consensus/spos/bls/subroundStartRound.go @@ -96,7 +96,7 @@ func (sr *subroundStartRound) SetOutportHandler(outportHandler outport.OutportHa // doStartRoundJob method does the job of the subround StartRound func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool { sr.ResetConsensusState() - sr.SetRoundIndex(sr.RoundHandler().Index()) + sr.RoundIndex = sr.RoundHandler().Index() sr.RoundTimeStamp = sr.RoundHandler().TimeStamp() topic := spos.GetConsensusTopicID(sr.ShardCoordinator()) sr.GetAntiFloodHandler().ResetForTopic(topic) @@ -114,7 +114,7 @@ func (sr *subroundStartRound) doStartRoundJob(_ context.Context) bool { // doStartRoundConsensusCheck method checks if the consensus is achieved in the subround StartRound func (sr *subroundStartRound) doStartRoundConsensusCheck() bool { - if sr.GetRoundCanceled() { + if sr.RoundCanceled { return false } @@ -143,7 +143,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { "round index", sr.RoundHandler().Index(), "error", err.Error()) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return false } @@ -162,7 +162,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { if err != nil { log.Debug("initCurrentRound.GetLeader", "error", err.Error()) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return false } @@ -201,7 +201,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { if err != nil { log.Debug("initCurrentRound.Reset", "error", err.Error()) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return false } @@ -213,7 +213,7 @@ func (sr *subroundStartRound) initCurrentRound() bool { "round", sr.SyncTimer().FormattedCurrentTime(), sr.RoundHandler().Index(), "subround", sr.Name()) - sr.SetRoundCanceled(true) + sr.RoundCanceled = true return false } @@ -313,7 +313,7 @@ func (sr *subroundStartRound) generateNextConsensusGroup(roundIndex int64) error leader, nextConsensusGroup, err := sr.GetNextConsensusGroup( randomSeed, - 
uint64(sr.GetRoundIndex()), + uint64(sr.RoundIndex), shardId, sr.NodesCoordinator(), currentHeader.GetEpoch(), diff --git a/consensus/spos/bls/subroundStartRound_test.go b/consensus/spos/bls/subroundStartRound_test.go index 93ed3e0c82d..c87a678857d 100644 --- a/consensus/spos/bls/subroundStartRound_test.go +++ b/consensus/spos/bls/subroundStartRound_test.go @@ -304,7 +304,7 @@ func TestSubroundStartRound_DoStartRoundConsensusCheckShouldReturnFalseWhenRound sr := *initSubroundStartRound() - sr.SetRoundCanceled(true) + sr.RoundCanceled = true ok := sr.DoStartRoundConsensusCheck() assert.False(t, ok) diff --git a/consensus/spos/consensusMessageValidator.go b/consensus/spos/consensusMessageValidator.go index cdcf507cbbf..93c6977eed9 100644 --- a/consensus/spos/consensusMessageValidator.go +++ b/consensus/spos/consensusMessageValidator.go @@ -159,13 +159,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons msgType := consensus.MessageType(cnsMsg.MsgType) - if cmv.consensusState.GetRoundIndex()+1 < cnsMsg.RoundIndex { + if cmv.consensusState.RoundIndex+1 < cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a future round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.GetRoundIndex(), + "round", cmv.consensusState.RoundIndex, ) return fmt.Errorf("%w : received message from consensus topic has a future round: %d", @@ -173,13 +173,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons cnsMsg.RoundIndex) } - if cmv.consensusState.GetRoundIndex() > cnsMsg.RoundIndex { + if cmv.consensusState.RoundIndex > cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a past round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.GetRoundIndex(), + "round", cmv.consensusState.RoundIndex, ) return fmt.Errorf("%w : received message from consensus topic has a past round: %d", diff --git a/consensus/spos/consensusMessageValidator_test.go b/consensus/spos/consensusMessageValidator_test.go index ef46fc9b75e..83dbf12057b 100644 --- a/consensus/spos/consensusMessageValidator_test.go +++ b/consensus/spos/consensusMessageValidator_test.go @@ -765,7 +765,7 @@ func TestCheckConsensusMessageValidity_ErrMessageForPastRound(t *testing.T) { t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(100) + consensusMessageValidatorArgs.ConsensusState.RoundIndex = 100 cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -788,7 +788,7 @@ func TestCheckConsensusMessageValidity_ErrMessageTypeLimitReached(t *testing.T) t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) + consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) pubKey := []byte(consensusMessageValidatorArgs.ConsensusState.ConsensusGroup()[0]) @@ -834,7 +834,7 @@ func createMockConsensusMessage(args spos.ArgsConsensusMessageValidator, pubKey MsgType: int64(msgType), PubKey: pubKey, Signature: createDummyByteSlice(SignatureSize), - RoundIndex: 
args.ConsensusState.GetRoundIndex(), + RoundIndex: args.ConsensusState.RoundIndex, BlockHeaderHash: createDummyByteSlice(args.HeaderHashSize), } } @@ -853,7 +853,7 @@ func TestCheckConsensusMessageValidity_InvalidSignature(t *testing.T) { consensusMessageValidatorArgs.PeerSignatureHandler = &mock.PeerSignatureHandler{ Signer: signer, } - consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) + consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -876,7 +876,7 @@ func TestCheckConsensusMessageValidity_Ok(t *testing.T) { t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) + consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index 5c07b1f981e..fa806d9c840 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -42,8 +42,6 @@ type ConsensusState struct { *roundConsensus *roundThreshold *roundStatus - - mutState sync.RWMutex } // NewConsensusState creates a new ConsensusState object @@ -64,54 +62,6 @@ func NewConsensusState( return &cns } -// GetRoundIndex will return round index -func (cns *ConsensusState) GetRoundIndex() int64 { - cns.mutState.RLock() - defer cns.mutState.RUnlock() - - return cns.RoundIndex -} - -// SetRoundIndex will set round index -func (cns *ConsensusState) SetRoundIndex(index int64) { - cns.mutState.Lock() - defer cns.mutState.Unlock() - - cns.RoundIndex = index -} - -// GetRoundCanceled will return round canceled state -func (cns *ConsensusState) GetRoundCanceled() bool { - cns.mutState.RLock() - defer cns.mutState.RUnlock() - - return cns.RoundCanceled -} - -// SetRoundCanceled will set round canceled -func (cns *ConsensusState) SetRoundCanceled(state bool) { - cns.mutState.Lock() - defer cns.mutState.Unlock() - - cns.RoundCanceled = state -} - -// GetWaitAllSignaturesTimeout will return wait all signatures timeout state -func (cns *ConsensusState) GetWaitAllSignaturesTimeout() bool { - cns.mutState.RLock() - defer cns.mutState.RUnlock() - - return cns.WaitingAllSignaturesTimeOut -} - -// SetWaitAllSignaturesTimeout will set wait all signatures timeout state -func (cns *ConsensusState) SetWaitAllSignaturesTimeout(state bool) { - cns.mutState.Lock() - defer cns.mutState.Unlock() - - cns.WaitingAllSignaturesTimeOut = state -} - // ResetConsensusState method resets all the consensus data func (cns *ConsensusState) ResetConsensusState() { cns.Body = nil @@ -121,9 +71,9 @@ func (cns *ConsensusState) ResetConsensusState() { cns.initReceivedHeaders() cns.initReceivedMessagesWithSig() - cns.SetRoundCanceled(false) + cns.RoundCanceled = false cns.ExtendedCalled = false - cns.SetWaitAllSignaturesTimeout(false) + cns.WaitingAllSignaturesTimeOut = false cns.ResetRoundStatus() cns.ResetRoundState() diff --git a/consensus/spos/consensusState_test.go b/consensus/spos/consensusState_test.go index 93b925b1926..1a0a1de6bdd 100644 --- a/consensus/spos/consensusState_test.go +++ b/consensus/spos/consensusState_test.go @@ -70,13 +70,13 @@ func TestConsensusState_ResetConsensusStateShouldWork(t *testing.T) { t.Parallel() cns := internalInitConsensusState() - cns.SetRoundCanceled(true) + cns.RoundCanceled = true cns.ExtendedCalled = true 
- cns.SetWaitAllSignaturesTimeout(true) + cns.WaitingAllSignaturesTimeOut = true cns.ResetConsensusState() - assert.False(t, cns.GetRoundCanceled()) + assert.False(t, cns.RoundCanceled) assert.False(t, cns.ExtendedCalled) - assert.False(t, cns.GetWaitAllSignaturesTimeout()) + assert.False(t, cns.WaitingAllSignaturesTimeOut) } func TestConsensusState_IsNodeLeaderInCurrentRoundShouldReturnFalseWhenGetLeaderErr(t *testing.T) { diff --git a/consensus/spos/roundConsensus.go b/consensus/spos/roundConsensus.go index 734825a3000..cda20e33224 100644 --- a/consensus/spos/roundConsensus.go +++ b/consensus/spos/roundConsensus.go @@ -66,18 +66,15 @@ func (rcns *roundConsensus) SetEligibleList(eligibleList map[string]struct{}) { // ConsensusGroup returns the consensus group ID's func (rcns *roundConsensus) ConsensusGroup() []string { - rcns.mut.RLock() - defer rcns.mut.RUnlock() - return rcns.consensusGroup } // SetConsensusGroup sets the consensus group ID's func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { - rcns.mut.Lock() - rcns.consensusGroup = consensusGroup + rcns.mut.Lock() + rcns.validatorRoundStates = make(map[string]*roundState) for i := 0; i < len(consensusGroup); i++ { @@ -89,17 +86,11 @@ func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { // Leader returns the leader for the current consensus func (rcns *roundConsensus) Leader() string { - rcns.mut.RLock() - defer rcns.mut.RUnlock() - return rcns.leader } // SetLeader sets the leader for the current consensus func (rcns *roundConsensus) SetLeader(leader string) { - rcns.mut.Lock() - defer rcns.mut.Unlock() - rcns.leader = leader } @@ -165,9 +156,6 @@ func (rcns *roundConsensus) SelfJobDone(subroundId int) (bool, error) { // IsNodeInConsensusGroup method checks if the node is part of consensus group of the current round func (rcns *roundConsensus) IsNodeInConsensusGroup(node string) bool { - rcns.mut.RLock() - defer rcns.mut.RUnlock() - for i := 0; i < len(rcns.consensusGroup); i++ { if rcns.consensusGroup[i] == node { return true @@ -189,9 +177,6 @@ func (rcns *roundConsensus) IsNodeInEligibleList(node string) bool { // ComputeSize method returns the number of messages received from the nodes belonging to the current jobDone group // related to this subround func (rcns *roundConsensus) ComputeSize(subroundId int) int { - rcns.mut.RLock() - defer rcns.mut.RUnlock() - n := 0 for i := 0; i < len(rcns.consensusGroup); i++ { @@ -231,9 +216,6 @@ func (rcns *roundConsensus) ResetRoundState() { // IsMultiKeyInConsensusGroup method checks if one of the nodes which are controlled by this instance // is in consensus group in the current round func (rcns *roundConsensus) IsMultiKeyInConsensusGroup() bool { - rcns.mut.RLock() - defer rcns.mut.RUnlock() - for i := 0; i < len(rcns.consensusGroup); i++ { if rcns.IsKeyManagedBySelf([]byte(rcns.consensusGroup[i])) { return true diff --git a/consensus/spos/subround.go b/consensus/spos/subround.go index b576da913e5..1f06191a2c5 100644 --- a/consensus/spos/subround.go +++ b/consensus/spos/subround.go @@ -150,7 +150,7 @@ func (sr *Subround) DoWork(ctx context.Context, roundHandler consensus.RoundHand } case <-time.After(roundHandler.RemainingTime(startTime, maxTime)): if sr.Extend != nil { - sr.SetRoundCanceled(true) + sr.RoundCanceled = true sr.Extend(sr.current) } diff --git a/consensus/spos/subround_test.go b/consensus/spos/subround_test.go index 6e19a259756..2e28b9a0a9d 100644 --- a/consensus/spos/subround_test.go +++ b/consensus/spos/subround_test.go @@ -89,7 
+89,7 @@ func initConsensusState() *spos.ConsensusState { ) cns.Data = []byte("X") - cns.SetRoundIndex(0) + cns.RoundIndex = 0 return cns } diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 9285d617782..c7ec3124701 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -594,7 +594,7 @@ func (wrk *Worker) checkSelfState(cnsDta *consensus.Message) error { return ErrMessageFromItself } - if wrk.consensusState.GetRoundCanceled() && wrk.consensusState.GetRoundIndex() == cnsDta.RoundIndex { + if wrk.consensusState.RoundCanceled && wrk.consensusState.RoundIndex == cnsDta.RoundIndex { return ErrRoundCanceled } @@ -630,7 +630,7 @@ func (wrk *Worker) executeMessage(cnsDtaList []*consensus.Message) { if cnsDta == nil { continue } - if wrk.consensusState.GetRoundIndex() != cnsDta.RoundIndex { + if wrk.consensusState.RoundIndex != cnsDta.RoundIndex { continue } diff --git a/consensus/spos/worker_test.go b/consensus/spos/worker_test.go index 0ce7d267a41..b9eada158f8 100644 --- a/consensus/spos/worker_test.go +++ b/consensus/spos/worker_test.go @@ -1146,7 +1146,7 @@ func TestWorker_ProcessReceivedMessageReceivedMessageIsFromSelfShouldRetNilAndNo func TestWorker_ProcessReceivedMessageWhenRoundIsCanceledShouldRetNilAndNotProcess(t *testing.T) { t.Parallel() wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{}) - wrk.ConsensusState().SetRoundCanceled(true) + wrk.ConsensusState().RoundCanceled = true blk := &block.Body{} blkStr, _ := mock.MarshalizerMock{}.Marshal(blk) cnsMsg := consensus.NewConsensusMessage( @@ -1441,7 +1441,7 @@ func TestWorker_CheckSelfStateShouldErrMessageFromItself(t *testing.T) { func TestWorker_CheckSelfStateShouldErrRoundCanceled(t *testing.T) { t.Parallel() wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{}) - wrk.ConsensusState().SetRoundCanceled(true) + wrk.ConsensusState().RoundCanceled = true cnsMsg := consensus.NewConsensusMessage( nil, nil, @@ -1757,7 +1757,7 @@ func TestWorker_ExtendShouldReturnWhenRoundIsCanceled(t *testing.T) { }, } wrk.SetBootstrapper(bootstrapperMock) - wrk.ConsensusState().SetRoundCanceled(true) + wrk.ConsensusState().RoundCanceled = true wrk.Extend(0) assert.False(t, executed) diff --git a/integrationTests/mock/roundHandlerMock.go b/integrationTests/mock/roundHandlerMock.go index 897ad105610..65a7ef5cc10 100644 --- a/integrationTests/mock/roundHandlerMock.go +++ b/integrationTests/mock/roundHandlerMock.go @@ -1,14 +1,9 @@ package mock -import ( - "sync" - "time" -) +import "time" // RoundHandlerMock - type RoundHandlerMock struct { - mut sync.RWMutex - IndexField int64 TimeStampField time.Time TimeDurationField time.Duration @@ -26,20 +21,9 @@ func (mock *RoundHandlerMock) BeforeGenesis() bool { // Index - func (mock *RoundHandlerMock) Index() int64 { - mock.mut.RLock() - defer mock.mut.RUnlock() - return mock.IndexField } -// SetIndex - -func (mock *RoundHandlerMock) SetIndex(index int64) { - mock.mut.Lock() - defer mock.mut.Unlock() - - mock.IndexField = index -} - // UpdateRound - func (mock *RoundHandlerMock) UpdateRound(time.Time, time.Time) { } diff --git a/integrationTests/sync/edgeCases/edgeCases_test.go b/integrationTests/sync/edgeCases/edgeCases_test.go index 2e668a3aca8..285fed4dd8c 100644 --- a/integrationTests/sync/edgeCases/edgeCases_test.go +++ b/integrationTests/sync/edgeCases/edgeCases_test.go @@ -85,7 +85,7 @@ func TestSyncMetaNodeIsSyncingReceivedHigherRoundBlockFromShard(t *testing.T) { WithSync: true, }) nodes = append(nodes, syncMetaNode) - 
syncMetaNode.RoundHandler.SetIndex(int64(round)) + syncMetaNode.RoundHandler.IndexField = int64(round) syncNodesSlice := []*integrationTests.TestProcessorNode{syncMetaNode} for _, n := range nodes { diff --git a/integrationTests/testConsensusNode.go b/integrationTests/testConsensusNode.go index c34ede6de76..8651045eb7e 100644 --- a/integrationTests/testConsensusNode.go +++ b/integrationTests/testConsensusNode.go @@ -17,8 +17,6 @@ import ( mclMultiSig "github.com/multiversx/mx-chain-crypto-go/signing/mcl/multisig" "github.com/multiversx/mx-chain-crypto-go/signing/multisig" "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/common/enablers" - "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/consensus/round" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -46,6 +44,7 @@ import ( consensusMocks "github.com/multiversx/mx-chain-go/testscommon/consensus" "github.com/multiversx/mx-chain-go/testscommon/cryptoMocks" dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" testFactory "github.com/multiversx/mx-chain-go/testscommon/factory" "github.com/multiversx/mx-chain-go/testscommon/genesisMocks" "github.com/multiversx/mx-chain-go/testscommon/nodeTypeProviderMock" @@ -190,10 +189,7 @@ func (tcn *TestConsensusNode) initNode(args ArgsTestConsensusNode) { consensusCache, _ := cache.NewLRUCache(10000) pkBytes, _ := tcn.NodeKeys.Pk.ToByteArray() - genericEpochNotifier := forking.NewGenericEpochNotifier() - enableEpochsHandler, _ := enablers.NewEnableEpochsHandler(args.EnableEpochsConfig, genericEpochNotifier) - - tcn.initNodesCoordinator(args.ConsensusSize, testHasher, epochStartRegistrationHandler, args.EligibleMap, args.WaitingMap, pkBytes, consensusCache, enableEpochsHandler) + tcn.initNodesCoordinator(args.ConsensusSize, testHasher, epochStartRegistrationHandler, args.EligibleMap, args.WaitingMap, pkBytes, consensusCache) tcn.MainMessenger = CreateMessengerWithNoDiscovery() tcn.FullArchiveMessenger = &p2pmocks.MessengerStub{} tcn.initBlockChain(testHasher) @@ -375,7 +371,6 @@ func (tcn *TestConsensusNode) initNodesCoordinator( waitingMap map[uint32][]nodesCoordinator.Validator, pkBytes []byte, cache storage.Cacher, - enableEpochsHandler common.EnableEpochsHandler, ) { argumentsNodesCoordinator := nodesCoordinator.ArgNodesCoordinator{ ChainParametersHandler: &chainParameters.ChainParametersHandlerStub{ @@ -400,7 +395,7 @@ func (tcn *TestConsensusNode) initNodesCoordinator( ChanStopNode: endProcess.GetDummyEndProcessChannel(), NodeTypeProvider: &nodeTypeProviderMock.NodeTypeProviderStub{}, IsFullArchive: false, - EnableEpochsHandler: enableEpochsHandler, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ValidatorInfoCacher: &vic.ValidatorInfoCacherStub{}, ShardIDAsObserver: tcn.ShardCoordinator.SelfId(), GenesisNodesSetupHandler: &genesisMocks.NodesSetupStub{}, diff --git a/integrationTests/testInitializer.go b/integrationTests/testInitializer.go index 2d66889917f..57af859a8df 100644 --- a/integrationTests/testInitializer.go +++ b/integrationTests/testInitializer.go @@ -2520,7 +2520,7 @@ func emptyDataPool(sdp 
dataRetriever.PoolsHolder) { // UpdateRound updates the round for every node func UpdateRound(nodes []*TestProcessorNode, round uint64) { for _, n := range nodes { - n.RoundHandler.SetIndex(int64(round)) + n.RoundHandler.IndexField = int64(round) } // this delay is needed in order for the round to be properly updated in the nodes From 627b9411cb6243fcabb2d86b82d9bf0fc6b6ade6 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 3 Oct 2024 14:03:28 +0300 Subject: [PATCH 7/9] fix data race issues --- consensus/broadcast/delayedBroadcast.go | 3 +++ consensus/spos/consensusMessageValidator.go | 8 ++++---- .../spos/consensusMessageValidator_test.go | 10 +++++----- consensus/spos/consensusState.go | 20 +++++++++++++++++++ consensus/spos/consensusState_test.go | 8 ++++---- consensus/spos/roundConsensus.go | 19 ++++++++++++++++-- consensus/spos/subround_test.go | 2 +- consensus/spos/worker.go | 6 +++--- .../consensus/initializers/initializers.go | 4 ++-- 9 files changed, 59 insertions(+), 21 deletions(-) diff --git a/consensus/broadcast/delayedBroadcast.go b/consensus/broadcast/delayedBroadcast.go index 9b76424c2b9..743dd621d1f 100644 --- a/consensus/broadcast/delayedBroadcast.go +++ b/consensus/broadcast/delayedBroadcast.go @@ -174,6 +174,9 @@ func (dbb *delayedBlockBroadcaster) SetHeaderForValidator(vData *shared.Validato return spos.ErrNilHeaderHash } + dbb.mutDataForBroadcast.Lock() + defer dbb.mutDataForBroadcast.Unlock() + log.Trace("delayedBlockBroadcaster.SetHeaderForValidator", "nbDelayedBroadcastData", len(dbb.delayedBroadcastData), "nbValBroadcastData", len(dbb.valBroadcastData), diff --git a/consensus/spos/consensusMessageValidator.go b/consensus/spos/consensusMessageValidator.go index 93c6977eed9..cdcf507cbbf 100644 --- a/consensus/spos/consensusMessageValidator.go +++ b/consensus/spos/consensusMessageValidator.go @@ -159,13 +159,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons msgType := consensus.MessageType(cnsMsg.MsgType) - if cmv.consensusState.RoundIndex+1 < cnsMsg.RoundIndex { + if cmv.consensusState.GetRoundIndex()+1 < cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a future round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.RoundIndex, + "round", cmv.consensusState.GetRoundIndex(), ) return fmt.Errorf("%w : received message from consensus topic has a future round: %d", @@ -173,13 +173,13 @@ func (cmv *consensusMessageValidator) checkConsensusMessageValidity(cnsMsg *cons cnsMsg.RoundIndex) } - if cmv.consensusState.RoundIndex > cnsMsg.RoundIndex { + if cmv.consensusState.GetRoundIndex() > cnsMsg.RoundIndex { log.Trace("received message from consensus topic has a past round", "msg type", cmv.consensusService.GetStringValue(msgType), "from", cnsMsg.PubKey, "header hash", cnsMsg.BlockHeaderHash, "msg round", cnsMsg.RoundIndex, - "round", cmv.consensusState.RoundIndex, + "round", cmv.consensusState.GetRoundIndex(), ) return fmt.Errorf("%w : received message from consensus topic has a past round: %d", diff --git a/consensus/spos/consensusMessageValidator_test.go b/consensus/spos/consensusMessageValidator_test.go index 83dbf12057b..ef46fc9b75e 100644 --- a/consensus/spos/consensusMessageValidator_test.go +++ b/consensus/spos/consensusMessageValidator_test.go @@ -765,7 +765,7 @@ func TestCheckConsensusMessageValidity_ErrMessageForPastRound(t *testing.T) { t.Parallel() 
consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 100 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(100) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -788,7 +788,7 @@ func TestCheckConsensusMessageValidity_ErrMessageTypeLimitReached(t *testing.T) t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) pubKey := []byte(consensusMessageValidatorArgs.ConsensusState.ConsensusGroup()[0]) @@ -834,7 +834,7 @@ func createMockConsensusMessage(args spos.ArgsConsensusMessageValidator, pubKey MsgType: int64(msgType), PubKey: pubKey, Signature: createDummyByteSlice(SignatureSize), - RoundIndex: args.ConsensusState.RoundIndex, + RoundIndex: args.ConsensusState.GetRoundIndex(), BlockHeaderHash: createDummyByteSlice(args.HeaderHashSize), } } @@ -853,7 +853,7 @@ func TestCheckConsensusMessageValidity_InvalidSignature(t *testing.T) { consensusMessageValidatorArgs.PeerSignatureHandler = &mock.PeerSignatureHandler{ Signer: signer, } - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) @@ -876,7 +876,7 @@ func TestCheckConsensusMessageValidity_Ok(t *testing.T) { t.Parallel() consensusMessageValidatorArgs := createDefaultConsensusMessageValidatorArgs() - consensusMessageValidatorArgs.ConsensusState.RoundIndex = 10 + consensusMessageValidatorArgs.ConsensusState.SetRoundIndex(10) cmv, _ := spos.NewConsensusMessageValidator(consensusMessageValidatorArgs) headerBytes := make([]byte, 100) diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index a7a8ee3de65..8904717b7ea 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -42,6 +42,8 @@ type ConsensusState struct { *roundConsensus *roundThreshold *roundStatus + + mutState sync.RWMutex } // NewConsensusState creates a new ConsensusState object @@ -392,21 +394,33 @@ func (cns *ConsensusState) ResetRoundsWithoutReceivedMessages(pkBytes []byte, pi // GetRoundCanceled returns the state of the current round func (cns *ConsensusState) GetRoundCanceled() bool { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + return cns.RoundCanceled } // SetRoundCanceled sets the state of the current round func (cns *ConsensusState) SetRoundCanceled(roundCanceled bool) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + cns.RoundCanceled = roundCanceled } // GetRoundIndex returns the index of the current round func (cns *ConsensusState) GetRoundIndex() int64 { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + return cns.RoundIndex } // SetRoundIndex sets the index of the current round func (cns *ConsensusState) SetRoundIndex(roundIndex int64) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + cns.RoundIndex = roundIndex } @@ -447,11 +461,17 @@ func (cns *ConsensusState) GetHeader() data.HeaderHandler { // GetWaitingAllSignaturesTimeOut returns the state of the waiting all signatures time out func (cns *ConsensusState) GetWaitingAllSignaturesTimeOut() bool { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + return 
cns.WaitingAllSignaturesTimeOut } // SetWaitingAllSignaturesTimeOut sets the state of the waiting all signatures time out func (cns *ConsensusState) SetWaitingAllSignaturesTimeOut(waitingAllSignaturesTimeOut bool) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + cns.WaitingAllSignaturesTimeOut = waitingAllSignaturesTimeOut } diff --git a/consensus/spos/consensusState_test.go b/consensus/spos/consensusState_test.go index 1a0a1de6bdd..6125c4091c4 100644 --- a/consensus/spos/consensusState_test.go +++ b/consensus/spos/consensusState_test.go @@ -70,12 +70,12 @@ func TestConsensusState_ResetConsensusStateShouldWork(t *testing.T) { t.Parallel() cns := internalInitConsensusState() - cns.RoundCanceled = true - cns.ExtendedCalled = true - cns.WaitingAllSignaturesTimeOut = true + cns.SetRoundCanceled(true) + cns.SetExtendedCalled(true) + cns.SetWaitingAllSignaturesTimeOut(true) cns.ResetConsensusState() assert.False(t, cns.RoundCanceled) - assert.False(t, cns.ExtendedCalled) + assert.False(t, cns.GetExtendedCalled()) assert.False(t, cns.WaitingAllSignaturesTimeOut) } diff --git a/consensus/spos/roundConsensus.go b/consensus/spos/roundConsensus.go index 503eb0b2a2a..dfe6eb88d29 100644 --- a/consensus/spos/roundConsensus.go +++ b/consensus/spos/roundConsensus.go @@ -66,15 +66,18 @@ func (rcns *roundConsensus) SetEligibleList(eligibleList map[string]struct{}) { // ConsensusGroup returns the consensus group ID's func (rcns *roundConsensus) ConsensusGroup() []string { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + return rcns.consensusGroup } // SetConsensusGroup sets the consensus group ID's func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { - rcns.consensusGroup = consensusGroup - rcns.mut.Lock() + rcns.consensusGroup = consensusGroup + rcns.validatorRoundStates = make(map[string]*roundState) for i := 0; i < len(consensusGroup); i++ { @@ -86,11 +89,17 @@ func (rcns *roundConsensus) SetConsensusGroup(consensusGroup []string) { // Leader returns the leader for the current consensus func (rcns *roundConsensus) Leader() string { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + return rcns.leader } // SetLeader sets the leader for the current consensus func (rcns *roundConsensus) SetLeader(leader string) { + rcns.mut.Lock() + defer rcns.mut.Unlock() + rcns.leader = leader } @@ -156,6 +165,9 @@ func (rcns *roundConsensus) SelfJobDone(subroundId int) (bool, error) { // IsNodeInConsensusGroup method checks if the node is part of consensus group of the current round func (rcns *roundConsensus) IsNodeInConsensusGroup(node string) bool { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + for i := 0; i < len(rcns.consensusGroup); i++ { if rcns.consensusGroup[i] == node { return true @@ -177,6 +189,9 @@ func (rcns *roundConsensus) IsNodeInEligibleList(node string) bool { // ComputeSize method returns the number of messages received from the nodes belonging to the current jobDone group // related to this subround func (rcns *roundConsensus) ComputeSize(subroundId int) int { + rcns.mut.RLock() + defer rcns.mut.RUnlock() + n := 0 for i := 0; i < len(rcns.consensusGroup); i++ { diff --git a/consensus/spos/subround_test.go b/consensus/spos/subround_test.go index cd54782643c..8eb3e8e568d 100644 --- a/consensus/spos/subround_test.go +++ b/consensus/spos/subround_test.go @@ -90,7 +90,7 @@ func initConsensusState() *spos.ConsensusState { ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 
dffa665c6b9..d027ba07e0f 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -594,7 +594,7 @@ func (wrk *Worker) checkSelfState(cnsDta *consensus.Message) error { return ErrMessageFromItself } - if wrk.consensusState.RoundCanceled && wrk.consensusState.RoundIndex == cnsDta.RoundIndex { + if wrk.consensusState.GetRoundCanceled() && wrk.consensusState.GetRoundIndex() == cnsDta.RoundIndex { return ErrRoundCanceled } @@ -630,7 +630,7 @@ func (wrk *Worker) executeMessage(cnsDtaList []*consensus.Message) { if cnsDta == nil { continue } - if wrk.consensusState.RoundIndex != cnsDta.RoundIndex { + if wrk.consensusState.GetRoundIndex() != cnsDta.RoundIndex { continue } @@ -681,7 +681,7 @@ func (wrk *Worker) checkChannels(ctx context.Context) { // Extend does an extension for the subround with subroundId func (wrk *Worker) Extend(subroundId int) { - wrk.consensusState.ExtendedCalled = true + wrk.consensusState.SetExtendedCalled(true) log.Debug("extend function is called", "subround", wrk.consensusService.GetSubroundName(subroundId)) diff --git a/testscommon/consensus/initializers/initializers.go b/testscommon/consensus/initializers/initializers.go index aa3381281de..187c8f02892 100644 --- a/testscommon/consensus/initializers/initializers.go +++ b/testscommon/consensus/initializers/initializers.go @@ -92,7 +92,7 @@ func InitConsensusStateWithArgsVerifySignature(keysHandler consensus.KeysHandler rstatus, ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } @@ -151,6 +151,6 @@ func createConsensusStateWithNodes(eligibleNodesPubKeys map[string]struct{}, con ) cns.Data = []byte("X") - cns.RoundIndex = 0 + cns.SetRoundIndex(0) return cns } From bb31c75cccf6c0b8f6c63d7a57e5088b2c1defd9 Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 3 Oct 2024 17:40:35 +0300 Subject: [PATCH 8/9] Update process/block/baseProcess.go Co-authored-by: Sorin Stanculeanu <34831323+sstanculeanu@users.noreply.github.com> --- process/block/baseProcess.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 2bace1685cd..84872710c3c 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -976,7 +976,7 @@ func (bp *baseProcessor) cleanupPools(headerHandler data.HeaderHandler) { highestPrevFinalBlockNonce, ) - if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + if bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, headerHandler.GetEpoch()) { err := bp.dataPool.Proofs().CleanupProofsBehindNonce(bp.shardCoordinator.SelfId(), highestPrevFinalBlockNonce) if err != nil { log.Warn("failed to cleanup notarized proofs behind nonce", From c908cb2da03a20d136a27edd1714d8c09ea5187d Mon Sep 17 00:00:00 2001 From: Darius Date: Thu, 3 Oct 2024 17:40:42 +0300 Subject: [PATCH 9/9] Update process/block/baseProcess.go Co-authored-by: Sorin Stanculeanu <34831323+sstanculeanu@users.noreply.github.com> --- process/block/baseProcess.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 84872710c3c..5ddb0608b1e 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -1015,7 +1015,7 @@ func (bp *baseProcessor) cleanupPoolsForCrossShard( crossNotarizedHeader.GetNonce(), ) - if bp.enableEpochsHandler.IsFlagEnabled(common.EquivalentMessagesFlag) { + if bp.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, crossNotarizedHeader.GetEpoch()) { err = 
bp.dataPool.Proofs().CleanupProofsBehindNonce(shardID, noncesToPrevFinal) if err != nil { log.Warn("failed to cleanup notarized proofs behind nonce",
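
The data race fixes in this series converge on a single idiom: fields of ConsensusState that are read and written from multiple goroutines (RoundIndex, RoundCanceled, WaitingAllSignaturesTimeOut, ExtendedCalled) are accessed only through getters and setters guarded by a sync.RWMutex, and roundConsensus takes its own mutex before touching consensusGroup or leader. The sketch below is a minimal, standalone illustration of that idiom, not code lifted from the repository: the type name consensusStateSketch is invented for the example, and the fields are shown unexported to make the "access only via accessors" invariant explicit, whereas the real ConsensusState keeps them exported.

package spos

import "sync"

// consensusStateSketch is a trimmed-down stand-in for ConsensusState,
// limited to the fields involved in the data race fixes above.
type consensusStateSketch struct {
	mutState sync.RWMutex

	roundIndex                  int64
	roundCanceled               bool
	waitingAllSignaturesTimeOut bool
}

// GetRoundIndex returns the index of the current round under a read lock.
func (cns *consensusStateSketch) GetRoundIndex() int64 {
	cns.mutState.RLock()
	defer cns.mutState.RUnlock()

	return cns.roundIndex
}

// SetRoundIndex sets the index of the current round under a write lock.
func (cns *consensusStateSketch) SetRoundIndex(roundIndex int64) {
	cns.mutState.Lock()
	defer cns.mutState.Unlock()

	cns.roundIndex = roundIndex
}

// GetRoundCanceled reports whether the current round was canceled.
func (cns *consensusStateSketch) GetRoundCanceled() bool {
	cns.mutState.RLock()
	defer cns.mutState.RUnlock()

	return cns.roundCanceled
}

// SetRoundCanceled marks the current round as canceled (or not).
func (cns *consensusStateSketch) SetRoundCanceled(roundCanceled bool) {
	cns.mutState.Lock()
	defer cns.mutState.Unlock()

	cns.roundCanceled = roundCanceled
}

// GetWaitingAllSignaturesTimeOut reports whether the wait-for-all-signatures
// window has timed out.
func (cns *consensusStateSketch) GetWaitingAllSignaturesTimeOut() bool {
	cns.mutState.RLock()
	defer cns.mutState.RUnlock()

	return cns.waitingAllSignaturesTimeOut
}

// SetWaitingAllSignaturesTimeOut sets the wait-for-all-signatures timeout flag.
func (cns *consensusStateSketch) SetWaitingAllSignaturesTimeOut(timedOut bool) {
	cns.mutState.Lock()
	defer cns.mutState.Unlock()

	cns.waitingAllSignaturesTimeOut = timedOut
}

Races of this kind are typically confirmed, and verified as fixed, with the Go race detector, for example: go test -race ./consensus/... ; the detector reports any remaining unsynchronized concurrent read/write pair on these fields.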