Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,26 @@ type headerDataForValidator struct {
}

type delayedBlockBroadcaster struct {
alarm timersScheduler
interceptorsContainer process.InterceptorsContainer
shardCoordinator sharding.Coordinator
headersSubscriber consensus.HeadersPoolSubscriber
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
valBroadcastData []*shared.DelayedBroadcastData
delayedBroadcastData []*shared.DelayedBroadcastData
maxDelayCacheSize uint32
maxValidatorDelayCacheSize uint32
mutDataForBroadcast sync.RWMutex
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
alarm timersScheduler
interceptorsContainer process.InterceptorsContainer
shardCoordinator sharding.Coordinator
headersSubscriber consensus.HeadersPoolSubscriber
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
valBroadcastData []*shared.DelayedBroadcastData
delayedBroadcastData []*shared.DelayedBroadcastData
maxDelayCacheSize uint32
maxValidatorDelayCacheSize uint32
mutDataForBroadcast sync.RWMutex
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
broadcastConsensusMessage func(message *consensus.Message) error
cacheHeaders storage.Cacher
mutHeadersCache sync.RWMutex
cacheHeaders storage.Cacher
mutHeadersCache sync.RWMutex
config config.ConsensusGradualBroadcastConfig
mutBroadcastConsensusMessage sync.RWMutex
valBroadcastConsensusMessage map[string]*consensus.Message
cacheConsensusMessages storage.Cacher

}

// NewDelayedBlockBroadcaster create a new instance of a delayed block data broadcaster
Expand Down Expand Up @@ -103,19 +102,19 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
}

dbb := &delayedBlockBroadcaster{
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
interceptorsContainer: args.InterceptorsContainer,
headersSubscriber: args.HeadersSubscriber,
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
interceptorsContainer: args.InterceptorsContainer,
headersSubscriber: args.HeadersSubscriber,
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
valBroadcastConsensusMessage: make(map[string]*consensus.Message, 0),
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
mutHeadersCache: sync.RWMutex{},
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
mutHeadersCache: sync.RWMutex{},
config: args.Config,
cacheConsensusMessages: cacheConsensusMessages,
}
Expand Down Expand Up @@ -674,7 +673,15 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
dbb.cacheHeaders.Put(headerHash, struct{}{}, 0)
dbb.mutHeadersCache.Unlock()

aggSig, bitmap := headerHandler.GetPreviousAggregatedSignatureAndBitmap()
// TODO: should be handled from interceptor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was previously used by validators in consensus to find out if the leader propagates the block with final data on, and if not to do the propagation themselves in the predefined order.

This won't be needed anymore as the block in its final form is propagated at the beginning of the round, and if not propagated, then there is nothing to do.

So after equivalent proofs activation, this method will not be needed.

proof := headerHandler.GetPreviousProof()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for further PRs(probably for myself): this method should be either called from interceptor, either the logic should be moved on the header interceptor, as the header will be broadcast on a new topic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added TODO for this


var aggSig, bitmap []byte
if proof != nil {
aggSig, bitmap = proof.GetAggregatedSignature(), proof.GetPubKeysBitmap()
}

// TODO: add common check for verifying proof validity
isFinalInfo := len(aggSig) > 0 && len(bitmap) > 0
if isFinalInfo {
dbb.cacheConsensusMessages.Put(headerHash, struct{}{}, 0)
Expand Down
31 changes: 17 additions & 14 deletions consensus/broadcast/delayedBroadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,24 @@ func createValidatorDelayArgs(index int) *validatorDelayArgs {
iStr := strconv.Itoa(index)
return &validatorDelayArgs{
headerHash: []byte("header hash" + iStr),
header: &block.Header{
PrevRandSeed: []byte("prev rand seed" + iStr),
Round: uint64(0),
MiniBlockHeaders: []block.MiniBlockHeader{
{
Hash: []byte("miniBlockHash0" + iStr),
SenderShardID: 0,
ReceiverShardID: 0,
},
{
Hash: []byte("miniBlockHash1" + iStr),
SenderShardID: 0,
ReceiverShardID: 1,
header: &block.HeaderV2{
Header: &block.Header{
PrevRandSeed: []byte("prev rand seed" + iStr),
Round: uint64(0),
MiniBlockHeaders: []block.MiniBlockHeader{
{
Hash: []byte("miniBlockHash0" + iStr),
SenderShardID: 0,
ReceiverShardID: 0,
},
{
Hash: []byte("miniBlockHash1" + iStr),
SenderShardID: 0,
ReceiverShardID: 1,
},
},
},
PreviousHeaderProof: &block.HeaderProof{},
},
miniBlocks: map[uint32][]byte{0: []byte("miniblock data sh0" + iStr), 1: []byte("miniblock data sh1" + iStr)},
miniBlockHashes: map[string]map[string]struct{}{"txBlockBodies_0": {"miniBlockHash0" + iStr: struct{}{}}, "txBlockBodies_0_1": {"miniBlockHash1" + iStr: struct{}{}}},
Expand Down Expand Up @@ -1516,7 +1519,7 @@ func TestDelayedBlockBroadcaster_SetFinalConsensusMessageForValidator(t *testing
providedHash := []byte("hdr hash")
dbb.InterceptedHeaderData("", providedHash, &block.HeaderV2{
Header: &block.Header{},
PreviousHeaderProof: &block.PreviousHeaderProof{
PreviousHeaderProof: &block.HeaderProof{
PubKeysBitmap: []byte("bitmap"),
AggregatedSignature: []byte("agg sig"),
},
Expand Down
8 changes: 8 additions & 0 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,11 @@ type KeysHandler interface {
GetRedundancyStepInReason() string
IsInterfaceNil() bool
}

// EquivalentProofsPool defines the behaviour of a proofs pool components
type EquivalentProofsPool interface {
AddProof(headerProof data.HeaderProofHandler) error
GetProof(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error)
HasProof(shardID uint32, headerHash []byte) bool
IsInterfaceNil() bool
}
8 changes: 0 additions & 8 deletions consensus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package consensus

import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data"
)

// MessageType specifies what type of message was received
Expand Down Expand Up @@ -43,10 +42,3 @@ func NewConsensusMessage(
InvalidSigners: invalidSigners,
}
}

// EquivalentMessageInfo holds information about an equivalent message
type EquivalentMessageInfo struct {
NumMessages uint64
Validated bool
Proof data.HeaderProof
}
26 changes: 0 additions & 26 deletions consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ type SposWorkerMock struct {
ReceivedHeaderCalled func(headerHandler data.HeaderHandler, headerHash []byte)
SetAppStatusHandlerCalled func(ash core.AppStatusHandler) error
ResetConsensusMessagesCalled func(currentHash []byte, prevHash []byte)
HasEquivalentMessageCalled func(headerHash []byte) bool
GetEquivalentProofCalled func(headerHash []byte) (data.HeaderProof, error)
SetValidEquivalentProofCalled func(headerHash []byte, proof data.HeaderProof)
}

// AddReceivedMessageCall -
Expand Down Expand Up @@ -113,29 +110,6 @@ func (sposWorkerMock *SposWorkerMock) ResetConsensusMessages(currentHash []byte,
}
}

// HasEquivalentMessage -
func (sposWorkerMock *SposWorkerMock) HasEquivalentMessage(headerHash []byte) bool {
if sposWorkerMock.HasEquivalentMessageCalled != nil {
return sposWorkerMock.HasEquivalentMessageCalled(headerHash)
}
return false
}

// GetEquivalentProof -
func (sposWorkerMock *SposWorkerMock) GetEquivalentProof(headerHash []byte) (data.HeaderProof, error) {
if sposWorkerMock.GetEquivalentProofCalled != nil {
return sposWorkerMock.GetEquivalentProofCalled(headerHash)
}
return data.HeaderProof{}, nil
}

// SetValidEquivalentProof -
func (sposWorkerMock *SposWorkerMock) SetValidEquivalentProof(headerHash []byte, proof data.HeaderProof) {
if sposWorkerMock.SetValidEquivalentProofCalled != nil {
sposWorkerMock.SetValidEquivalentProofCalled(headerHash, proof)
}
}

// IsInterfaceNil returns true if there is no value under the interface
func (sposWorkerMock *SposWorkerMock) IsInterfaceNil() bool {
return sposWorkerMock == nil
Expand Down
43 changes: 32 additions & 11 deletions consensus/spos/bls/subroundBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/consensus"
"github.com/multiversx/mx-chain-go/consensus/spos"
Expand Down Expand Up @@ -472,9 +473,13 @@ func (sr *subroundBlock) addProofOnHeader(header data.HeaderHandler) bool {
return true
}

prevBlockProof := sr.Blockchain().GetCurrentHeaderProof()
prevBlockProof, err := sr.EquivalentProofsPool().GetProof(sr.ShardCoordinator().SelfId(), sr.GetData())
if err != nil {
return false
}

if !isProofEmpty(prevBlockProof) {
header.SetPreviousAggregatedSignatureAndBitmap(prevBlockProof.AggregatedSignature, prevBlockProof.PubKeysBitmap)
header.SetPreviousProof(prevBlockProof)
return true
}

Expand All @@ -491,16 +496,26 @@ func (sr *subroundBlock) addProofOnHeader(header data.HeaderHandler) bool {

isFlagEnabledForCurrentHeader := sr.EnableEpochsHandler().IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, currentHeader.GetEpoch())
if !isFlagEnabledForCurrentHeader {
header.SetPreviousAggregatedSignatureAndBitmap(currentHeader.GetSignature(), currentHeader.GetPubKeysBitmap())
proof := &block.HeaderProof{
PubKeysBitmap: currentHeader.GetSignature(),
AggregatedSignature: currentHeader.GetPubKeysBitmap(),
HeaderHash: sr.Blockchain().GetCurrentBlockHeaderHash(),
HeaderEpoch: currentHeader.GetEpoch(),
HeaderNonce: currentHeader.GetNonce(),
HeaderShardId: currentHeader.GetShardID(),
}
header.SetPreviousProof(proof)
return true
}

log.Debug("leader after sync, no proof for current header, will wait one round")
return false
}

func isProofEmpty(proof data.HeaderProof) bool {
return len(proof.AggregatedSignature) == 0 || len(proof.PubKeysBitmap) == 0
func isProofEmpty(proof data.HeaderProofHandler) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should only be an extra validation right?
I mean we only add proofs to the proof tracker if the proof is valid, so if it is already present, then it should have already been validated.

return len(proof.GetAggregatedSignature()) == 0 ||
len(proof.GetPubKeysBitmap()) == 0 ||
len(proof.GetHeaderHash()) == 0
}

// receivedBlockBodyAndHeader method is called when a block body and a block header is received
Expand Down Expand Up @@ -587,17 +602,23 @@ func (sr *subroundBlock) saveProofForPreviousHeaderIfNeeded() {
return
}

proof := sr.Blockchain().GetCurrentHeaderProof()
proof, err := sr.EquivalentProofsPool().GetProof(sr.ShardCoordinator().SelfId(), sr.GetData())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method - saveProofForPreviousHeaderIfNeeded is required here, as it is already saved on the regular interceptor.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, should not be needed anymore

if err != nil {
log.Debug("saveProofForPreviousHeaderIfNeeded: do not set proof since it was not found")
return
}

if !isProofEmpty(proof) {
log.Debug("saveProofForPreviousHeaderIfNeeded: no need to set proof since it is already saved")
return
}

prevAggSig, prevBitmap := sr.Header.GetPreviousAggregatedSignatureAndBitmap()
proof = data.HeaderProof{
AggregatedSignature: prevAggSig,
PubKeysBitmap: prevBitmap,
proof = sr.Header.GetPreviousProof()
err = sr.EquivalentProofsPool().AddProof(proof)
if err != nil {
log.Debug("saveProofForPreviousHeaderIfNeeded: failed to add proof, %w", err)
return
}
sr.Blockchain().SetCurrentHeaderProof(proof)
}

func (sr *subroundBlock) saveLeaderSignature(nodeKey []byte, signature []byte) error {
Expand Down
45 changes: 28 additions & 17 deletions consensus/spos/bls/subroundBlock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/multiversx/mx-chain-go/consensus/spos/bls"
"github.com/multiversx/mx-chain-go/testscommon"
consensusMocks "github.com/multiversx/mx-chain-go/testscommon/consensus"
"github.com/multiversx/mx-chain-go/testscommon/dataRetriever"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/hashingMocks"
"github.com/multiversx/mx-chain-go/testscommon/statusHandler"
Expand Down Expand Up @@ -475,14 +476,19 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) {
GetCurrentBlockHeaderCalled: func() data.HeaderHandler {
return providedHeadr
},
GetCurrentHeaderProofCalled: func() data.HeaderProof {
return data.HeaderProof{
AggregatedSignature: providedSignature,
PubKeysBitmap: providedBitmap,
}
},
}
sr := *initSubroundBlock(chainHandler, container, &statusHandler.AppStatusHandlerStub{})
container.SetBlockchain(chainHandler)

consensusState := initConsensusStateWithNodesCoordinator(container.NodesCoordinator())
ch := make(chan bool, 1)

baseSr, _ := defaultSubroundForSRBlock(consensusState, ch, container, &statusHandler.AppStatusHandlerStub{})
srBlock, _ := bls.NewSubroundBlock(
baseSr,
bls.ProcessingThresholdPercent,
&mock.SposWorkerMock{},
)
sr := *srBlock

providedLeaderSignature := []byte("leader signature")
container.SetSigningHandler(&consensusMocks.SigningHandlerStub{
Expand Down Expand Up @@ -529,13 +535,23 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) {
container.SetRoundHandler(&mock.RoundHandlerMock{
RoundIndex: 1,
})
container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{
GetProofCalled: func(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error) {
return &block.HeaderProof{
HeaderHash: headerHash,
AggregatedSignature: providedSignature,
PubKeysBitmap: providedBitmap,
}, nil
},
})

r := sr.DoBlockJob()
assert.True(t, r)
assert.Equal(t, uint64(1), sr.Header.GetNonce())

sig, bitmap := sr.Header.GetPreviousAggregatedSignatureAndBitmap()
assert.Equal(t, providedSignature, sig)
assert.Equal(t, providedBitmap, bitmap)
proof := sr.Header.GetPreviousProof()
assert.Equal(t, providedSignature, proof.GetAggregatedSignature())
assert.Equal(t, providedBitmap, proof.GetPubKeysBitmap())
})
t.Run("should work, equivalent messages flag not enabled", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -736,14 +752,10 @@ func TestSubroundBlock_ReceivedBlockBodyAndHeaderOK(t *testing.T) {
},
}
container.SetEnableEpochsHandler(enableEpochsHandler)
wasSetCurrentHeaderProofCalled := false
chainHandler := &testscommon.ChainHandlerStub{
GetCurrentBlockHeaderCalled: func() data.HeaderHandler {
return &block.HeaderV2{}
},
SetCurrentHeaderProofCalled: func(proof data.HeaderProof) {
wasSetCurrentHeaderProofCalled = true
},
}
sr := *initSubroundBlock(chainHandler, container, &statusHandler.AppStatusHandlerStub{})
blkBody := &block.Body{}
Expand All @@ -752,7 +764,7 @@ func TestSubroundBlock_ReceivedBlockBodyAndHeaderOK(t *testing.T) {
ScheduledDeveloperFees: big.NewInt(1),
ScheduledAccumulatedFees: big.NewInt(1),
ScheduledRootHash: []byte("scheduled root hash"),
PreviousHeaderProof: &block.PreviousHeaderProof{
PreviousHeaderProof: &block.HeaderProof{
PubKeysBitmap: []byte{1, 1, 1, 1},
AggregatedSignature: []byte("sig"),
},
Expand All @@ -765,7 +777,6 @@ func TestSubroundBlock_ReceivedBlockBodyAndHeaderOK(t *testing.T) {
sr.Data = nil
r := sr.ReceivedBlockBodyAndHeader(cnsMsg)
assert.True(t, r)
assert.True(t, wasSetCurrentHeaderProofCalled)
})
}

Expand Down Expand Up @@ -934,7 +945,7 @@ func TestSubroundBlock_ReceivedBlockShouldWorkWithEquivalentMessagesFlagEnabled(
ScheduledRootHash: []byte("sch root hash"),
ScheduledAccumulatedFees: big.NewInt(0),
ScheduledDeveloperFees: big.NewInt(0),
PreviousHeaderProof: &block.PreviousHeaderProof{
PreviousHeaderProof: &block.HeaderProof{
PubKeysBitmap: []byte{1, 1, 1, 1},
AggregatedSignature: []byte("sig"),
},
Expand Down
Loading