Merged (changes from 6 commits)
cmd/node/config/config.toml (1 addition, 0 deletions)
@@ -373,6 +373,7 @@

 [ProofsPoolConfig]
     CleanupNonceDelta = 3
+    BucketSize = 100

 [BadBlocksCache]
     Name = "BadBlocksCache"
config/config.go (1 addition, 0 deletions)
@@ -22,6 +22,7 @@ type HeadersPoolConfig struct {
 // ProofsPoolConfig will map the proofs cache configuration
 type ProofsPoolConfig struct {
 	CleanupNonceDelta uint64
+	BucketSize        int
 }

 // DBConfig will map the database configuration
dataRetriever/dataPool/proofsCache/export_test.go (new file, 43 additions)
@@ -0,0 +1,43 @@
package proofscache

import "github.com/multiversx/mx-chain-core-go/data"

// NewProofsCache -
func NewProofsCache(bucketSize int) *proofsCache {
	return newProofsCache(bucketSize)
}

// HeadBucketSize -
func (pc *proofsCache) HeadBucketSize() int {
	if len(pc.proofsByNonceBuckets) > 0 {
		return len(pc.proofsByNonceBuckets[0].proofsByNonce)
	}

	return 0
}

// FullProofsByNonceSize -
func (pc *proofsCache) FullProofsByNonceSize() int {
	size := 0

	for _, bucket := range pc.proofsByNonceBuckets {
		size += bucket.size()
	}

	return size
}

// ProofsByHashSize -
func (pc *proofsCache) ProofsByHashSize() int {
	return len(pc.proofsByHash)
}

// AddProof -
func (pc *proofsCache) AddProof(proof data.HeaderProofHandler) {
	pc.addProof(proof)
}

// CleanupProofsBehindNonce -
func (pc *proofsCache) CleanupProofsBehindNonce(nonce uint64) {
	pc.cleanupProofsBehindNonce(nonce)
}
dataRetriever/dataPool/proofsCache/proofsBucket.go (new file, 44 additions)
@@ -0,0 +1,44 @@
package proofscache

import "github.com/multiversx/mx-chain-core-go/data"

type proofNonceBucket struct {
	maxNonce      uint64
	proofsByNonce []*proofNonceMapping
	bucketSize    int
}

Contributor: maybe use a map then the upsert would not need to iterate

Contributor Author: changed to use map
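
A minimal sketch of the map-backed bucket the thread converged on; the names mirror the diff, but this is illustrative, not the merged code. Keying the nonce mapping by header hash turns the upsert into a constant-time map write instead of a slice scan:

type proofNonceBucket struct {
	maxNonce      uint64
	proofsByNonce map[string]uint64 // headerHash -> nonce; initialized with make() in the constructor
	bucketSize    int
}

func (p *proofNonceBucket) insert(proof data.HeaderProofHandler) {
	// upsert by hash: no iteration needed to find an existing entry
	p.proofsByNonce[string(proof.GetHeaderHash())] = proof.GetHeaderNonce()

	if proof.GetHeaderNonce() > p.maxNonce {
		p.maxNonce = proof.GetHeaderNonce()
	}
}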

func newProofBucket(bucketSize int) *proofNonceBucket {
	return &proofNonceBucket{
		proofsByNonce: make([]*proofNonceMapping, 0),
		bucketSize:    bucketSize,
	}
}

func (p *proofNonceBucket) size() int {
	return len(p.proofsByNonce)
}

func (p *proofNonceBucket) isFull() bool {
	return len(p.proofsByNonce) >= p.bucketSize
}

Contributor: I would use a fixed size and a deterministic range for each bucket, e.g. a proof with nonce x should be added into the bucket with first nonce x/bucketSize * bucketSize (integer division).

Contributor Author: changed to use fixed size buckets
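
The deterministic range suggested above comes down to plain integer arithmetic; a small sketch (getBucketKey is a hypothetical helper name, not part of this diff). With bucketSize = 100, nonces 0-99 share key 0 and nonce 257 lands in the bucket keyed 200:

// getBucketKey returns the first nonce of the bucket that should hold nonce:
// integer division truncates, so (257/100)*100 = 200.
func getBucketKey(nonce uint64, bucketSize uint64) uint64 {
	return (nonce / bucketSize) * bucketSize
}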

func (p *proofNonceBucket) insertInNew(proof data.HeaderProofHandler) {
	p.insert(proof)
	p.maxNonce = proof.GetHeaderNonce()
}

func (p *proofNonceBucket) insertInExisting(proof data.HeaderProofHandler) {
	p.insert(proof)

	if proof.GetHeaderNonce() > p.maxNonce {
		p.maxNonce = proof.GetHeaderNonce()
	}
}

Contributor: Can we merge them into a single insert function, or does it break the logic? If it was done for the purpose of optimizing the nonce check: the if proof.GetHeaderNonce() > p.maxNonce is just a quick comparison (should be negligible).

Contributor: Outdated comment, fixed upon Sorin's suggestion.


func (p *proofNonceBucket) insert(proof data.HeaderProofHandler) {
	p.proofsByNonce = append(p.proofsByNonce, &proofNonceMapping{
		headerHash: string(proof.GetHeaderHash()),
		nonce:      proof.GetHeaderNonce(),
	})
}

Collaborator: why not move

	if proof.GetHeaderNonce() > p.maxNonce {
		p.maxNonce = proof.GetHeaderNonce()
	}

into the insert method and only keep one method for all 3 operations?

Contributor Author: right, pushed
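
The merged method both reviewers asked for would look roughly like this (a sketch against the slice-based bucket above; the pushed version may differ in detail):

func (p *proofNonceBucket) insert(proof data.HeaderProofHandler) {
	p.proofsByNonce = append(p.proofsByNonce, &proofNonceMapping{
		headerHash: string(proof.GetHeaderHash()),
		nonce:      proof.GetHeaderNonce(),
	})

	// cheap comparison, negligible on every insert; lets insertInNew and
	// insertInExisting collapse into this single method
	if proof.GetHeaderNonce() > p.maxNonce {
		p.maxNonce = proof.GetHeaderNonce()
	}
}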

dataRetriever/dataPool/proofsCache/proofsCache.go (68 additions, 28 deletions)
@@ -1,7 +1,6 @@
 package proofscache

 import (
-	"sort"
 	"sync"

 	"github.com/multiversx/mx-chain-core-go/data"
@@ -13,22 +12,26 @@ type proofNonceMapping struct {
 }

 type proofsCache struct {
-	mutProofsCache sync.RWMutex
-	proofsByNonce  []*proofNonceMapping
-	proofsByHash   map[string]data.HeaderProofHandler
+	mutProofsByNonce     sync.RWMutex
+	proofsByNonceBuckets []*proofNonceBucket
+	bucketSize           int
+
+	proofsByHash    map[string]data.HeaderProofHandler
+	mutProofsByHash sync.RWMutex
 }

Contributor: Alternatively, maybe sync.Map is useful?

Contributor Author: the way proofsByNonceBuckets is implemented right now, it needs order, so it doesn't fit very well

Contributor: Sorry, you are right. No maps here 👍

Contributor: I think this can be a map as well, with the key being the lowest nonce that the bucket would hold

Contributor Author: changed to use sync.Map
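
A hedged sketch of where that thread ended up: buckets held in a sync.Map keyed by the lowest nonce each bucket can hold, so slice ordering is no longer needed. The type name and the key derivation are assumptions based on the comments, not the merged code:

type proofsCacheSketch struct {
	bucketSize           uint64
	proofsByNonceBuckets sync.Map // lowest nonce held by the bucket -> *proofNonceBucket
}

func (pc *proofsCacheSketch) insertProofByNonce(proof data.HeaderProofHandler) {
	// deterministic key: the first nonce the target bucket can hold
	key := (proof.GetHeaderNonce() / pc.bucketSize) * pc.bucketSize

	// LoadOrStore allocates a spare bucket when one already exists,
	// which is acceptable for a sketch
	value, _ := pc.proofsByNonceBuckets.LoadOrStore(key, newProofBucket(int(pc.bucketSize)))
	value.(*proofNonceBucket).insert(proof)
}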

-func newProofsCache() *proofsCache {
+func newProofsCache(bucketSize int) *proofsCache {
 	return &proofsCache{
-		mutProofsCache: sync.RWMutex{},
-		proofsByNonce:  make([]*proofNonceMapping, 0),
-		proofsByHash:   make(map[string]data.HeaderProofHandler),
+		proofsByNonceBuckets: make([]*proofNonceBucket, 0),
+		bucketSize:           bucketSize,
+		proofsByHash:         make(map[string]data.HeaderProofHandler),
 	}
 }

 func (pc *proofsCache) getProofByHash(headerHash []byte) (data.HeaderProofHandler, error) {
-	pc.mutProofsCache.RLock()
-	defer pc.mutProofsCache.RUnlock()
+	pc.mutProofsByHash.RLock()
+	defer pc.mutProofsByHash.RUnlock()

 	proof, ok := pc.proofsByHash[string(headerHash)]
 	if !ok {
@@ -43,39 +46,76 @@ func (pc *proofsCache) addProof(proof data.HeaderProofHandler) {
 		return
 	}

-	pc.mutProofsCache.Lock()
-	defer pc.mutProofsCache.Unlock()
+	pc.insertProofByNonce(proof)

-	pc.proofsByNonce = append(pc.proofsByNonce, &proofNonceMapping{
-		headerHash: string(proof.GetHeaderHash()),
-		nonce:      proof.GetHeaderNonce(),
-	})
+	pc.mutProofsByHash.Lock()
+	pc.proofsByHash[string(proof.GetHeaderHash())] = proof
+	pc.mutProofsByHash.Unlock()
 }

andreibancioiu (Contributor), Mar 17, 2025: Having these two locks separated does not seem to be an optimization (at least, not at a first glance). Since:

	lock A
	do ~trivial (& with short duration) logic around insertion in bucket
	unlock A
	lock B
	do ~trivial (& with short duration) operation of adding a map entry
	unlock B

is not necessarily better than:

	lock C
	do ~trivial (& with short duration) logic around insertion in bucket
	do ~trivial (& with short duration) operation of adding a map entry
	unlock C

Especially if extreme concurrency (e.g. a lot of new insertions bootstrapped during each of the ~trivial two parts) isn't expected.

Contributor Author: right, updated to keep only one mutex
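
The single-mutex shape the author describes would look roughly like this (a sketch: the nil-proof guard is elided, the mutex name is assumed, and insertProofByNonce is assumed to no longer take its own lock):

func (pc *proofsCache) addProof(proof data.HeaderProofHandler) {
	pc.mutProofsCache.Lock()
	defer pc.mutProofsCache.Unlock()

	// both critical sections are trivial and short, so one lock covers them
	pc.insertProofByNonce(proof)
	pc.proofsByHash[string(proof.GetHeaderHash())] = proof
}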

-	sort.Slice(pc.proofsByNonce, func(i, j int) bool {
-		return pc.proofsByNonce[i].nonce < pc.proofsByNonce[j].nonce
-	})
-
-	pc.proofsByHash[string(proof.GetHeaderHash())] = proof

+func (pc *proofsCache) insertProofByNonce(proof data.HeaderProofHandler) {
+	pc.mutProofsByNonce.Lock()
+	defer pc.mutProofsByNonce.Unlock()
+
+	if len(pc.proofsByNonceBuckets) == 0 {
+		pc.insertInNewBucket(proof)
+		return
+	}
+
+	headBucket := pc.proofsByNonceBuckets[0]
+
+	if headBucket.isFull() {
+		pc.insertInNewBucket(proof)
+		return
+	}
+
+	headBucket.insertInExisting(proof)
+}

Contributor (on the headBucket.isFull() check): in case of a syncing node, this will cause it to keep accumulating new nonces in each old-nonce bucket, which will delay their cleanup.

Contributor Author: the new model with range buckets should cover this case

+func (pc *proofsCache) insertInNewBucket(proof data.HeaderProofHandler) {
+	bucket := newProofBucket(pc.bucketSize)
+	bucket.insertInNew(proof)
+
+	pc.proofsByNonceBuckets = append([]*proofNonceBucket{bucket}, pc.proofsByNonceBuckets...)
+}

 func (pc *proofsCache) cleanupProofsBehindNonce(nonce uint64) {
 	if nonce == 0 {
 		return
 	}

-	pc.mutProofsCache.Lock()
-	defer pc.mutProofsCache.Unlock()
-
-	proofsByNonce := make([]*proofNonceMapping, 0)
-
-	for _, proofInfo := range pc.proofsByNonce {
-		if proofInfo.nonce < nonce {
-			delete(pc.proofsByHash, proofInfo.headerHash)
-			continue
-		}
-
-		proofsByNonce = append(proofsByNonce, proofInfo)
-	}
-
-	pc.proofsByNonce = proofsByNonce
+	pc.mutProofsByNonce.Lock()
+	defer pc.mutProofsByNonce.Unlock()
+
+	buckets := make([]*proofNonceBucket, 0)
+
+	wg := &sync.WaitGroup{}
+
+	for _, bucket := range pc.proofsByNonceBuckets {
+		if nonce > bucket.maxNonce {
+			wg.Add(1)
+
+			go func(bucket *proofNonceBucket) {
+				pc.cleanupProofsInBucket(bucket)
+				wg.Done()
+			}(bucket)
+
+			continue
+		}
+
+		buckets = append(buckets, bucket)
+	}
+
+	wg.Wait()
+
+	pc.proofsByNonceBuckets = buckets
 }

Contributor (on taking mutProofsByNonce): If the cleanup procedure does not happen extremely often (~rare event), maybe both locks can happen one near the other, e.g. lock & unlock mutProofsByHash for all buckets at once (I think this was Sorin's internal suggestion).

Contributor (on the goroutines): Maybe allow the cleanup to happen synchronously? I see the workload is lock, delete from map in a loop, unlock. Do we have a speed gain using concurrency in this context (since map deletions are not quite subject to parallelization)?

Contributor Author: removed concurrency here
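
Combining the two suggestions above (drop the goroutines, and take the hash-map lock once for all expired buckets) gives roughly the following; a sketch against the slice-based buckets in this diff, not necessarily the merged code:

func (pc *proofsCache) cleanupProofsBehindNonce(nonce uint64) {
	if nonce == 0 {
		return
	}

	pc.mutProofsByNonce.Lock()
	defer pc.mutProofsByNonce.Unlock()

	remaining := make([]*proofNonceBucket, 0)

	pc.mutProofsByHash.Lock()
	for _, bucket := range pc.proofsByNonceBuckets {
		if nonce > bucket.maxNonce {
			// expired bucket: drop all of its proofs from the hash index
			for _, proofInfo := range bucket.proofsByNonce {
				delete(pc.proofsByHash, proofInfo.headerHash)
			}
			continue
		}

		remaining = append(remaining, bucket)
	}
	pc.mutProofsByHash.Unlock()

	pc.proofsByNonceBuckets = remaining
}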

+func (pc *proofsCache) cleanupProofsInBucket(bucket *proofNonceBucket) {
+	pc.mutProofsByHash.Lock()
+	defer pc.mutProofsByHash.Unlock()
+
+	for _, proofInfo := range bucket.proofsByNonce {
+		delete(pc.proofsByHash, proofInfo.headerHash)
+	}
+}

Contributor (on the locking): The caller function also locks (sorry if I'm mistaken).

ssd04 (Contributor Author), Mar 17, 2025: there are 2 separate locks

Contributor: Indeed, my bad / overlooked!
dataRetriever/dataPool/proofsCache/proofsCache_bench_test.go (new file, 66 additions)
@@ -0,0 +1,66 @@
package proofscache_test

Contributor: If we move the test to package proofscache, maybe we can drop some code from the export_test file. Can also stay as it is.

Contributor Author: i would keep it in the _test package since there are other useful functions from tests

Contributor Author: and we plan to make this component more generic and to export it in other packages as well (for the headers pool)


import (
	"fmt"
	"testing"

	proofscache "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/proofsCache"
)

func Benchmark_AddProof_Bucket10_Pool1000(b *testing.B) {
	benchmarkAddProof(b, 10, 1000)
}

func Benchmark_AddProof_Bucket100_Pool10000(b *testing.B) {
	benchmarkAddProof(b, 100, 10000)
}

func Benchmark_AddProof_Bucket1000_Pool100000(b *testing.B) {
	benchmarkAddProof(b, 1000, 100000)
}
Comment on lines +10 to +20

Contributor: Maybe also add some output / statistics in the PR description.


func benchmarkAddProof(b *testing.B, bucketSize int, nonceRange int) {
	pc := proofscache.NewProofsCache(bucketSize)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		proof := generateProof()
		nonce := generateRandomNonce(int64(nonceRange))

		proof.HeaderNonce = nonce
		proof.HeaderHash = []byte("hash_" + fmt.Sprintf("%d", nonce))
		b.StartTimer()

		pc.AddProof(proof)
	}
}

func Benchmark_CleanupProofs_Bucket10_Pool1000(b *testing.B) {
	benchmarkCleanupProofs(b, 10, 1000)
}

func Benchmark_CleanupProofs_Bucket100_Pool10000(b *testing.B) {
	benchmarkCleanupProofs(b, 100, 10000)
}

func Benchmark_CleanupProofs_Bucket1000_Pool100000(b *testing.B) {
	benchmarkCleanupProofs(b, 1000, 100000)
}

func benchmarkCleanupProofs(b *testing.B, bucketSize int, nonceRange int) {
	pc := proofscache.NewProofsCache(bucketSize)

	for i := uint64(0); i < uint64(nonceRange); i++ {
		proof := generateProof()
		proof.HeaderNonce = i
		proof.HeaderHash = []byte("hash_" + fmt.Sprintf("%d", i))

		pc.AddProof(proof)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		pc.CleanupProofsBehindNonce(uint64(nonceRange))
	}
}
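
Running the suite locally with the standard go test flags should look something like:

	go test -bench=. -benchmem ./dataRetriever/dataPool/proofsCache/

(generateProof and generateRandomNonce are helpers defined elsewhere in the package's tests.)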