Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions consensus/spos/bls/v2/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,6 @@ func (sr *subroundEndRound) ReceivedProof(proof consensus.ProofHandler) {
sr.receivedProof(proof)
}

// IsConsensusHeaderReceived calls the unexported isConsensusHeaderReceived function
func (sr *subroundEndRound) IsConsensusHeaderReceived() (bool, data.HeaderHandler) {
return sr.isConsensusHeaderReceived()
}

// IsOutOfTime calls the unexported isOutOfTime function
func (sr *subroundEndRound) IsOutOfTime() bool {
return sr.isOutOfTime()
Expand Down Expand Up @@ -340,3 +335,8 @@ func (sr *subroundSignature) SendSignatureForManagedKey(idx int, pk string) bool
func (sr *subroundSignature) DoSignatureJobForManagedKeys(ctx context.Context) bool {
return sr.doSignatureJobForManagedKeys(ctx)
}

// ReceivedSignature method is called when a signature is received through the signature channel
func (sr *subroundEndRound) ReceivedSignature(cnsDta *consensus.Message) bool {
return sr.receivedSignature(context.Background(), cnsDta)
}
177 changes: 21 additions & 156 deletions consensus/spos/bls/v2/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type subroundEndRound struct {
sentSignatureTracker spos.SentSignaturesTracker
worker spos.WorkerHandler
signatureThrottler core.Throttler
blockCommitted bool
}

// NewSubroundEndRound creates a subroundEndRound object
Expand Down Expand Up @@ -70,7 +69,6 @@ func NewSubroundEndRound(
sentSignatureTracker: sentSignatureTracker,
worker: worker,
signatureThrottler: signatureThrottler,
blockCommitted: false,
}
srEndRound.Job = srEndRound.doEndRoundJob
srEndRound.Check = srEndRound.doEndRoundConsensusCheck
Expand Down Expand Up @@ -116,7 +114,7 @@ func (sr *subroundEndRound) receivedProof(proof consensus.ProofHandler) {
return
}

// no need to re-verify the proof if as it was already verified when it was added to the proofs pool
// no need to re-verify the proof since it was already verified when it was added to the proofs pool
log.Debug("step 3: block header final info has been received",
"PubKeysBitmap", proof.GetPubKeysBitmap(),
"AggregateSignature", proof.GetAggregatedSignature(),
Expand Down Expand Up @@ -224,11 +222,7 @@ func (sr *subroundEndRound) doEndRoundJob(_ context.Context) bool {
return sr.doEndRoundJobByNode()
}

func (sr *subroundEndRound) commitBlock(proof data.HeaderProofHandler) error {
if sr.blockCommitted {
return nil
}

func (sr *subroundEndRound) commitBlock() error {
startTime := time.Now()
err := sr.BlockProcessor().CommitBlock(sr.GetHeader(), sr.GetBody())
elapsedTime := time.Since(startTime)
Expand All @@ -242,14 +236,6 @@ func (sr *subroundEndRound) commitBlock(proof data.HeaderProofHandler) error {
return err
}

if proof != nil {
err = sr.EquivalentProofsPool().AddProof(proof)
if err != nil {
log.Debug("doEndRoundJobByNode.AddProof", "error", err)
return err
}
}

return nil
}

Expand All @@ -262,11 +248,21 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool {
defer sr.mutProcessingEndRound.Unlock()

proof := sr.sendProof()
err := sr.commitBlock(proof)

err := sr.commitBlock()
if err != nil {
return false
}

// if proof not nil, it was created and broadcasted so it has to be added to the pool
if proof != nil {
err = sr.EquivalentProofsPool().AddProof(proof)
if err != nil {
log.Debug("doEndRoundJobByNode.AddProof", "error", err)
return false
}
}

sr.SetStatus(sr.Current(), spos.SsFinished)

sr.worker.DisplayStatistics()
Expand Down Expand Up @@ -314,7 +310,7 @@ func (sr *subroundEndRound) sendProof() data.HeaderProofHandler {
return nil
}

// broadcast header and final info section
// broadcast header proof
return sr.createAndBroadcastProof(sig, bitmap)
}

Expand All @@ -341,6 +337,7 @@ func (sr *subroundEndRound) aggregateSigsAndHandleInvalidSigners(bitmap []byte)
return nil, nil, err
}

// the header (hash) verified here is with leader signature on it
err = sr.SigningHandler().Verify(sr.GetData(), bitmap, sr.GetHeader().GetEpoch())
if err != nil {
log.Debug("doEndRoundJobByNode.Verify", "error", err.Error())
Expand Down Expand Up @@ -470,15 +467,16 @@ func (sr *subroundEndRound) handleInvalidSignersOnAggSigFail() ([]byte, []byte,
return nil, nil, err
}

invalidSigners, err := sr.getFullMessagesForInvalidSigners(invalidPubKeys)
_, err = sr.getFullMessagesForInvalidSigners(invalidPubKeys)
if err != nil {
log.Debug("doEndRoundJobByNode.getFullMessagesForInvalidSigners", "error", err.Error())
return nil, nil, err
}

if len(invalidSigners) > 0 {
sr.createAndBroadcastInvalidSigners(invalidSigners)
}
// TODO: handle invalid signers broadcast without flooding the network
// if len(invalidSigners) > 0 {
// sr.createAndBroadcastInvalidSigners(invalidSigners)
// }

bitmap, sig, err := sr.computeAggSigOnValidNodes()
if err != nil {
Expand Down Expand Up @@ -544,7 +542,7 @@ func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []b
}

func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []byte) {
if !sr.IsSelfLeader() {
if !sr.ShouldConsiderSelfKeyInConsensus() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: we need to revisit this, as it could enable flooding

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return
}

Expand Down Expand Up @@ -580,115 +578,13 @@ func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []by
log.Debug("step 3: invalid signers info has been sent")
}

func (sr *subroundEndRound) isConsensusHeaderReceived() (bool, data.HeaderHandler) {
if check.IfNil(sr.GetHeader()) {
return false, nil
}

consensusHeaderHash, err := core.CalculateHash(sr.Marshalizer(), sr.Hasher(), sr.GetHeader())
if err != nil {
log.Debug("isConsensusHeaderReceived: calculate consensus header hash", "error", err.Error())
return false, nil
}

receivedHeaders := sr.GetReceivedHeaders()

var receivedHeaderHash []byte
for index := range receivedHeaders {
// TODO[cleanup cns finality]: remove this
receivedHeader := receivedHeaders[index].ShallowClone()
if !sr.EnableEpochsHandler().IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, receivedHeader.GetEpoch()) {
err = receivedHeader.SetLeaderSignature(nil)
if err != nil {
log.Debug("isConsensusHeaderReceived - SetLeaderSignature", "error", err.Error())
return false, nil
}

err = receivedHeader.SetPubKeysBitmap(nil)
if err != nil {
log.Debug("isConsensusHeaderReceived - SetPubKeysBitmap", "error", err.Error())
return false, nil
}

err = receivedHeader.SetSignature(nil)
if err != nil {
log.Debug("isConsensusHeaderReceived - SetSignature", "error", err.Error())
return false, nil
}
}

receivedHeaderHash, err = core.CalculateHash(sr.Marshalizer(), sr.Hasher(), receivedHeader)
if err != nil {
log.Debug("isConsensusHeaderReceived: calculate received header hash", "error", err.Error())
return false, nil
}

if bytes.Equal(receivedHeaderHash, consensusHeaderHash) {
return true, receivedHeaders[index]
}
}

return false, nil
}

func (sr *subroundEndRound) signBlockHeader(leader []byte) ([]byte, error) {
headerClone := sr.GetHeader().ShallowClone()
err := headerClone.SetLeaderSignature(nil)
if err != nil {
return nil, err
}

marshalizedHdr, err := sr.Marshalizer().Marshal(headerClone)
if err != nil {
return nil, err
}

return sr.SigningHandler().CreateSignatureForPublicKey(marshalizedHdr, leader)
}

func (sr *subroundEndRound) updateMetricsForLeader() {
// TODO: decide if we keep these metrics the same way
sr.appStatusHandler.Increment(common.MetricCountAcceptedBlocks)
sr.appStatusHandler.SetStringValue(common.MetricConsensusRoundState,
fmt.Sprintf("valid block produced in %f sec", time.Since(sr.RoundHandler().TimeStamp()).Seconds()))
}

func (sr *subroundEndRound) broadcastBlockDataLeader(sender []byte) error {
// TODO[cleanup cns finality]: remove this method, block data was already broadcast during subroundBlock
if sr.EnableEpochsHandler().IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, sr.GetHeader().GetEpoch()) {
return nil
}

miniBlocks, transactions, err := sr.BlockProcessor().MarshalizedDataToBroadcast(sr.GetHeader(), sr.GetBody())
if err != nil {
return err
}

return sr.BroadcastMessenger().BroadcastBlockDataLeader(sr.GetHeader(), miniBlocks, transactions, sender)
}

func (sr *subroundEndRound) setHeaderForValidator(header data.HeaderHandler) error {
idx, pk, miniBlocks, transactions, err := sr.getIndexPkAndDataToBroadcast()
if err != nil {
return err
}

go sr.BroadcastMessenger().PrepareBroadcastHeaderValidator(header, miniBlocks, transactions, idx, pk)

return nil
}

func (sr *subroundEndRound) prepareBroadcastBlockDataForValidator() error {
idx, pk, miniBlocks, transactions, err := sr.getIndexPkAndDataToBroadcast()
if err != nil {
return err
}

go sr.BroadcastMessenger().PrepareBroadcastBlockDataValidator(sr.GetHeader(), miniBlocks, transactions, idx, pk)

return nil
}

// doEndRoundConsensusCheck method checks if the consensus is achieved
func (sr *subroundEndRound) doEndRoundConsensusCheck() bool {
if sr.GetRoundCanceled() {
Expand Down Expand Up @@ -776,37 +672,6 @@ func (sr *subroundEndRound) isOutOfTime() bool {
return false
}

func (sr *subroundEndRound) getIndexPk() (int, []byte, error) {
minIdx := sr.getMinConsensusGroupIndexOfManagedKeys()

idx, err := sr.SelfConsensusGroupIndex()
if err == nil {
if idx < minIdx {
minIdx = idx
}
}

if minIdx == sr.ConsensusGroupSize() {
return -1, nil, err
}

return minIdx, []byte(sr.ConsensusGroup()[minIdx]), nil
}

func (sr *subroundEndRound) getIndexPkAndDataToBroadcast() (int, []byte, map[uint32][]byte, map[string][][]byte, error) {
minIdx, pk, err := sr.getIndexPk()
if err != nil {
return -1, nil, nil, nil, err
}

miniBlocks, transactions, err := sr.BlockProcessor().MarshalizedDataToBroadcast(sr.GetHeader(), sr.GetBody())
if err != nil {
return -1, nil, nil, nil, err
}

return minIdx, pk, miniBlocks, transactions, nil
}

func (sr *subroundEndRound) getMinConsensusGroupIndexOfManagedKeys() int {
minIdx := sr.ConsensusGroupSize()

Expand Down
Loading
Loading