Skip to content
50 changes: 45 additions & 5 deletions dataRetriever/dataPool/proofsCache/proofsPool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proofscache

import (
"bytes"
"fmt"
"sync"

Expand Down Expand Up @@ -43,22 +44,40 @@ func NewProofsPool(cleanupNonceDelta uint64, bucketSize int) *proofsPool {
}
}

// AddProof will add the provided proof to the pool
// UpsertProof will add the provided proof to the pool. If there is already an existing proof,
// it will overwrite it.
func (pp *proofsPool) UpsertProof(
headerProof data.HeaderProofHandler,
) bool {
if check.IfNilReflect(headerProof) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check.IfNil instead for proofs IsInterfaceNil was added for the HeaderProofHandler

return false
}

return pp.addProof(headerProof)
}

// AddProof will add the provided proof to the pool, if it's not already in the pool.
// It will return true if the proof was added to the pool.
func (pp *proofsPool) AddProof(
headerProof data.HeaderProofHandler,
) bool {
if check.IfNil(headerProof) {
return false
}

shardID := headerProof.GetHeaderShardId()
headerHash := headerProof.GetHeaderHash()

hasProof := pp.HasProof(shardID, headerHash)
hasProof := pp.HasProof(headerProof.GetHeaderShardId(), headerProof.GetHeaderHash())
if hasProof {
return false
}

return pp.addProof(headerProof)
}

func (pp *proofsPool) addProof(
headerProof data.HeaderProofHandler,
) bool {
shardID := headerProof.GetHeaderShardId()

pp.mutCache.Lock()
proofsPerShard, ok := pp.cache[shardID]
if !ok {
Expand All @@ -85,6 +104,27 @@ func (pp *proofsPool) AddProof(
return true
}

// IsProofInPoolEqualTo will check if the provided proof is equal with the already existing proof in the pool
func (pp *proofsPool) IsProofInPoolEqualTo(headerProof data.HeaderProofHandler) bool {
if check.IfNilReflect(headerProof) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check.IfNil instread

return false
}

existingProof, err := pp.GetProof(headerProof.GetHeaderShardId(), headerProof.GetHeaderHash())
if err != nil {
return false
}

if !bytes.Equal(existingProof.GetAggregatedSignature(), headerProof.GetAggregatedSignature()) {
return false
}
if !bytes.Equal(existingProof.GetPubKeysBitmap(), headerProof.GetPubKeysBitmap()) {
return false
}

return true
}

func (pp *proofsPool) callAddedProofSubscribers(headerProof data.HeaderProofHandler) {
pp.mutAddedProofSubscribers.RLock()
defer pp.mutAddedProofSubscribers.RUnlock()
Expand Down
98 changes: 97 additions & 1 deletion dataRetriever/dataPool/proofsCache/proofsPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ func TestProofsPool_ShouldWork(t *testing.T) {

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.AddProof(nil)
require.False(t, ok)

_ = pp.AddProof(proof1)
_ = pp.AddProof(proof2)
_ = pp.AddProof(proof3)
_ = pp.AddProof(proof4)

ok := pp.AddProof(proof4)
ok = pp.AddProof(proof4)
require.False(t, ok)

proof, err := pp.GetProof(shardID, []byte("hash3"))
Expand All @@ -90,6 +93,99 @@ func TestProofsPool_ShouldWork(t *testing.T) {
require.Equal(t, proof4, proof)
}

func TestProofsPool_Upsert(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.UpsertProof(nil)
require.False(t, ok)

ok = pp.UpsertProof(proof1)
require.True(t, ok)

proof, err := pp.GetProof(shardID, []byte("hash1"))
require.Nil(t, err)
require.NotNil(t, proof)

require.Equal(t, proof1.GetAggregatedSignature(), proof.GetAggregatedSignature())
require.Equal(t, proof1.GetPubKeysBitmap(), proof.GetPubKeysBitmap())

newProof1 := &block.HeaderProof{
PubKeysBitmap: []byte("newpubKeysBitmap1"),
AggregatedSignature: []byte("newaggSig1"),
HeaderHash: []byte("hash1"),
HeaderEpoch: 1,
HeaderNonce: 1,
HeaderShardId: shardID,
}

ok = pp.UpsertProof(newProof1)
require.True(t, ok)

proof, err = pp.GetProof(shardID, []byte("hash1"))
require.Nil(t, err)
require.NotNil(t, proof)

require.Equal(t, newProof1.GetAggregatedSignature(), proof.GetAggregatedSignature())
require.Equal(t, newProof1.GetPubKeysBitmap(), proof.GetPubKeysBitmap())
}

func TestProofsPool_IsProofEqual(t *testing.T) {
t.Parallel()

t.Run("not existing proof, should fail", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.IsProofInPoolEqualTo(proof1)
require.False(t, ok)
})

t.Run("nil provided proof, should fail", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.IsProofInPoolEqualTo(nil)
require.False(t, ok)
})

t.Run("same proof, should return true", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.UpsertProof(proof1)
require.True(t, ok)

ok = pp.IsProofInPoolEqualTo(proof1)
require.True(t, ok)
})

t.Run("not equal, should return false", func(t *testing.T) {
t.Parallel()

pp := proofscache.NewProofsPool(cleanupDelta, bucketSize)

ok := pp.UpsertProof(proof1)
require.True(t, ok)

newProof1 := &block.HeaderProof{
PubKeysBitmap: []byte("newpubKeysBitmap1"),
AggregatedSignature: []byte("newaggSig1"),
HeaderHash: []byte("hash1"),
HeaderEpoch: 1,
HeaderNonce: 1,
HeaderShardId: shardID,
}

ok = pp.IsProofInPoolEqualTo(newProof1)
require.False(t, ok)
})
}

func TestProofsPool_RegisterHandler(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 2 additions & 0 deletions dataRetriever/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,11 @@ type PeerAuthenticationPayloadValidator interface {
// ProofsPool defines the behaviour of a proofs pool components
type ProofsPool interface {
AddProof(headerProof data.HeaderProofHandler) bool
UpsertProof(headerProof data.HeaderProofHandler) bool
RegisterHandler(handler func(headerProof data.HeaderProofHandler))
CleanupProofsBehindNonce(shardID uint32, nonce uint64) error
GetProof(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error)
HasProof(shardID uint32, headerHash []byte) bool
IsProofInPoolEqualTo(headerProof data.HeaderProofHandler) bool
IsInterfaceNil() bool
}
1 change: 1 addition & 0 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {
HeaderIntegrityVerifier: e.headerIntegrityVerifier,
MetaBlockProcessor: metaBlockProcessor,
InterceptedDataVerifierFactory: e.interceptedDataVerifierFactory,
ProofsPool: e.dataPool.Proofs(),
}
e.epochStartMetaBlockSyncer, err = NewEpochStartMetaSyncer(argsEpochStartSyncer)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions epochStart/bootstrap/storageProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (sesb *storageEpochStartBootstrap) prepareComponentsToSync() error {
HeaderIntegrityVerifier: sesb.headerIntegrityVerifier,
MetaBlockProcessor: metablockProcessor,
InterceptedDataVerifierFactory: sesb.interceptedDataVerifierFactory,
ProofsPool: sesb.dataPool.Proofs(),
}

sesb.epochStartMetaBlockSyncer, err = NewEpochStartMetaSyncer(argsEpochStartSyncer)
Expand Down
8 changes: 7 additions & 1 deletion epochStart/bootstrap/syncEpochStartMeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/epochStart"
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/disabled"
"github.com/multiversx/mx-chain-go/process"
Expand Down Expand Up @@ -46,6 +47,7 @@ type ArgsNewEpochStartMetaSyncer struct {
HeaderIntegrityVerifier process.HeaderIntegrityVerifier
MetaBlockProcessor EpochStartMetaBlockInterceptorProcessor
InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory
ProofsPool dataRetriever.ProofsPool
}

// NewEpochStartMetaSyncer will return a new instance of epochStartMetaSyncer
Expand Down Expand Up @@ -90,8 +92,12 @@ func NewEpochStartMetaSyncer(args ArgsNewEpochStartMetaSyncer) (*epochStartMetaS
EpochStartTrigger: disabled.NewEpochStartTrigger(),
ArgsParser: args.ArgsParser,
}
argsInterceptedMetaHeaderFactory := interceptorsFactory.ArgInterceptedMetaHeaderFactory{
ArgInterceptedDataFactory: argsInterceptedDataFactory,
ProofsPool: args.ProofsPool,
}

interceptedMetaHdrDataFactory, err := interceptorsFactory.NewInterceptedMetaHeaderDataFactory(&argsInterceptedDataFactory)
interceptedMetaHdrDataFactory, err := interceptorsFactory.NewInterceptedMetaHeaderDataFactory(&argsInterceptedMetaHeaderFactory)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions epochStart/bootstrap/syncEpochStartMeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
processMock "github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/cryptoMocks"
"github.com/multiversx/mx-chain-go/testscommon/dataRetriever"
"github.com/multiversx/mx-chain-go/testscommon/economicsmocks"
"github.com/multiversx/mx-chain-go/testscommon/hashingMocks"
"github.com/multiversx/mx-chain-go/testscommon/p2pmocks"
Expand Down Expand Up @@ -171,5 +172,6 @@ func getEpochStartSyncerArgs() ArgsNewEpochStartMetaSyncer {
HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{},
MetaBlockProcessor: &mock.EpochStartMetaBlockProcessorStub{},
InterceptedDataVerifierFactory: &processMock.InterceptedDataVerifierFactoryMock{},
ProofsPool: &dataRetriever.ProofsPoolMock{},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type ArgInterceptedBlockHeader struct {
ValidityAttester process.ValidityAttester
EpochStartTrigger process.EpochStartTriggerHandler
EnableEpochsHandler common.EnableEpochsHandler
ProofsPool process.ProofsPool
}
44 changes: 9 additions & 35 deletions process/block/interceptedBlocks/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package interceptedBlocks

import (
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
Expand Down Expand Up @@ -125,16 +123,12 @@ func checkMetaShardInfo(
shardInfo []data.ShardDataHandler,
coordinator sharding.Coordinator,
headerSigVerifier process.InterceptedHeaderSigVerifier,
proofs process.ProofsPool,
) error {
if coordinator.SelfId() != core.MetachainShardId {
return nil
}

wgProofsVerification := sync.WaitGroup{}

errChan := make(chan error, len(shardInfo))
defer close(errChan)

for _, sd := range shardInfo {
if sd.GetShardID() >= coordinator.NumberOfShards() && sd.GetShardID() != core.MetachainShardId {
return process.ErrInvalidShardId
Expand All @@ -145,38 +139,18 @@ func checkMetaShardInfo(
return err
}

wgProofsVerification.Add(1)
checkProofAsync(sd.GetPreviousProof(), headerSigVerifier, &wgProofsVerification, errChan)
}

wgProofsVerification.Wait()

return readFromChanNonBlocking(errChan)
}
if proofs.IsProofInPoolEqualTo(sd.GetPreviousProof()) {
continue
}

func readFromChanNonBlocking(errChan chan error) error {
select {
case err := <-errChan:
return err
default:
return nil
}
}
err = checkProof(sd.GetPreviousProof(), headerSigVerifier)
if err != nil {
return err

func checkProofAsync(
proof data.HeaderProofHandler,
headerSigVerifier process.InterceptedHeaderSigVerifier,
wg *sync.WaitGroup,
errChan chan error,
) {
go func(proof data.HeaderProofHandler) {
errCheckProof := checkProof(proof, headerSigVerifier)
if errCheckProof != nil {
errChan <- errCheckProof
}
}

wg.Done()
}(proof)
return nil
}

func checkProof(proof data.HeaderProofHandler, headerSigVerifier process.InterceptedHeaderSigVerifier) error {
Expand Down
Loading