Skip to content

Commit eccd1bf

Browse files
authored
Merge pull request #6516 from multiversx/send_proofs_on_topic
final proof is now sent on the common topic with META
2 parents 5624d0c + 1ca0a01 commit eccd1bf

26 files changed

+462
-671
lines changed

consensus/broadcast/commonMessenger.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/multiversx/mx-chain-core-go/core"
88
"github.com/multiversx/mx-chain-core-go/core/check"
99
"github.com/multiversx/mx-chain-core-go/core/partitioning"
10+
"github.com/multiversx/mx-chain-core-go/data/block"
1011
"github.com/multiversx/mx-chain-core-go/hashing"
1112
"github.com/multiversx/mx-chain-core-go/marshal"
1213
crypto "github.com/multiversx/mx-chain-crypto-go"
@@ -186,11 +187,15 @@ func (cm *commonMessenger) BroadcastBlockData(
186187
}
187188
}
188189

189-
// PrepareBroadcastFinalConsensusMessage prepares the validator final info data broadcast for when its turn comes
190-
func (cm *commonMessenger) PrepareBroadcastFinalConsensusMessage(message *consensus.Message, consensusIndex int) {
191-
err := cm.delayedBlockBroadcaster.SetFinalConsensusMessageForValidator(message, consensusIndex)
190+
// PrepareBroadcastEquivalentProof sets the proof into the delayed block broadcaster
191+
func (cm *commonMessenger) PrepareBroadcastEquivalentProof(
192+
proof *block.HeaderProof,
193+
consensusIndex int,
194+
pkBytes []byte,
195+
) {
196+
err := cm.delayedBlockBroadcaster.SetFinalProofForValidator(proof, consensusIndex, pkBytes)
192197
if err != nil {
193-
log.Error("commonMessenger.PrepareBroadcastFinalConsensusMessage", "error", err)
198+
log.Error("commonMessenger.PrepareBroadcastEquivalentProof", "error", err)
194199
}
195200
}
196201

@@ -240,3 +245,18 @@ func (cm *commonMessenger) broadcast(topic string, data []byte, pkBytes []byte)
240245

241246
cm.messenger.BroadcastUsingPrivateKey(topic, data, pid, skBytes)
242247
}
248+
249+
func (cm *commonMessenger) broadcastEquivalentProof(proof *block.HeaderProof, pkBytes []byte, topic string) error {
250+
if check.IfNilReflect(proof) {
251+
return spos.ErrNilHeaderProof
252+
}
253+
254+
msgProof, err := cm.marshalizer.Marshal(proof)
255+
if err != nil {
256+
return err
257+
}
258+
259+
cm.broadcast(topic, msgProof, pkBytes)
260+
261+
return nil
262+
}

consensus/broadcast/delayedBroadcast.go

Lines changed: 89 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -53,27 +53,33 @@ type headerDataForValidator struct {
5353
prevRandSeed []byte
5454
}
5555

56+
type validatorProof struct {
57+
proof *block.HeaderProof
58+
pkBytes []byte
59+
}
60+
5661
type delayedBlockBroadcaster struct {
57-
alarm timersScheduler
58-
interceptorsContainer process.InterceptorsContainer
59-
shardCoordinator sharding.Coordinator
60-
headersSubscriber consensus.HeadersPoolSubscriber
61-
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
62-
valBroadcastData []*shared.DelayedBroadcastData
63-
delayedBroadcastData []*shared.DelayedBroadcastData
64-
maxDelayCacheSize uint32
65-
maxValidatorDelayCacheSize uint32
66-
mutDataForBroadcast sync.RWMutex
67-
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
68-
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
69-
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
70-
broadcastConsensusMessage func(message *consensus.Message) error
71-
cacheHeaders storage.Cacher
72-
mutHeadersCache sync.RWMutex
73-
config config.ConsensusGradualBroadcastConfig
74-
mutBroadcastConsensusMessage sync.RWMutex
75-
valBroadcastConsensusMessage map[string]*consensus.Message
76-
cacheConsensusMessages storage.Cacher
62+
alarm timersScheduler
63+
interceptorsContainer process.InterceptorsContainer
64+
shardCoordinator sharding.Coordinator
65+
headersSubscriber consensus.HeadersPoolSubscriber
66+
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
67+
valBroadcastData []*shared.DelayedBroadcastData
68+
delayedBroadcastData []*shared.DelayedBroadcastData
69+
maxDelayCacheSize uint32
70+
maxValidatorDelayCacheSize uint32
71+
mutDataForBroadcast sync.RWMutex
72+
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
73+
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
74+
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
75+
broadcastEquivalentProof func(proof *block.HeaderProof, pkBytes []byte) error
76+
broadcastConsensusMessage func(message *consensus.Message) error
77+
cacheHeaders storage.Cacher
78+
mutHeadersCache sync.RWMutex
79+
config config.ConsensusGradualBroadcastConfig
80+
mutBroadcastFinalProof sync.RWMutex
81+
valBroadcastFinalProof map[string]*validatorProof
82+
cacheConsensusMessages storage.Cacher
7783
}
7884

7985
// NewDelayedBlockBroadcaster create a new instance of a delayed block data broadcaster
@@ -102,21 +108,21 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
102108
}
103109

104110
dbb := &delayedBlockBroadcaster{
105-
alarm: args.AlarmScheduler,
106-
shardCoordinator: args.ShardCoordinator,
107-
interceptorsContainer: args.InterceptorsContainer,
108-
headersSubscriber: args.HeadersSubscriber,
109-
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
110-
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
111-
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
112-
valBroadcastConsensusMessage: make(map[string]*consensus.Message, 0),
113-
maxDelayCacheSize: args.LeaderCacheSize,
114-
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
115-
mutDataForBroadcast: sync.RWMutex{},
116-
cacheHeaders: cacheHeaders,
117-
mutHeadersCache: sync.RWMutex{},
118-
config: args.Config,
119-
cacheConsensusMessages: cacheConsensusMessages,
111+
alarm: args.AlarmScheduler,
112+
shardCoordinator: args.ShardCoordinator,
113+
interceptorsContainer: args.InterceptorsContainer,
114+
headersSubscriber: args.HeadersSubscriber,
115+
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
116+
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
117+
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
118+
valBroadcastFinalProof: make(map[string]*validatorProof, 0),
119+
maxDelayCacheSize: args.LeaderCacheSize,
120+
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
121+
mutDataForBroadcast: sync.RWMutex{},
122+
cacheHeaders: cacheHeaders,
123+
mutHeadersCache: sync.RWMutex{},
124+
config: args.Config,
125+
cacheConsensusMessages: cacheConsensusMessages,
120126
}
121127

122128
dbb.headersSubscriber.RegisterHandler(dbb.headerReceived)
@@ -245,37 +251,50 @@ func (dbb *delayedBlockBroadcaster) SetValidatorData(broadcastData *shared.Delay
245251
return nil
246252
}
247253

248-
// SetFinalConsensusMessageForValidator sets the consensus message to be broadcast by validator when its turn comes
249-
func (dbb *delayedBlockBroadcaster) SetFinalConsensusMessageForValidator(message *consensus.Message, consensusIndex int) error {
250-
if message == nil {
251-
return spos.ErrNilConsensusMessage
254+
// SetFinalProofForValidator sets the header proof to be broadcast by validator when its turn comes
255+
func (dbb *delayedBlockBroadcaster) SetFinalProofForValidator(
256+
proof *block.HeaderProof,
257+
consensusIndex int,
258+
pkBytes []byte,
259+
) error {
260+
if proof == nil {
261+
return spos.ErrNilHeaderProof
252262
}
253263

254264
// set alarm only for validators that are aware that the block was finalized
255-
if len(message.AggregateSignature) > 0 && len(message.PubKeysBitmap) > 0 {
256-
if dbb.cacheConsensusMessages.Has(message.BlockHeaderHash) {
257-
return nil
258-
}
265+
isProofValid := len(proof.AggregatedSignature) > 0 &&
266+
len(proof.PubKeysBitmap) > 0 &&
267+
len(proof.HeaderHash) > 0
268+
if !isProofValid {
269+
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: consensus message alarm has not been set",
270+
"validatorConsensusOrder", consensusIndex,
271+
)
259272

260-
duration := dbb.getBroadcastDelayForIndex(consensusIndex)
261-
alarmID := prefixConsensusMessageAlarm + hex.EncodeToString(message.BlockHeaderHash)
273+
return nil
274+
}
262275

263-
dbb.mutBroadcastConsensusMessage.Lock()
264-
dbb.valBroadcastConsensusMessage[alarmID] = message
265-
dbb.mutBroadcastConsensusMessage.Unlock()
276+
if dbb.cacheConsensusMessages.Has(proof.HeaderHash) {
277+
return nil
278+
}
266279

267-
dbb.alarm.Add(dbb.consensusMessageAlarmExpired, duration, alarmID)
268-
log.Trace("delayedBlockBroadcaster.SetFinalConsensusMessageForValidator: consensus message alarm has been set",
269-
"validatorConsensusOrder", consensusIndex,
270-
"headerHash", message.BlockHeaderHash,
271-
"alarmID", alarmID,
272-
"duration", duration,
273-
)
274-
} else {
275-
log.Trace("delayedBlockBroadcaster.SetFinalConsensusMessageForValidator: consensus message alarm has not been set",
276-
"validatorConsensusOrder", consensusIndex,
277-
)
280+
duration := dbb.getBroadcastDelayForIndex(consensusIndex)
281+
alarmID := prefixConsensusMessageAlarm + hex.EncodeToString(proof.HeaderHash)
282+
283+
vProof := &validatorProof{
284+
proof: proof,
285+
pkBytes: pkBytes,
278286
}
287+
dbb.mutBroadcastFinalProof.Lock()
288+
dbb.valBroadcastFinalProof[alarmID] = vProof
289+
dbb.mutBroadcastFinalProof.Unlock()
290+
291+
dbb.alarm.Add(dbb.finalProofAlarmExpired, duration, alarmID)
292+
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: final proof alarm has been set",
293+
"validatorConsensusOrder", consensusIndex,
294+
"headerHash", proof.HeaderHash,
295+
"alarmID", alarmID,
296+
"duration", duration,
297+
)
279298

280299
return nil
281300
}
@@ -285,6 +304,7 @@ func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
285304
mbBroadcast func(mbData map[uint32][]byte, pkBytes []byte) error,
286305
txBroadcast func(txData map[string][][]byte, pkBytes []byte) error,
287306
headerBroadcast func(header data.HeaderHandler, pkBytes []byte) error,
307+
equivalentProofBroadcast func(proof *block.HeaderProof, pkBytes []byte) error,
288308
consensusMessageBroadcast func(message *consensus.Message) error,
289309
) error {
290310
if mbBroadcast == nil || txBroadcast == nil || headerBroadcast == nil || consensusMessageBroadcast == nil {
@@ -297,6 +317,7 @@ func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
297317
dbb.broadcastMiniblocksData = mbBroadcast
298318
dbb.broadcastTxsData = txBroadcast
299319
dbb.broadcastHeader = headerBroadcast
320+
dbb.broadcastEquivalentProof = equivalentProofBroadcast
300321
dbb.broadcastConsensusMessage = consensusMessageBroadcast
301322

302323
return nil
@@ -808,34 +829,34 @@ func (dbb *delayedBlockBroadcaster) getBroadcastDelayForIndex(index int) time.Du
808829
return 0
809830
}
810831

811-
func (dbb *delayedBlockBroadcaster) consensusMessageAlarmExpired(alarmID string) {
832+
func (dbb *delayedBlockBroadcaster) finalProofAlarmExpired(alarmID string) {
812833
headerHash, err := hex.DecodeString(strings.TrimPrefix(alarmID, prefixConsensusMessageAlarm))
813834
if err != nil {
814-
log.Error("delayedBlockBroadcaster.consensusMessageAlarmExpired", "error", err.Error(),
835+
log.Error("delayedBlockBroadcaster.finalProofAlarmExpired", "error", err.Error(),
815836
"headerHash", headerHash,
816837
"alarmID", alarmID,
817838
)
818839
return
819840
}
820841

821-
dbb.mutBroadcastConsensusMessage.Lock()
822-
defer dbb.mutBroadcastConsensusMessage.Unlock()
842+
dbb.mutBroadcastFinalProof.Lock()
843+
defer dbb.mutBroadcastFinalProof.Unlock()
823844
if dbb.cacheConsensusMessages.Has(headerHash) {
824-
delete(dbb.valBroadcastConsensusMessage, alarmID)
845+
delete(dbb.valBroadcastFinalProof, alarmID)
825846
return
826847
}
827848

828-
consensusMessage, ok := dbb.valBroadcastConsensusMessage[alarmID]
849+
vProof, ok := dbb.valBroadcastFinalProof[alarmID]
829850
if !ok {
830851
return
831852
}
832853

833-
err = dbb.broadcastConsensusMessage(consensusMessage)
854+
err = dbb.broadcastEquivalentProof(vProof.proof, vProof.pkBytes)
834855
if err != nil {
835-
log.Error("consensusMessageAlarmExpired.broadcastConsensusMessage", "error", err)
856+
log.Error("finalProofAlarmExpired.broadcastEquivalentProof", "error", err)
836857
}
837858

838-
delete(dbb.valBroadcastConsensusMessage, alarmID)
859+
delete(dbb.valBroadcastFinalProof, alarmID)
839860
}
840861

841862
// IsInterfaceNil returns true if there is no value under the interface

0 commit comments

Comments
 (0)