Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/core/partitioning"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
crypto "github.com/multiversx/mx-chain-crypto-go"
Expand Down Expand Up @@ -186,11 +187,15 @@ func (cm *commonMessenger) BroadcastBlockData(
}
}

// PrepareBroadcastFinalConsensusMessage prepares the validator final info data broadcast for when its turn comes
func (cm *commonMessenger) PrepareBroadcastFinalConsensusMessage(message *consensus.Message, consensusIndex int) {
err := cm.delayedBlockBroadcaster.SetFinalConsensusMessageForValidator(message, consensusIndex)
// PrepareBroadcastEquivalentProof sets the proof into the delayed block broadcaster
func (cm *commonMessenger) PrepareBroadcastEquivalentProof(
proof *block.HeaderProof,
consensusIndex int,
pkBytes []byte,
) {
err := cm.delayedBlockBroadcaster.SetFinalProofForValidator(proof, consensusIndex, pkBytes)
if err != nil {
log.Error("commonMessenger.PrepareBroadcastFinalConsensusMessage", "error", err)
log.Error("commonMessenger.PrepareBroadcastEquivalentProof", "error", err)
}
}

Expand Down Expand Up @@ -240,3 +245,18 @@ func (cm *commonMessenger) broadcast(topic string, data []byte, pkBytes []byte)

cm.messenger.BroadcastUsingPrivateKey(topic, data, pid, skBytes)
}

func (cm *commonMessenger) broadcastEquivalentProof(proof *block.HeaderProof, pkBytes []byte, topic string) error {
if check.IfNilReflect(proof) {
return spos.ErrNilHeaderProof
}

msgProof, err := cm.marshalizer.Marshal(proof)
if err != nil {
return err
}

cm.broadcast(topic, msgProof, pkBytes)

return nil
}
157 changes: 89 additions & 68 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,33 @@ type headerDataForValidator struct {
prevRandSeed []byte
}

type validatorProof struct {
proof *block.HeaderProof
pkBytes []byte
}

type delayedBlockBroadcaster struct {
alarm timersScheduler
interceptorsContainer process.InterceptorsContainer
shardCoordinator sharding.Coordinator
headersSubscriber consensus.HeadersPoolSubscriber
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
valBroadcastData []*shared.DelayedBroadcastData
delayedBroadcastData []*shared.DelayedBroadcastData
maxDelayCacheSize uint32
maxValidatorDelayCacheSize uint32
mutDataForBroadcast sync.RWMutex
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
broadcastConsensusMessage func(message *consensus.Message) error
cacheHeaders storage.Cacher
mutHeadersCache sync.RWMutex
config config.ConsensusGradualBroadcastConfig
mutBroadcastConsensusMessage sync.RWMutex
valBroadcastConsensusMessage map[string]*consensus.Message
cacheConsensusMessages storage.Cacher
alarm timersScheduler
interceptorsContainer process.InterceptorsContainer
shardCoordinator sharding.Coordinator
headersSubscriber consensus.HeadersPoolSubscriber
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
valBroadcastData []*shared.DelayedBroadcastData
delayedBroadcastData []*shared.DelayedBroadcastData
maxDelayCacheSize uint32
maxValidatorDelayCacheSize uint32
mutDataForBroadcast sync.RWMutex
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
broadcastEquivalentProof func(proof *block.HeaderProof, pkBytes []byte) error
broadcastConsensusMessage func(message *consensus.Message) error
cacheHeaders storage.Cacher
mutHeadersCache sync.RWMutex
config config.ConsensusGradualBroadcastConfig
mutBroadcastFinalProof sync.RWMutex
valBroadcastFinalProof map[string]*validatorProof
cacheConsensusMessages storage.Cacher
}

// NewDelayedBlockBroadcaster create a new instance of a delayed block data broadcaster
Expand Down Expand Up @@ -102,21 +108,21 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
}

dbb := &delayedBlockBroadcaster{
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
interceptorsContainer: args.InterceptorsContainer,
headersSubscriber: args.HeadersSubscriber,
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
valBroadcastConsensusMessage: make(map[string]*consensus.Message, 0),
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
mutHeadersCache: sync.RWMutex{},
config: args.Config,
cacheConsensusMessages: cacheConsensusMessages,
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
interceptorsContainer: args.InterceptorsContainer,
headersSubscriber: args.HeadersSubscriber,
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
valBroadcastFinalProof: make(map[string]*validatorProof, 0),
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
mutHeadersCache: sync.RWMutex{},
config: args.Config,
cacheConsensusMessages: cacheConsensusMessages,
}

dbb.headersSubscriber.RegisterHandler(dbb.headerReceived)
Expand Down Expand Up @@ -245,37 +251,50 @@ func (dbb *delayedBlockBroadcaster) SetValidatorData(broadcastData *shared.Delay
return nil
}

// SetFinalConsensusMessageForValidator sets the consensus message to be broadcast by validator when its turn comes
func (dbb *delayedBlockBroadcaster) SetFinalConsensusMessageForValidator(message *consensus.Message, consensusIndex int) error {
if message == nil {
return spos.ErrNilConsensusMessage
// SetFinalProofForValidator sets the header proof to be broadcast by validator when its turn comes
func (dbb *delayedBlockBroadcaster) SetFinalProofForValidator(
proof *block.HeaderProof,
consensusIndex int,
pkBytes []byte,
) error {
if proof == nil {
return spos.ErrNilHeaderProof
}

// set alarm only for validators that are aware that the block was finalized
if len(message.AggregateSignature) > 0 && len(message.PubKeysBitmap) > 0 {
if dbb.cacheConsensusMessages.Has(message.BlockHeaderHash) {
return nil
}
isProofValid := len(proof.AggregatedSignature) > 0 &&
len(proof.PubKeysBitmap) > 0 &&
len(proof.HeaderHash) > 0
if !isProofValid {
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: consensus message alarm has not been set",
"validatorConsensusOrder", consensusIndex,
)

duration := dbb.getBroadcastDelayForIndex(consensusIndex)
alarmID := prefixConsensusMessageAlarm + hex.EncodeToString(message.BlockHeaderHash)
return nil
}

dbb.mutBroadcastConsensusMessage.Lock()
dbb.valBroadcastConsensusMessage[alarmID] = message
dbb.mutBroadcastConsensusMessage.Unlock()
if dbb.cacheConsensusMessages.Has(proof.HeaderHash) {
return nil
}

dbb.alarm.Add(dbb.consensusMessageAlarmExpired, duration, alarmID)
log.Trace("delayedBlockBroadcaster.SetFinalConsensusMessageForValidator: consensus message alarm has been set",
"validatorConsensusOrder", consensusIndex,
"headerHash", message.BlockHeaderHash,
"alarmID", alarmID,
"duration", duration,
)
} else {
log.Trace("delayedBlockBroadcaster.SetFinalConsensusMessageForValidator: consensus message alarm has not been set",
"validatorConsensusOrder", consensusIndex,
)
duration := dbb.getBroadcastDelayForIndex(consensusIndex)
alarmID := prefixConsensusMessageAlarm + hex.EncodeToString(proof.HeaderHash)

vProof := &validatorProof{
proof: proof,
pkBytes: pkBytes,
}
dbb.mutBroadcastFinalProof.Lock()
dbb.valBroadcastFinalProof[alarmID] = vProof
dbb.mutBroadcastFinalProof.Unlock()

dbb.alarm.Add(dbb.finalProofAlarmExpired, duration, alarmID)
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: final proof alarm has been set",
"validatorConsensusOrder", consensusIndex,
"headerHash", proof.HeaderHash,
"alarmID", alarmID,
"duration", duration,
)

return nil
}
Expand All @@ -285,6 +304,7 @@ func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte, pkBytes []byte) error,
txBroadcast func(txData map[string][][]byte, pkBytes []byte) error,
headerBroadcast func(header data.HeaderHandler, pkBytes []byte) error,
equivalentProofBroadcast func(proof *block.HeaderProof, pkBytes []byte) error,
consensusMessageBroadcast func(message *consensus.Message) error,
) error {
if mbBroadcast == nil || txBroadcast == nil || headerBroadcast == nil || consensusMessageBroadcast == nil {
Expand All @@ -297,6 +317,7 @@ func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
dbb.broadcastMiniblocksData = mbBroadcast
dbb.broadcastTxsData = txBroadcast
dbb.broadcastHeader = headerBroadcast
dbb.broadcastEquivalentProof = equivalentProofBroadcast
dbb.broadcastConsensusMessage = consensusMessageBroadcast

return nil
Expand Down Expand Up @@ -808,34 +829,34 @@ func (dbb *delayedBlockBroadcaster) getBroadcastDelayForIndex(index int) time.Du
return 0
}

func (dbb *delayedBlockBroadcaster) consensusMessageAlarmExpired(alarmID string) {
func (dbb *delayedBlockBroadcaster) finalProofAlarmExpired(alarmID string) {
headerHash, err := hex.DecodeString(strings.TrimPrefix(alarmID, prefixConsensusMessageAlarm))
if err != nil {
log.Error("delayedBlockBroadcaster.consensusMessageAlarmExpired", "error", err.Error(),
log.Error("delayedBlockBroadcaster.finalProofAlarmExpired", "error", err.Error(),
"headerHash", headerHash,
"alarmID", alarmID,
)
return
}

dbb.mutBroadcastConsensusMessage.Lock()
defer dbb.mutBroadcastConsensusMessage.Unlock()
dbb.mutBroadcastFinalProof.Lock()
defer dbb.mutBroadcastFinalProof.Unlock()
if dbb.cacheConsensusMessages.Has(headerHash) {
delete(dbb.valBroadcastConsensusMessage, alarmID)
delete(dbb.valBroadcastFinalProof, alarmID)
return
}

consensusMessage, ok := dbb.valBroadcastConsensusMessage[alarmID]
vProof, ok := dbb.valBroadcastFinalProof[alarmID]
if !ok {
return
}

err = dbb.broadcastConsensusMessage(consensusMessage)
err = dbb.broadcastEquivalentProof(vProof.proof, vProof.pkBytes)
if err != nil {
log.Error("consensusMessageAlarmExpired.broadcastConsensusMessage", "error", err)
log.Error("finalProofAlarmExpired.broadcastEquivalentProof", "error", err)
}

delete(dbb.valBroadcastConsensusMessage, alarmID)
delete(dbb.valBroadcastFinalProof, alarmID)
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
Loading
Loading