diff --git a/consensus/spos/bls/v2/subroundEndRound.go b/consensus/spos/bls/v2/subroundEndRound.go index b02d6015a07..cf23daa3ae4 100644 --- a/consensus/spos/bls/v2/subroundEndRound.go +++ b/consensus/spos/bls/v2/subroundEndRound.go @@ -11,7 +11,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" - "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/display" @@ -247,10 +246,36 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool { if !sr.waitForSignalSync() { return false } + sr.sendProof() } - proof, ok := sr.sendProof() - if !ok { + return sr.finalizeConfirmedBlock() +} + +func (sr *subroundEndRound) waitForProof() bool { + shardID := sr.ShardCoordinator().SelfId() + headerHash := sr.GetData() + if sr.EquivalentProofsPool().HasProof(shardID, headerHash) { + return true + } + + ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) + defer cancel() + + for { + select { + case <-time.After(time.Millisecond): + if sr.EquivalentProofsPool().HasProof(shardID, headerHash) { + return true + } + case <-ctx.Done(): + return false + } + } +} + +func (sr *subroundEndRound) finalizeConfirmedBlock() bool { + if !sr.waitForProof() { return false } @@ -259,14 +284,6 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool { return false } - // if proof not nil, it was created and broadcasted so it has to be added to the pool - if proof != nil { - ok := sr.EquivalentProofsPool().AddProof(proof) - if !ok { - log.Trace("doEndRoundJobByNode.AddProof", "added", ok) - } - } - sr.SetStatus(sr.Current(), spos.SsFinished) sr.worker.DisplayStatistics() @@ -281,29 +298,29 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool { return true } -func (sr *subroundEndRound) sendProof() (data.HeaderProofHandler, bool) { +func (sr *subroundEndRound) sendProof() { if !sr.shouldSendProof() { - return nil, true + return } bitmap := sr.GenerateBitmap(bls.SrSignature) err := sr.checkSignaturesValidity(bitmap) if err != nil { log.Debug("sendProof.checkSignaturesValidity", "error", err.Error()) - return nil, false + return } // Aggregate signatures, handle invalid signers and send final info if needed bitmap, sig, err := sr.aggregateSigsAndHandleInvalidSigners(bitmap) if err != nil { log.Debug("sendProof.aggregateSigsAndHandleInvalidSigners", "error", err.Error()) - return nil, false + return } ok := sr.ScheduledProcessor().IsProcessedOKWithTimeout() // placeholder for subroundEndRound.doEndRoundJobByLeader script if !ok { - return nil, false + return } roundHandler := sr.RoundHandler() @@ -311,12 +328,14 @@ func (sr *subroundEndRound) sendProof() (data.HeaderProofHandler, bool) { log.Debug("sendProof: time is out -> cancel broadcasting final info and header", "round time stamp", roundHandler.TimeStamp(), "current time", time.Now()) - return nil, false + return } // broadcast header proof - proof, err := sr.createAndBroadcastProof(sig, bitmap) - return proof, err == nil + err = sr.createAndBroadcastProof(sig, bitmap) + if err != nil { + log.Warn("sendProof.createAndBroadcastProof", "error", err.Error()) + } } func (sr *subroundEndRound) shouldSendProof() bool { @@ -524,7 +543,7 @@ func (sr *subroundEndRound) computeAggSigOnValidNodes() ([]byte, []byte, error) return bitmap, sig, nil } -func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []byte) (*block.HeaderProof, error) { +func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []byte) error { headerProof := &block.HeaderProof{ PubKeysBitmap: bitmap, AggregatedSignature: signature, @@ -538,14 +557,14 @@ func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []b err := sr.BroadcastMessenger().BroadcastEquivalentProof(headerProof, []byte(sr.SelfPubKey())) if err != nil { - return nil, err + return err } log.Debug("step 3: block header proof has been sent", "PubKeysBitmap", bitmap, "AggregateSignature", signature) - return headerProof, nil + return nil } func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []byte) { diff --git a/consensus/spos/bls/v2/subroundEndRound_test.go b/consensus/spos/bls/v2/subroundEndRound_test.go index 394df2b8d20..966d7fcbea3 100644 --- a/consensus/spos/bls/v2/subroundEndRound_test.go +++ b/consensus/spos/bls/v2/subroundEndRound_test.go @@ -572,6 +572,11 @@ func TestSubroundEndRound_DoEndRoundJobAllOK(t *testing.T) { t.Parallel() container := consensusMocks.InitConsensusCore() + container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + return true + }, + }) sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{}) sr.SetSelfPubKey("A") @@ -1176,6 +1181,16 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) { t.Parallel() container := consensusMocks.InitConsensusCore() + numCalls := 0 + container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + if numCalls <= 1 { + numCalls++ + return false + } + return true + }, + }) sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{}) verifySigShareNumCalls := 0 @@ -1227,14 +1242,17 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) { t.Run("should work with equivalent messages flag active", func(t *testing.T) { t.Parallel() - providedPrevSig := []byte("prev sig") - providedPrevBitmap := []byte{1, 1, 1, 1, 1, 1, 1, 1, 1} container := consensusMocks.InitConsensusCore() container.SetBlockchain(&testscommon.ChainHandlerStub{ GetGenesisHeaderCalled: func() data.HeaderHandler { return &block.HeaderV2{} }, }) + container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + return true + }, + }) enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { return flag == common.EquivalentMessagesFlag @@ -1242,16 +1260,6 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) { } container.SetEnableEpochsHandler(enableEpochsHandler) - wasSetCurrentHeaderProofCalled := false - container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{ - AddProofCalled: func(headerProof data.HeaderProofHandler) bool { - wasSetCurrentHeaderProofCalled = true - require.NotEqual(t, providedPrevSig, headerProof.GetAggregatedSignature()) - require.NotEqual(t, providedPrevBitmap, headerProof.GetPubKeysBitmap()) - return true - }, - }) - ch := make(chan bool, 1) consensusState := initializers.InitConsensusState() sr, _ := spos.NewSubround( @@ -1295,7 +1303,6 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) { r := srEndRound.DoEndRoundJobByNode() require.True(t, r) - require.True(t, wasSetCurrentHeaderProofCalled) }) } diff --git a/process/sync/baseForkDetector.go b/process/sync/baseForkDetector.go index 77f8c8841f4..3fe907d954b 100644 --- a/process/sync/baseForkDetector.go +++ b/process/sync/baseForkDetector.go @@ -760,11 +760,13 @@ func (bfd *baseForkDetector) processReceivedBlock( bfd.setHighestNonceReceived(header.GetNonce()) if state == process.BHProposed || !hasProof { + log.Trace("forkDetector.processReceivedBlock: block is proposed or has no proof", "state", state, "has proof", hasProof) return } isHeaderReceivedTooLate := bfd.isHeaderReceivedTooLate(header, state, process.BlockFinality) if isHeaderReceivedTooLate { + log.Trace("forkDetector.processReceivedBlock: block is received too late", "initial state", state) state = process.BHReceivedTooLate } @@ -778,6 +780,7 @@ func (bfd *baseForkDetector) processReceivedBlock( } if !bfd.append(hInfo) { + log.Trace("forkDetector.processReceivedBlock: header not appended", "nonce", hInfo.nonce, "hash", hInfo.hash) return } diff --git a/process/sync/metaForkDetector.go b/process/sync/metaForkDetector.go index f6a285fb3bc..cb45f2fdadc 100644 --- a/process/sync/metaForkDetector.go +++ b/process/sync/metaForkDetector.go @@ -108,7 +108,11 @@ func (mfd *metaForkDetector) doJobOnBHProcessed( _ [][]byte, ) { mfd.setFinalCheckpoint(mfd.lastCheckpoint()) - mfd.addCheckpoint(&checkpointInfo{nonce: header.GetNonce(), round: header.GetRound(), hash: headerHash}) + newCheckpoint := &checkpointInfo{nonce: header.GetNonce(), round: header.GetRound(), hash: headerHash} + mfd.addCheckpoint(newCheckpoint) + if mfd.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) { + mfd.setFinalCheckpoint(newCheckpoint) + } mfd.removePastOrInvalidRecords() } diff --git a/process/sync/shardForkDetector.go b/process/sync/shardForkDetector.go index d688c548a2e..3a4e0bde2af 100644 --- a/process/sync/shardForkDetector.go +++ b/process/sync/shardForkDetector.go @@ -112,7 +112,11 @@ func (sfd *shardForkDetector) doJobOnBHProcessed( ) { _ = sfd.appendSelfNotarizedHeaders(selfNotarizedHeaders, selfNotarizedHeadersHashes, core.MetachainShardId) sfd.computeFinalCheckpoint() - sfd.addCheckpoint(&checkpointInfo{nonce: header.GetNonce(), round: header.GetRound(), hash: headerHash}) + newCheckpoint := &checkpointInfo{nonce: header.GetNonce(), round: header.GetRound(), hash: headerHash} + sfd.addCheckpoint(newCheckpoint) + if sfd.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) { + sfd.setFinalCheckpoint(newCheckpoint) + } sfd.removePastOrInvalidRecords() }