Skip to content

Commit 0295c81

Browse files
authored
concurrent tree insert (#41)
1 parent f013f1b commit 0295c81

File tree

1 file changed

+64
-78
lines changed

1 file changed

+64
-78
lines changed

cmd/geth/converkle.go

Lines changed: 64 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ import (
2626
"net/http"
2727
_ "net/http/pprof"
2828
"os"
29+
"runtime"
2930
"sort"
3031
"sync"
32+
"sync/atomic"
3133
"time"
3234
"unsafe"
3335

@@ -403,7 +405,7 @@ func doFileSorting(ctx *cli.Context) error {
403405
for id := 0; ; id++ {
404406
idxFile := fmt.Sprintf("index-%02d.verkle", id)
405407
if _, err := os.Stat(idxFile); err != nil {
406-
return err
408+
break
407409
}
408410
log.Info("Processing indexfile", "name", idxFile)
409411
data, err := os.ReadFile(idxFile)
@@ -420,75 +422,37 @@ func doFileSorting(ctx *cli.Context) error {
420422
return nil
421423
}
422424

423-
func readDataDump(itemCh chan group, abortCh chan struct{}) error {
425+
func readDataDump(itemCh chan group, abortCh chan struct{}, cpuNumber int) error {
424426
dataFile, err := os.Open("dump-00.verkle")
425427
if err != nil {
426428
return err
427429
}
428430
defer dataFile.Close()
429431

430-
var (
431-
indexFiles []*os.File
432-
recordList []Index
433-
eofList []bool
434-
//count = 0
435-
)
436432
// open all the files and read the first record of each
437-
for i := 0; ; i++ {
438-
idxFile := fmt.Sprintf("index-%02d.verkle", i)
439-
if _, err := os.Stat(idxFile); err != nil {
440-
break // no more files
441-
}
442-
if f, err := os.Open(idxFile); err != nil {
443-
return err
444-
} else {
445-
indexFiles = append(indexFiles, f)
446-
eofList = append(eofList, false)
447-
recordList = append(recordList, Index{})
448-
}
449-
err = binary.Read(indexFiles[i], binary.LittleEndian, &recordList[i])
450-
eofList[i] = err == io.EOF
433+
idxFile, err := os.Open(fmt.Sprintf("index-%02d.verkle", cpuNumber))
434+
if err != nil {
435+
return err
451436
}
452-
defer func() {
453-
for _, f := range indexFiles {
454-
f.Close()
455-
}
456-
}()
457-
458-
for {
459-
smallest := -1
460-
done := true
461-
for i, _ := range indexFiles {
462-
if eofList[i] {
463-
continue
464-
}
465-
done = false
466-
if smallest == -1 || bytes.Compare(recordList[i].Stem[:], recordList[smallest].Stem[:]) < 0 {
467-
smallest = i
468-
}
469-
}
470-
if done {
471-
break
472-
}
473-
dataFile.Seek(int64(recordList[smallest].Offset), io.SeekStart)
474-
valuesSerializedCompressed := make([]byte, recordList[smallest].Size)
437+
defer idxFile.Close()
438+
var idx Index
439+
err = binary.Read(idxFile, binary.LittleEndian, &idx)
440+
for err != io.EOF {
441+
dataFile.Seek(int64(idx.Offset), io.SeekStart)
442+
valuesSerializedCompressed := make([]byte, idx.Size)
475443
n, err := dataFile.Read(valuesSerializedCompressed)
476-
if err != nil || uint32(n) != recordList[smallest].Size {
477-
return fmt.Errorf("error reading data: %w size=%d != %d", err, n, recordList[smallest].Size)
444+
if err != nil || uint32(n) != idx.Size {
445+
return fmt.Errorf("error reading data: %w size=%d != %d", err, n, idx.Size)
478446
}
479447
data, err := snappy.Decode(nil, valuesSerializedCompressed)
480448
var element group
481449
rlp.DecodeBytes(data, &element.values)
482450

483-
copy(element.stem[:], recordList[smallest].Stem[:])
451+
copy(element.stem[:], idx.Stem[:])
484452
// pass the data
485453
itemCh <- element
486454
// read next index
487-
err = binary.Read(indexFiles[smallest], binary.LittleEndian, &recordList[smallest])
488-
if err != nil && err != io.EOF {
489-
return err
490-
}
491-
eofList[smallest] = err == io.EOF
455+
err = binary.Read(idxFile, binary.LittleEndian, &idx)
492456

493457
select {
494458
case <-abortCh:
@@ -506,25 +470,29 @@ func doInsertion(ctx *cli.Context) error {
506470
var (
507471
start = time.Now()
508472
lastReport time.Time
509-
itemCh = make(chan group, 1000)
473+
itemChs = make([]chan group, runtime.NumCPU())
510474
abortCh = make(chan struct{})
511475
wg sync.WaitGroup
512-
count = 0
513-
root = verkle.New()
476+
count = uint64(0)
514477
)
515-
wg.Add(1)
516-
go func() {
517-
defer wg.Done()
518-
if err := readDataDump(itemCh, abortCh); err != nil {
519-
log.Error("Error reading data", "err", err)
520-
}
521-
close(itemCh)
522-
}()
523478
defer close(abortCh)
479+
wg.Add(runtime.NumCPU())
480+
for numCPU := range itemChs {
481+
itemChs[numCPU] = make(chan group, 1000)
482+
itemCh := itemChs[numCPU]
483+
i := numCPU
484+
go func() {
485+
defer wg.Done()
486+
if err := readDataDump(itemCh, abortCh, i); err != nil {
487+
log.Error("Error reading data", "err", err)
488+
}
489+
close(itemCh)
490+
}()
491+
}
524492

525493
convdb, err := rawdb.NewLevelDBDatabase("verkle", 128, 128, "", false)
526494
if err != nil {
527-
panic(err)
495+
return err
528496
}
529497

530498
flushCh := make(chan verkle.VerkleNode)
@@ -545,21 +513,39 @@ func doInsertion(ctx *cli.Context) error {
545513
}
546514
}()
547515

548-
for elem := range itemCh {
516+
subRoots := make([]*verkle.InternalNode, runtime.NumCPU())
517+
for i := range itemChs {
518+
wg.Add(1)
519+
subRoots[i] = verkle.New().(*verkle.InternalNode)
520+
521+
// save references for the goroutine to capture
522+
root := subRoots[i]
523+
itemCh := itemChs[i]
524+
525+
go func() {
526+
for elem := range itemCh {
527+
var st = make([]byte, 31)
528+
copy(st, elem.stem[:])
529+
leaf := verkle.NewLeafNode(st, elem.values)
530+
leaf.ComputeCommitment()
531+
err = root.InsertStemOrdered(st, leaf, saveverkle)
532+
if err != nil {
533+
panic(err)
534+
}
535+
atomic.AddUint64(&count, 1)
536+
if time.Since(lastReport) > time.Second*8 {
537+
log.Info("Traversing state", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
538+
lastReport = time.Now()
539+
}
540+
}
549541

550-
if time.Since(lastReport) > time.Second*8 {
551-
log.Info("Inserting nodes", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
552-
lastReport = time.Now()
553-
}
554-
var st = make([]byte, 31)
555-
copy(st, elem.stem[:])
556-
leaf := verkle.NewLeafNode(st, elem.values)
557-
if err := root.(*verkle.InternalNode).InsertStemOrdered(st, leaf, saveverkle); err != nil {
558-
log.Warn("Error during insert", "stem", fmt.Sprintf("%x", elem.stem), err, err)
559-
return err
560-
}
561-
count++
542+
wg.Done()
543+
}()
562544
}
545+
wg.Wait()
546+
root := verkle.MergeTrees(subRoots)
547+
root.ComputeCommitment()
548+
root.(*verkle.InternalNode).Flush(saveverkle)
563549
close(flushCh)
564550
log.Info("Insertion done", "elems", count, "root commitment", fmt.Sprintf("%x", root.ComputeCommitment().Bytes()), "elapsed", common.PrettyDuration(time.Since(start)))
565551
return nil

0 commit comments

Comments
 (0)