From c278fab5cba09131c600408abfd6b32580d7db0c Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 19 Nov 2024 15:46:46 -0800 Subject: [PATCH 01/18] same as coreth trie-prefetcher-alt --- core/blockchain.go | 9 +- core/state/prefetcher_database.go | 208 +++++++++ core/state/statedb.go | 4 +- core/state/trie_prefetcher.go | 533 ++++++----------------- core/state/trie_prefetcher_extra_test.go | 199 +++++++++ core/state/trie_prefetcher_test.go | 9 +- miner/worker.go | 2 +- 7 files changed, 566 insertions(+), 398 deletions(-) create mode 100644 core/state/prefetcher_database.go create mode 100644 core/state/trie_prefetcher_extra_test.go diff --git a/core/blockchain.go b/core/blockchain.go index 7fc6c62688..689d07e6a2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1377,7 +1377,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) + statedb.StartPrefetcher("chain") activeState = statedb // Process block using the parent state as reference point @@ -1736,7 +1736,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) + statedb.StartPrefetcher("chain") defer func() { statedb.StopPrefetcher() }() @@ -2134,7 +2134,10 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { bc.hc.SetCurrentHeader(block.Header()) lastAcceptedHash := block.Hash() - bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) + bc.stateCache = state.WithPrefetcher( + state.NewDatabaseWithNodeDB(bc.db, bc.triedb), + bc.cacheConfig.TriePrefetcherParallelism, + ) if err := bc.loadLastState(lastAcceptedHash); err != nil { return err diff --git a/core/state/prefetcher_database.go b/core/state/prefetcher_database.go new file mode 100644 index 0000000000..438cb1ebab --- /dev/null +++ b/core/state/prefetcher_database.go @@ -0,0 +1,208 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package state + +import ( + "sync" + + "github.com/ava-labs/subnet-evm/utils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// PrefetcherDB is an interface that extends Database with additional methods +// used in trie_prefetcher. This includes specific methods for prefetching +// accounts and storage slots, (which may be non-blocking and/or parallelized) +// and methods to wait for pending prefetches. +type PrefetcherDB interface { + // From Database + OpenTrie(root common.Hash) (Trie, error) + OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) + CopyTrie(t Trie) Trie + + // Additional methods + PrefetchAccount(t Trie, address common.Address) + PrefetchStorage(t Trie, address common.Address, key []byte) + CanPrefetchDuringShutdown() bool + WaitTrie(t Trie) + Close() +} + +// withPrefetcher is an optional interface that a Database can implement to +// signal PrefetcherDB() should be called to get a Database for use in +// trie_prefetcher. Each call to PrefetcherDB() should return a new +// PrefetcherDB instance. +type withPrefetcherDB interface { + PrefetcherDB() PrefetcherDB +} + +type withPrefetcher struct { + Database + maxConcurrency int +} + +func (db *withPrefetcher) PrefetcherDB() PrefetcherDB { + return newPrefetcherDatabase(db.Database, db.maxConcurrency) +} + +func WithPrefetcher(db Database, maxConcurrency int) Database { + return &withPrefetcher{db, maxConcurrency} +} + +// withPrefetcherDefaults extends Database and implements PrefetcherDB by adding +// default implementations for PrefetchAccount and PrefetchStorage that read the +// account and storage slot from the trie. +type withPrefetcherDefaults struct { + Database +} + +func (withPrefetcherDefaults) PrefetchAccount(t Trie, address common.Address) { + _, _ = t.GetAccount(address) +} + +func (withPrefetcherDefaults) PrefetchStorage(t Trie, address common.Address, key []byte) { + _, _ = t.GetStorage(address, key) +} + +func (withPrefetcherDefaults) CanPrefetchDuringShutdown() bool { return false } +func (withPrefetcherDefaults) WaitTrie(Trie) {} +func (withPrefetcherDefaults) Close() {} + +type prefetcherDatabase struct { + Database + + maxConcurrency int + workers *utils.BoundedWorkers +} + +func newPrefetcherDatabase(db Database, maxConcurrency int) *prefetcherDatabase { + return &prefetcherDatabase{ + Database: db, + maxConcurrency: maxConcurrency, + workers: utils.NewBoundedWorkers(maxConcurrency), + } +} + +func (p *prefetcherDatabase) OpenTrie(root common.Hash) (Trie, error) { + trie, err := p.Database.OpenTrie(root) + return newPrefetcherTrie(p, trie), err +} + +func (p *prefetcherDatabase) OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) { + storageTrie, err := p.Database.OpenStorageTrie(stateRoot, address, root, trie) + return newPrefetcherTrie(p, storageTrie), err +} + +func (p *prefetcherDatabase) CopyTrie(t Trie) Trie { + switch t := t.(type) { + case *prefetcherTrie: + return t.getCopy() + default: + return p.Database.CopyTrie(t) + } +} + +// PrefetchAccount should only be called on a trie returned from OpenTrie or OpenStorageTrie +func (*prefetcherDatabase) PrefetchAccount(t Trie, address common.Address) { + t.(*prefetcherTrie).PrefetchAccount(address) +} + +// PrefetchStorage should only be called on a trie returned from OpenTrie or OpenStorageTrie +func (*prefetcherDatabase) PrefetchStorage(t Trie, address common.Address, key []byte) { + t.(*prefetcherTrie).PrefetchStorage(address, key) +} + +// WaitTrie should only be called on a trie returned from OpenTrie or OpenStorageTrie +func (*prefetcherDatabase) WaitTrie(t Trie) { + t.(*prefetcherTrie).Wait() +} + +func (p *prefetcherDatabase) Close() { + p.workers.Wait() +} + +func (p *prefetcherDatabase) CanPrefetchDuringShutdown() bool { + return true +} + +type prefetcherTrie struct { + p *prefetcherDatabase + + Trie + copyLock sync.Mutex + + copies chan Trie + wg sync.WaitGroup +} + +// newPrefetcherTrie returns a new prefetcherTrie that wraps the given trie. +// prefetcherTrie prefetches accounts and storage slots in parallel, using +// bounded workers from the prefetcherDatabase. As Trie is not safe for +// concurrent access, each prefetch operation uses a copy. The copy is kept in +// a buffered channel for reuse. +func newPrefetcherTrie(p *prefetcherDatabase, t Trie) *prefetcherTrie { + prefetcher := &prefetcherTrie{ + p: p, + Trie: t, + copies: make(chan Trie, p.maxConcurrency), + } + prefetcher.copies <- prefetcher.getCopy() + return prefetcher +} + +func (p *prefetcherTrie) Wait() { + p.wg.Wait() +} + +// getCopy returns a copy of the trie. The copy is taken from the copies channel +// if available, otherwise a new copy is created. +func (p *prefetcherTrie) getCopy() Trie { + select { + case copy := <-p.copies: + return copy + default: + p.copyLock.Lock() + defer p.copyLock.Unlock() + return p.p.Database.CopyTrie(p.Trie) + } +} + +// putCopy keeps the copy for future use. If the buffer is full, the copy is +// discarded. +func (p *prefetcherTrie) putCopy(copy Trie) { + select { + case p.copies <- copy: + default: + } +} + +func (p *prefetcherTrie) PrefetchAccount(address common.Address) { + p.wg.Add(1) + f := func() { + defer p.wg.Done() + + tr := p.getCopy() + _, err := tr.GetAccount(address) + if err != nil { + log.Error("GetAccount failed in prefetcher", "err", err) + } + p.putCopy(tr) + } + p.p.workers.Execute(f) +} + +func (p *prefetcherTrie) PrefetchStorage(address common.Address, key []byte) { + p.wg.Add(1) + f := func() { + defer p.wg.Done() + + tr := p.getCopy() + _, err := tr.GetStorage(address, key) + if err != nil { + log.Error("GetStorage failed in prefetcher", "err", err) + } + p.putCopy(tr) + } + p.p.workers.Execute(f) +} diff --git a/core/state/statedb.go b/core/state/statedb.go index e73cf9accb..7a4c6faf7e 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -203,13 +203,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) { +func (s *StateDB) StartPrefetcher(namespace string) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 6c6ddeab07..ae7cf457cc 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -28,16 +28,16 @@ package state import ( "sync" - "time" "github.com/ava-labs/subnet-evm/metrics" - "github.com/ava-labs/subnet-evm/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) -// triePrefetchMetricsPrefix is the prefix under which to publish the metrics. -const triePrefetchMetricsPrefix = "trie/prefetch/" +var ( + // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. + triePrefetchMetricsPrefix = "trie/prefetch/" +) // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content @@ -45,96 +45,68 @@ const triePrefetchMetricsPrefix = "trie/prefetch/" // // Note, the prefetcher's API is not thread safe. type triePrefetcher struct { - db Database // Database to fetch trie nodes through + db PrefetcherDB // Database to fetch trie nodes through root common.Hash // Root hash of the account trie for metrics fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies. fetchers map[string]*subfetcher // Subfetchers for each trie - maxConcurrency int - workers *utils.BoundedWorkers - - subfetcherWorkersMeter metrics.Meter - subfetcherWaitTimer metrics.Counter - subfetcherCopiesMeter metrics.Meter - + deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter accountSkipMeter metrics.Meter accountWasteMeter metrics.Meter - - storageFetchersMeter metrics.Meter - storageLoadMeter metrics.Meter - storageLargestLoadMeter metrics.Meter - storageDupMeter metrics.Meter - storageSkipMeter metrics.Meter - storageWasteMeter metrics.Meter + storageLoadMeter metrics.Meter + storageDupMeter metrics.Meter + storageSkipMeter metrics.Meter + storageWasteMeter metrics.Meter } -func newTriePrefetcher(db Database, root common.Hash, namespace string, maxConcurrency int) *triePrefetcher { +func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { + var pdb PrefetcherDB + pdb = withPrefetcherDefaults{db} + if db, ok := db.(withPrefetcherDB); ok { + pdb = db.PrefetcherDB() + } prefix := triePrefetchMetricsPrefix + namespace - return &triePrefetcher{ - db: db, + p := &triePrefetcher{ + db: pdb, root: root, fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map - maxConcurrency: maxConcurrency, - workers: utils.NewBoundedWorkers(maxConcurrency), // Scale up as needed to [maxConcurrency] - - subfetcherWorkersMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/workers", nil), - subfetcherWaitTimer: metrics.GetOrRegisterCounter(prefix+"/subfetcher/wait", nil), - subfetcherCopiesMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/copies", nil), - + deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), - - storageFetchersMeter: metrics.GetOrRegisterMeter(prefix+"/storage/fetchers", nil), - storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), - storageLargestLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/lload", nil), - storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), - storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), - storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), + storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), + storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), + storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } + return p } // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { - // If the prefetcher is an inactive one, bail out - if p.fetches != nil { - return - } - - // Collect stats from all fetchers - var ( - storageFetchers int64 - largestLoad int64 - ) + defer p.db.Close() for _, fetcher := range p.fetchers { - fetcher.abort() // safe to call multiple times (should be a no-op on happy path) + fetcher.abort() // safe to do multiple times if metrics.Enabled { - p.subfetcherCopiesMeter.Mark(int64(fetcher.copies())) - if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(fetcher.skips())) + p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } p.accountWasteMeter.Mark(int64(len(fetcher.seen))) } else { - storageFetchers++ - oseen := int64(len(fetcher.seen)) - if oseen > largestLoad { - largestLoad = oseen - } - p.storageLoadMeter.Mark(oseen) + p.storageLoadMeter.Mark(int64(len(fetcher.seen))) p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(fetcher.skips())) + p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) for _, key := range fetcher.used { delete(fetcher.seen, string(key)) @@ -143,20 +115,6 @@ func (p *triePrefetcher) close() { } } } - if metrics.Enabled { - p.storageFetchersMeter.Mark(storageFetchers) - p.storageLargestLoadMeter.Mark(largestLoad) - } - - // Stop all workers once fetchers are aborted (otherwise - // could stop while waiting) - // - // Record number of workers that were spawned during this run - workersUsed := int64(p.workers.Wait()) - if metrics.Enabled { - p.subfetcherWorkersMeter.Mark(workersUsed) - } - // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } @@ -169,23 +127,17 @@ func (p *triePrefetcher) copy() *triePrefetcher { copy := &triePrefetcher{ db: p.db, root: p.root, - fetches: make(map[string]Trie), // Active prefetchers use the fetchers map - - subfetcherWorkersMeter: p.subfetcherWorkersMeter, - subfetcherWaitTimer: p.subfetcherWaitTimer, - subfetcherCopiesMeter: p.subfetcherCopiesMeter, + fetches: make(map[string]Trie), // Active prefetchers use the fetches map + deliveryMissMeter: p.deliveryMissMeter, accountLoadMeter: p.accountLoadMeter, accountDupMeter: p.accountDupMeter, accountSkipMeter: p.accountSkipMeter, accountWasteMeter: p.accountWasteMeter, - - storageFetchersMeter: p.storageFetchersMeter, - storageLoadMeter: p.storageLoadMeter, - storageLargestLoadMeter: p.storageLargestLoadMeter, - storageDupMeter: p.storageDupMeter, - storageSkipMeter: p.storageSkipMeter, - storageWasteMeter: p.storageWasteMeter, + storageLoadMeter: p.storageLoadMeter, + storageDupMeter: p.storageDupMeter, + storageSkipMeter: p.storageSkipMeter, + storageWasteMeter: p.storageWasteMeter, } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { @@ -210,12 +162,11 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm if p.fetches != nil { return } - // Active fetcher, schedule the retrievals id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { - fetcher = newSubfetcher(p, owner, root, addr) + fetcher = newSubfetcher(p.db, p.root, owner, root, addr) p.fetchers[id] = fetcher } fetcher.schedule(keys) @@ -229,27 +180,24 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie { if p.fetches != nil { trie := p.fetches[id] if trie == nil { + p.deliveryMissMeter.Mark(1) return nil } return p.db.CopyTrie(trie) } - // Otherwise the prefetcher is active, bail if no trie was prefetched for this root fetcher := p.fetchers[id] if fetcher == nil { + p.deliveryMissMeter.Mark(1) return nil } + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. + fetcher.abort() // safe to do multiple times - // Wait for the fetcher to finish and shutdown orchestrator, if it exists - start := time.Now() - fetcher.wait() - if metrics.Enabled { - p.subfetcherWaitTimer.Inc(time.Since(start).Milliseconds()) - } - - // Return a copy of one of the prefetched tries trie := fetcher.peek() if trie == nil { + p.deliveryMissMeter.Mark(1) return nil } return trie @@ -276,15 +224,20 @@ func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string { // main prefetcher is paused and either all requested items are processed or if // the trie being worked on is retrieved from the prefetcher. type subfetcher struct { - p *triePrefetcher - - db Database // Database to load trie nodes through + db PrefetcherDB // Database to load trie nodes through state common.Hash // Root hash of the state to prefetch owner common.Hash // Owner of the trie, usually account hash root common.Hash // Root hash of the trie to prefetch addr common.Address // Address of the account that the trie belongs to + trie Trie // Trie being populated with nodes + + tasks [][]byte // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue - to *trieOrchestrator // Orchestrate concurrent fetching of a single trie + wake chan struct{} // Wake channel if a new task is scheduled + stop chan struct{} // Channel to interrupt processing + term chan struct{} // Channel to signal interruption + copy chan chan Trie // Channel to request a copy of the current trie seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks @@ -293,348 +246,150 @@ type subfetcher struct { // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(p *triePrefetcher, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { +func newSubfetcher(db PrefetcherDB, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { sf := &subfetcher{ - p: p, - db: p.db, - state: p.root, + db: db, + state: state, owner: owner, root: root, addr: addr, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan Trie), seen: make(map[string]struct{}), } - sf.to = newTrieOrchestrator(sf) - if sf.to != nil { - go sf.to.processTasks() - } - // We return [sf] here to ensure we don't try to re-create if - // we aren't able to setup a [newTrieOrchestrator] the first time. + go sf.loop() return sf } // schedule adds a batch of trie keys to the queue to prefetch. -// This should never block, so an array is used instead of a channel. -// -// This is not thread-safe. func (sf *subfetcher) schedule(keys [][]byte) { // Append the tasks to the current queue - tasks := make([][]byte, 0, len(keys)) - for _, key := range keys { - // Check if keys already seen - sk := string(key) - if _, ok := sf.seen[sk]; ok { - sf.dups++ - continue - } - sf.seen[sk] = struct{}{} - tasks = append(tasks, key) - } + sf.lock.Lock() + sf.tasks = append(sf.tasks, keys...) + sf.lock.Unlock() - // After counting keys, exit if they can't be prefetched - if sf.to == nil { - return + // Notify the prefetcher, it's fine if it's already terminated + select { + case sf.wake <- struct{}{}: + default: } - - // Add tasks to queue for prefetching - sf.to.enqueueTasks(tasks) } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it // is currently. func (sf *subfetcher) peek() Trie { - if sf.to == nil { - return nil - } - return sf.to.copyBase() -} + ch := make(chan Trie) + select { + case sf.copy <- ch: + // Subfetcher still alive, return copy from it + return <-ch -// wait must only be called if [triePrefetcher] has not been closed. If this happens, -// workers will not finish. -func (sf *subfetcher) wait() { - if sf.to == nil { - // Unable to open trie - return + case <-sf.term: + // Subfetcher already terminated, return a copy directly + if sf.trie == nil { + return nil + } + return sf.db.CopyTrie(sf.trie) } - sf.to.wait() } +// abort interrupts the subfetcher immediately. It is safe to call abort multiple +// times but it is not thread safe. func (sf *subfetcher) abort() { - if sf.to == nil { - // Unable to open trie - return - } - sf.to.abort() -} - -func (sf *subfetcher) skips() int { - if sf.to == nil { - // Unable to open trie - return 0 - } - return sf.to.skipCount() -} - -func (sf *subfetcher) copies() int { - if sf.to == nil { - // Unable to open trie - return 0 + select { + case <-sf.stop: + default: + close(sf.stop) } - return sf.to.copies + <-sf.term } -// trieOrchestrator is not thread-safe. -type trieOrchestrator struct { - sf *subfetcher - - // base is an unmodified Trie we keep for - // creating copies for each worker goroutine. - // - // We care more about quick copies than good copies - // because most (if not all) of the nodes that will be populated - // in the copy will come from the underlying triedb cache. Ones - // that don't come from this cache probably had to be fetched - // from disk anyways. - base Trie - baseLock sync.Mutex - - tasksAllowed bool - skips int // number of tasks skipped - pendingTasks [][]byte - taskLock sync.Mutex - - processingTasks sync.WaitGroup - - wake chan struct{} - stop chan struct{} - stopOnce sync.Once - loopTerm chan struct{} - - copies int - copyChan chan Trie - copySpawner chan struct{} -} +// loop waits for new tasks to be scheduled and keeps loading them until it runs +// out of tasks or its underlying trie is retrieved for committing. +func (sf *subfetcher) loop() { + // No matter how the loop stops, signal anyone waiting that it's terminated + defer close(sf.term) -func newTrieOrchestrator(sf *subfetcher) *trieOrchestrator { // Start by opening the trie and stop processing if it fails - var ( - base Trie - err error - ) if sf.owner == (common.Hash{}) { - base, err = sf.db.OpenTrie(sf.root) + trie, err := sf.db.OpenTrie(sf.root) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return nil + return } + sf.trie = trie } else { // The trie argument can be nil as verkle doesn't support prefetching // yet. TODO FIX IT(rjl493456442), otherwise code will panic here. - base, err = sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil) + trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return nil + return } + sf.trie = trie } - - // Instantiate trieOrchestrator - to := &trieOrchestrator{ - sf: sf, - base: base, - - tasksAllowed: true, - wake: make(chan struct{}, 1), - stop: make(chan struct{}), - loopTerm: make(chan struct{}), - - copyChan: make(chan Trie, sf.p.maxConcurrency), - copySpawner: make(chan struct{}, sf.p.maxConcurrency), - } - - // Create initial trie copy - to.copies++ - to.copySpawner <- struct{}{} - to.copyChan <- to.copyBase() - return to -} - -func (to *trieOrchestrator) copyBase() Trie { - to.baseLock.Lock() - defer to.baseLock.Unlock() - - return to.sf.db.CopyTrie(to.base) -} - -func (to *trieOrchestrator) skipCount() int { - to.taskLock.Lock() - defer to.taskLock.Unlock() - - return to.skips -} - -func (to *trieOrchestrator) enqueueTasks(tasks [][]byte) { - to.taskLock.Lock() - defer to.taskLock.Unlock() - - if len(tasks) == 0 { - return - } - - // Add tasks to [pendingTasks] - if !to.tasksAllowed { - to.skips += len(tasks) - return - } - to.processingTasks.Add(len(tasks)) - to.pendingTasks = append(to.pendingTasks, tasks...) - - // Wake up processor - select { - case to.wake <- struct{}{}: - default: + handleTask := func(task []byte) { + if _, ok := sf.seen[string(task)]; ok { + sf.dups++ + } else { + if len(task) == common.AddressLength { + sf.db.PrefetchAccount(sf.trie, common.BytesToAddress(task)) + } else { + sf.db.PrefetchStorage(sf.trie, sf.addr, task) + } + sf.seen[string(task)] = struct{}{} + } } -} - -func (to *trieOrchestrator) handleStop(remaining int) { - to.taskLock.Lock() - to.skips += remaining - to.taskLock.Unlock() - to.processingTasks.Add(-remaining) -} + defer func() { + if sf.db.CanPrefetchDuringShutdown() { + for _, task := range sf.tasks { + handleTask(task) + } + sf.tasks = nil + } -func (to *trieOrchestrator) processTasks() { - defer close(to.loopTerm) + sf.db.WaitTrie(sf.trie) + }() + // Trie opened successfully, keep prefetching items for { - // Determine if we should process or exit select { - case <-to.wake: - case <-to.stop: - return - } - - // Get current tasks - to.taskLock.Lock() - tasks := to.pendingTasks - to.pendingTasks = nil - to.taskLock.Unlock() - - // Enqueue more work as soon as trie copies are available - lt := len(tasks) - for i := 0; i < lt; i++ { - // Try to stop as soon as possible, if channel is closed - remaining := lt - i - select { - case <-to.stop: - to.handleStop(remaining) - return - default: - } - - // Try to create to get an active copy first (select is non-deterministic, - // so we may end up creating a new copy when we don't need to) - var t Trie - select { - case t = <-to.copyChan: - default: - // Wait for an available copy or create one, if we weren't - // able to get a previously created copy + case <-sf.wake: + // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + sf.lock.Lock() + tasks := sf.tasks + sf.tasks = nil + sf.lock.Unlock() + + // Prefetch any tasks until the loop is interrupted + for i, task := range tasks { select { - case <-to.stop: - to.handleStop(remaining) + case <-sf.stop: + // If termination is requested, add any leftover back and return + sf.lock.Lock() + sf.tasks = append(sf.tasks, tasks[i:]...) + sf.lock.Unlock() return - case t = <-to.copyChan: - case to.copySpawner <- struct{}{}: - to.copies++ - t = to.copyBase() - } - } - // Enqueue work, unless stopped. - fTask := tasks[i] - f := func() { - // Perform task - var err error - if len(fTask) == common.AddressLength { - _, err = t.GetAccount(common.BytesToAddress(fTask)) - } else { - _, err = t.GetStorage(to.sf.addr, fTask) - } - if err != nil { - log.Error("Trie prefetcher failed fetching", "root", to.sf.root, "err", err) - } - to.processingTasks.Done() + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) - // Return copy when we are done with it, so someone else can use it - // - // channel is buffered and will not block - to.copyChan <- t + default: + // No termination request yet, prefetch the next entry + handleTask(task) + } } - // Enqueue task for processing (may spawn new goroutine - // if not at [maxConcurrency]) - // - // If workers are stopped before calling [Execute], this function may - // panic. - to.sf.p.workers.Execute(f) - } - } -} - -func (to *trieOrchestrator) stopAcceptingTasks() { - to.taskLock.Lock() - defer to.taskLock.Unlock() + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) - if !to.tasksAllowed { - return + case <-sf.stop: + // Termination is requested, abort and leave remaining tasks + return + } } - to.tasksAllowed = false - - // We don't clear [to.pendingTasks] here because - // it will be faster to prefetch them even though we - // are still waiting. -} - -// wait stops accepting new tasks and waits for ongoing tasks to complete. If -// wait is called, it is not necessary to call [abort]. -// -// It is safe to call wait multiple times. -func (to *trieOrchestrator) wait() { - // Prevent more tasks from being enqueued - to.stopAcceptingTasks() - - // Wait for processing tasks to complete - to.processingTasks.Wait() - - // Stop orchestrator loop - to.stopOnce.Do(func() { - close(to.stop) - }) - <-to.loopTerm -} - -// abort stops any ongoing tasks and shuts down the orchestrator loop. If abort -// is called, it is not necessary to call [wait]. -// -// It is safe to call abort multiple times. -func (to *trieOrchestrator) abort() { - // Prevent more tasks from being enqueued - to.stopAcceptingTasks() - - // Stop orchestrator loop - to.stopOnce.Do(func() { - close(to.stop) - }) - <-to.loopTerm - - // Capture any dangling pending tasks (processTasks - // may exit before enqueing all pendingTasks) - to.taskLock.Lock() - pendingCount := len(to.pendingTasks) - to.skips += pendingCount - to.pendingTasks = nil - to.taskLock.Unlock() - to.processingTasks.Add(-pendingCount) - - // Wait for processing tasks to complete - to.processingTasks.Wait() } diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go new file mode 100644 index 0000000000..e6b8d730e1 --- /dev/null +++ b/core/state/trie_prefetcher_extra_test.go @@ -0,0 +1,199 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package state + +import ( + "crypto/rand" + "encoding/binary" + "fmt" + "os" + "path" + "strconv" + "testing" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/subnet-evm/core/rawdb" + "github.com/ava-labs/subnet-evm/core/state/snapshot" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/metrics" + "github.com/ava-labs/subnet-evm/triedb" + "github.com/ava-labs/subnet-evm/triedb/hashdb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/stretchr/testify/require" +) + +const namespace = "chain" + +// Write a test to add 100m kvs to a leveldb so that we can test the prefetcher +// performance. + +func BenchmarkPrefetcherDatabase(b *testing.B) { + require := require.New(b) + + dir := b.TempDir() + if env := os.Getenv("TEST_DB_DIR"); env != "" { + dir = env + } + wantKVs := 100_000 + if env := os.Getenv("TEST_DB_KVS"); env != "" { + var err error + wantKVs, err = strconv.Atoi(env) + require.NoError(err) + } + + levelDB, err := rawdb.NewLevelDBDatabase(path.Join(dir, "level.db"), 0, 0, "", false) + require.NoError(err) + + root := types.EmptyRootHash + count := uint64(0) + block := uint64(0) + + rootKey := []byte("root") + countKey := []byte("count") + blockKey := []byte("block") + got, err := levelDB.Get(rootKey) + if err == nil { + root = common.BytesToHash(got) + } + got, err = levelDB.Get(countKey) + if err == nil { + count = binary.BigEndian.Uint64(got) + } + got, err = levelDB.Get(blockKey) + if err == nil { + block = binary.BigEndian.Uint64(got) + } + + // Make a trie on the levelDB + address1 := common.Address{42} + address2 := common.Address{43} + addBlock := func(db Database, snaps *snapshot.Tree, kvsPerBlock int, prefetchers int) (statedb *StateDB) { + statedb, root, err = addKVs(db, snaps, address1, address2, root, block, kvsPerBlock, prefetchers) + require.NoError(err) + count += uint64(kvsPerBlock) + block++ + + return statedb + } + + lastCommit := block + commit := func(levelDB ethdb.Database, snaps *snapshot.Tree, db Database) { + require.NoError(db.TrieDB().Commit(root, false)) + + for i := lastCommit + 1; i <= block; i++ { + require.NoError(snaps.Flatten(fakeHash(i))) + } + lastCommit = block + + // update the tracking keys + err = levelDB.Put(rootKey, root.Bytes()) + require.NoError(err) + err = database.PutUInt64(levelDB, blockKey, block) + require.NoError(err) + err = database.PutUInt64(levelDB, countKey, count) + require.NoError(err) + } + + tdbConfig := &triedb.Config{ + HashDB: &hashdb.Config{ + CleanCacheSize: 3 * 1024 * 1024 * 1024, + }, + } + db := NewDatabaseWithConfig(levelDB, tdbConfig) + snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root) + for count < uint64(wantKVs) { + previous := root + _ = addBlock(db, snaps, 100_000, 0) // Note this updates root and count + b.Logf("Root: %v, kvs: %d, block: %d", root, count, block) + + // Commit every 10 blocks or on the last iteration + if block%10 == 0 || count >= uint64(wantKVs) { + commit(levelDB, snaps, db) + b.Logf("Root: %v, kvs: %d, block: %d (committed)", root, count, block) + } + if previous != root { + require.NoError(db.TrieDB().Dereference(previous)) + } else { + panic("root did not change") + } + } + require.NoError(levelDB.Close()) + b.Logf("Starting benchmarks") + b.Logf("Root: %v, kvs: %d, block: %d", root, count, block) + for _, updates := range []int{100, 200, 500} { + for _, prefetchers := range []int{0, 1, 4, 16} { + b.Run(fmt.Sprintf("updates_%d_prefetchers_%d", updates, prefetchers), func(b *testing.B) { + startRoot, startBlock, startCount := root, block, count + defer func() { root, block, count = startRoot, startBlock, startCount }() + + levelDB, err := rawdb.NewLevelDBDatabase(path.Join(dir, "level.db"), 0, 0, "", false) + require.NoError(err) + snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root) + db := NewDatabaseWithConfig(levelDB, tdbConfig) + if prefetchers > 0 { + db = WithPrefetcher(db, prefetchers) + } + getMetric := func(metric string) int64 { + meter := metrics.GetOrRegisterMeter(triePrefetchMetricsPrefix+namespace+"/storage/"+metric, nil) + return meter.Snapshot().Count() + } + startLoads := getMetric("load") + for i := 0; i < b.N; i++ { + _ = addBlock(db, snaps, updates, prefetchers) + } + require.NoError(levelDB.Close()) + b.ReportMetric(float64(getMetric("load")-startLoads)/float64(b.N), "loads") + }) + } + } +} + +func fakeHash(block uint64) common.Hash { + return common.BytesToHash(binary.BigEndian.AppendUint64(nil, block)) +} + +// addKVs adds count random key-value pairs to the state trie of address1 and +// address2 (each count/2) and returns the new state db and root. +func addKVs( + db Database, snaps *snapshot.Tree, + address1, address2 common.Address, root common.Hash, block uint64, + count int, prefetchers int, +) (*StateDB, common.Hash, error) { + snap := snaps.Snapshot(root) + if snap == nil { + return nil, common.Hash{}, fmt.Errorf("snapshot not found") + } + statedb, err := NewWithSnapshot(root, db, snap) + if err != nil { + return nil, common.Hash{}, err + } + if prefetchers > 0 { + statedb.StartPrefetcher(namespace) + defer statedb.StopPrefetcher() + } + statedb.SetNonce(address1, 1) + for i := 0; i < count/2; i++ { + key := make([]byte, 32) + value := make([]byte, 32) + rand.Read(key) + rand.Read(value) + + statedb.SetState(address1, common.BytesToHash(key), common.BytesToHash(value)) + } + statedb.SetNonce(address2, 1) + for i := 0; i < count/2; i++ { + key := make([]byte, 32) + value := make([]byte, 32) + rand.Read(key) + rand.Read(value) + + statedb.SetState(address2, common.BytesToHash(key), common.BytesToHash(value)) + } + root, err = statedb.CommitWithSnap(block+1, true, snaps, fakeHash(block+1), fakeHash(block)) + if err != nil { + return nil, common.Hash{}, err + } + return statedb, root, err +} diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index b8edcbb6a8..7348dba0eb 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,7 +59,8 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) + prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) + prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -84,7 +85,8 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) + prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) + prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) a := prefetcher.trie(common.Hash{}, db.originalRoot) @@ -100,7 +102,8 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) + prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) + prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) cpy := prefetcher.copy() diff --git a/miner/worker.go b/miner/worker.go index 4fd7919553..4fa159a175 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -284,7 +284,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism) + state.StartPrefetcher("miner") return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), state: state, From b8ff8dcf14ac6dbd191a801ab91458dbf09f5fbf Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 21 Nov 2024 16:24:26 -0800 Subject: [PATCH 02/18] migrated as is --- core/blockchain.go | 9 +- core/state/prefetcher_database.go | 208 ----------------------- core/state/statedb.go | 16 +- core/state/trie_prefetcher.go | 95 +++++------ core/state/trie_prefetcher.libevm.go | 108 ++++++++++++ core/state/trie_prefetcher_extra_test.go | 5 +- core/state/trie_prefetcher_test.go | 9 +- libevm/options/options.go | 42 +++++ miner/worker.go | 6 +- 9 files changed, 221 insertions(+), 277 deletions(-) delete mode 100644 core/state/prefetcher_database.go create mode 100644 core/state/trie_prefetcher.libevm.go create mode 100644 libevm/options/options.go diff --git a/core/blockchain.go b/core/blockchain.go index 689d07e6a2..96fb597fc8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1377,7 +1377,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain") + statedb.StartPrefetcher("chain", state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism)) activeState = statedb // Process block using the parent state as reference point @@ -1736,7 +1736,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain") + statedb.StartPrefetcher("chain", state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism)) defer func() { statedb.StopPrefetcher() }() @@ -2134,10 +2134,7 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { bc.hc.SetCurrentHeader(block.Header()) lastAcceptedHash := block.Hash() - bc.stateCache = state.WithPrefetcher( - state.NewDatabaseWithNodeDB(bc.db, bc.triedb), - bc.cacheConfig.TriePrefetcherParallelism, - ) + bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) if err := bc.loadLastState(lastAcceptedHash); err != nil { return err diff --git a/core/state/prefetcher_database.go b/core/state/prefetcher_database.go deleted file mode 100644 index 438cb1ebab..0000000000 --- a/core/state/prefetcher_database.go +++ /dev/null @@ -1,208 +0,0 @@ -// (c) 2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package state - -import ( - "sync" - - "github.com/ava-labs/subnet-evm/utils" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" -) - -// PrefetcherDB is an interface that extends Database with additional methods -// used in trie_prefetcher. This includes specific methods for prefetching -// accounts and storage slots, (which may be non-blocking and/or parallelized) -// and methods to wait for pending prefetches. -type PrefetcherDB interface { - // From Database - OpenTrie(root common.Hash) (Trie, error) - OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) - CopyTrie(t Trie) Trie - - // Additional methods - PrefetchAccount(t Trie, address common.Address) - PrefetchStorage(t Trie, address common.Address, key []byte) - CanPrefetchDuringShutdown() bool - WaitTrie(t Trie) - Close() -} - -// withPrefetcher is an optional interface that a Database can implement to -// signal PrefetcherDB() should be called to get a Database for use in -// trie_prefetcher. Each call to PrefetcherDB() should return a new -// PrefetcherDB instance. -type withPrefetcherDB interface { - PrefetcherDB() PrefetcherDB -} - -type withPrefetcher struct { - Database - maxConcurrency int -} - -func (db *withPrefetcher) PrefetcherDB() PrefetcherDB { - return newPrefetcherDatabase(db.Database, db.maxConcurrency) -} - -func WithPrefetcher(db Database, maxConcurrency int) Database { - return &withPrefetcher{db, maxConcurrency} -} - -// withPrefetcherDefaults extends Database and implements PrefetcherDB by adding -// default implementations for PrefetchAccount and PrefetchStorage that read the -// account and storage slot from the trie. -type withPrefetcherDefaults struct { - Database -} - -func (withPrefetcherDefaults) PrefetchAccount(t Trie, address common.Address) { - _, _ = t.GetAccount(address) -} - -func (withPrefetcherDefaults) PrefetchStorage(t Trie, address common.Address, key []byte) { - _, _ = t.GetStorage(address, key) -} - -func (withPrefetcherDefaults) CanPrefetchDuringShutdown() bool { return false } -func (withPrefetcherDefaults) WaitTrie(Trie) {} -func (withPrefetcherDefaults) Close() {} - -type prefetcherDatabase struct { - Database - - maxConcurrency int - workers *utils.BoundedWorkers -} - -func newPrefetcherDatabase(db Database, maxConcurrency int) *prefetcherDatabase { - return &prefetcherDatabase{ - Database: db, - maxConcurrency: maxConcurrency, - workers: utils.NewBoundedWorkers(maxConcurrency), - } -} - -func (p *prefetcherDatabase) OpenTrie(root common.Hash) (Trie, error) { - trie, err := p.Database.OpenTrie(root) - return newPrefetcherTrie(p, trie), err -} - -func (p *prefetcherDatabase) OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) { - storageTrie, err := p.Database.OpenStorageTrie(stateRoot, address, root, trie) - return newPrefetcherTrie(p, storageTrie), err -} - -func (p *prefetcherDatabase) CopyTrie(t Trie) Trie { - switch t := t.(type) { - case *prefetcherTrie: - return t.getCopy() - default: - return p.Database.CopyTrie(t) - } -} - -// PrefetchAccount should only be called on a trie returned from OpenTrie or OpenStorageTrie -func (*prefetcherDatabase) PrefetchAccount(t Trie, address common.Address) { - t.(*prefetcherTrie).PrefetchAccount(address) -} - -// PrefetchStorage should only be called on a trie returned from OpenTrie or OpenStorageTrie -func (*prefetcherDatabase) PrefetchStorage(t Trie, address common.Address, key []byte) { - t.(*prefetcherTrie).PrefetchStorage(address, key) -} - -// WaitTrie should only be called on a trie returned from OpenTrie or OpenStorageTrie -func (*prefetcherDatabase) WaitTrie(t Trie) { - t.(*prefetcherTrie).Wait() -} - -func (p *prefetcherDatabase) Close() { - p.workers.Wait() -} - -func (p *prefetcherDatabase) CanPrefetchDuringShutdown() bool { - return true -} - -type prefetcherTrie struct { - p *prefetcherDatabase - - Trie - copyLock sync.Mutex - - copies chan Trie - wg sync.WaitGroup -} - -// newPrefetcherTrie returns a new prefetcherTrie that wraps the given trie. -// prefetcherTrie prefetches accounts and storage slots in parallel, using -// bounded workers from the prefetcherDatabase. As Trie is not safe for -// concurrent access, each prefetch operation uses a copy. The copy is kept in -// a buffered channel for reuse. -func newPrefetcherTrie(p *prefetcherDatabase, t Trie) *prefetcherTrie { - prefetcher := &prefetcherTrie{ - p: p, - Trie: t, - copies: make(chan Trie, p.maxConcurrency), - } - prefetcher.copies <- prefetcher.getCopy() - return prefetcher -} - -func (p *prefetcherTrie) Wait() { - p.wg.Wait() -} - -// getCopy returns a copy of the trie. The copy is taken from the copies channel -// if available, otherwise a new copy is created. -func (p *prefetcherTrie) getCopy() Trie { - select { - case copy := <-p.copies: - return copy - default: - p.copyLock.Lock() - defer p.copyLock.Unlock() - return p.p.Database.CopyTrie(p.Trie) - } -} - -// putCopy keeps the copy for future use. If the buffer is full, the copy is -// discarded. -func (p *prefetcherTrie) putCopy(copy Trie) { - select { - case p.copies <- copy: - default: - } -} - -func (p *prefetcherTrie) PrefetchAccount(address common.Address) { - p.wg.Add(1) - f := func() { - defer p.wg.Done() - - tr := p.getCopy() - _, err := tr.GetAccount(address) - if err != nil { - log.Error("GetAccount failed in prefetcher", "err", err) - } - p.putCopy(tr) - } - p.p.workers.Execute(f) -} - -func (p *prefetcherTrie) PrefetchStorage(address common.Address, key []byte) { - p.wg.Add(1) - f := func() { - defer p.wg.Done() - - tr := p.getCopy() - _, err := tr.GetStorage(address, key) - if err != nil { - log.Error("GetStorage failed in prefetcher", "err", err) - } - p.putCopy(tr) - } - p.p.workers.Execute(f) -} diff --git a/core/state/statedb.go b/core/state/statedb.go index 7a4c6faf7e..7252eda741 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -41,6 +41,7 @@ import ( "github.com/ava-labs/subnet-evm/trie" "github.com/ava-labs/subnet-evm/trie/trienode" "github.com/ava-labs/subnet-evm/trie/triestate" + "github.com/ava-labs/subnet-evm/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -200,16 +201,27 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St return sdb, nil } +type workerPool struct{ *utils.BoundedWorkers } + +func (wp workerPool) Wait() { + _ = wp.BoundedWorkers.Wait() +} + +func WorkerOpt(prefetchers int) PrefetcherOption { + pool := workerPool{utils.NewBoundedWorkers(prefetchers)} + return WithWorkerPools(func() WorkerPool { return pool }) +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string) { +func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...) } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index ae7cf457cc..b09adb5223 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -1,13 +1,3 @@ -// (c) 2019-2020, Ava Labs, Inc. -// -// This file is a derived work, based on the go-ethereum library whose original -// notices appear below. -// -// It is distributed under a license compatible with the licensing terms of the -// original code from which it is derived. -// -// Much love to the original authors for their work. -// ********** // Copyright 2020 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -29,6 +19,7 @@ package state import ( "sync" + "github.com/ava-labs/subnet-evm/libevm/options" "github.com/ava-labs/subnet-evm/metrics" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -45,7 +36,7 @@ var ( // // Note, the prefetcher's API is not thread safe. type triePrefetcher struct { - db PrefetcherDB // Database to fetch trie nodes through + db Database // Database to fetch trie nodes through root common.Hash // Root hash of the account trie for metrics fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies. fetchers map[string]*subfetcher // Subfetchers for each trie @@ -59,17 +50,14 @@ type triePrefetcher struct { storageDupMeter metrics.Meter storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter + + options []PrefetcherOption } -func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { - var pdb PrefetcherDB - pdb = withPrefetcherDefaults{db} - if db, ok := db.(withPrefetcherDB); ok { - pdb = db.PrefetcherDB() - } +func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: pdb, + db: db, root: root, fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map @@ -82,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + + options: opts, } return p } @@ -89,7 +79,6 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. func (p *triePrefetcher) close() { - defer p.db.Close() for _, fetcher := range p.fetchers { fetcher.abort() // safe to do multiple times @@ -138,6 +127,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { storageDupMeter: p.storageDupMeter, storageSkipMeter: p.storageSkipMeter, storageWasteMeter: p.storageWasteMeter, + + options: p.options, } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { @@ -166,7 +157,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { - fetcher = newSubfetcher(p.db, p.root, owner, root, addr) + fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...) p.fetchers[id] = fetcher } fetcher.schedule(keys) @@ -224,7 +215,7 @@ func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string { // main prefetcher is paused and either all requested items are processed or if // the trie being worked on is retrieved from the prefetcher. type subfetcher struct { - db PrefetcherDB // Database to load trie nodes through + db Database // Database to load trie nodes through state common.Hash // Root hash of the state to prefetch owner common.Hash // Owner of the trie, usually account hash root common.Hash // Root hash of the trie to prefetch @@ -242,11 +233,13 @@ type subfetcher struct { seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end + + pool *subfetcherPool } // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(db PrefetcherDB, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { +func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher { sf := &subfetcher{ db: db, state: state, @@ -259,6 +252,7 @@ func newSubfetcher(db PrefetcherDB, state common.Hash, owner common.Hash, root c copy: make(chan chan Trie), seen: make(map[string]struct{}), } + options.As[prefetcherConfig](opts...).applyTo(sf) go sf.loop() return sf } @@ -309,6 +303,7 @@ func (sf *subfetcher) abort() { // loop waits for new tasks to be scheduled and keeps loading them until it runs // out of tasks or its underlying trie is retrieved for committing. func (sf *subfetcher) loop() { + defer sf.wait() // No matter how the loop stops, signal anyone waiting that it's terminated defer close(sf.term) @@ -330,29 +325,6 @@ func (sf *subfetcher) loop() { } sf.trie = trie } - handleTask := func(task []byte) { - if _, ok := sf.seen[string(task)]; ok { - sf.dups++ - } else { - if len(task) == common.AddressLength { - sf.db.PrefetchAccount(sf.trie, common.BytesToAddress(task)) - } else { - sf.db.PrefetchStorage(sf.trie, sf.addr, task) - } - sf.seen[string(task)] = struct{}{} - } - } - defer func() { - if sf.db.CanPrefetchDuringShutdown() { - for _, task := range sf.tasks { - handleTask(task) - } - sf.tasks = nil - } - - sf.db.WaitTrie(sf.trie) - }() - // Trie opened successfully, keep prefetching items for { select { @@ -379,7 +351,16 @@ func (sf *subfetcher) loop() { default: // No termination request yet, prefetch the next entry - handleTask(task) + if _, ok := sf.seen[string(task)]; ok { + sf.dups++ + } else { + if len(task) == common.AddressLength { + sf.GetAccount(common.BytesToAddress(task)) + } else { + sf.GetStorage(sf.addr, task) + } + sf.seen[string(task)] = struct{}{} + } } } @@ -388,8 +369,26 @@ func (sf *subfetcher) loop() { ch <- sf.db.CopyTrie(sf.trie) case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return + //libevm:start + // + // This is copied, with alteration, from ethereum/go-ethereum#29519 + // and can be deleted once we update to include that change. + + // Termination is requested, abort if no more tasks are pending. If + // there are some, exhaust them first. + sf.lock.Lock() + done := len(sf.tasks) == 0 + sf.lock.Unlock() + + if done { + return + } + + select { + case sf.wake <- struct{}{}: + default: + } + //libevm:end } } } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go new file mode 100644 index 0000000000..5c680fb5fa --- /dev/null +++ b/core/state/trie_prefetcher.libevm.go @@ -0,0 +1,108 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package state + +import ( + "sync" + + "github.com/ava-labs/subnet-evm/libevm/options" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// A PrefetcherOption configures behaviour of trie prefetching. +type PrefetcherOption = options.Option[prefetcherConfig] + +type prefetcherConfig struct { + newWorkers func() WorkerPool +} + +// A WorkerPool is responsible for executing functions, possibly asynchronously. +type WorkerPool interface { + Execute(func()) + Wait() +} + +// WithWorkerPools configures trie prefetching to execute asynchronously. The +// provided constructor is called once for each trie being fetched and it MAY +// return the same pool. +func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { + return options.Func[prefetcherConfig](func(c *prefetcherConfig) { + c.newWorkers = ctor + }) +} + +type subfetcherPool struct { + workers WorkerPool + tries sync.Pool +} + +// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided +// with a [PrefetcherOption]. +func (c *prefetcherConfig) applyTo(sf *subfetcher) { + sf.pool = &subfetcherPool{ + tries: sync.Pool{ + // Although the workers may be shared between all subfetchers, each + // MUST have its own Trie pool. + New: func() any { + return sf.db.CopyTrie(sf.trie) + }, + }, + } + if c.newWorkers != nil { + sf.pool.workers = c.newWorkers() + } +} + +func (sf *subfetcher) wait() { + if w := sf.pool.workers; w != nil { + w.Wait() + } +} + +// execute runs the provided function with a copy of the subfetcher's Trie. +// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was +// configured with a [WorkerPool] then it is used for function execution, +// otherwise `fn` is just called directly. +func (sf *subfetcher) execute(fn func(Trie)) { + trie := sf.pool.tries.Get().(Trie) + if w := sf.pool.workers; w != nil { + w.Execute(func() { fn(trie) }) + } else { + fn(trie) + } + sf.pool.tries.Put(trie) +} + +// GetAccount optimistically pre-fetches an account, dropping the returned value +// and logging errors. See [subfetcher.execute] re worker pools. +func (sf *subfetcher) GetAccount(addr common.Address) { + sf.execute(func(t Trie) { + if _, err := t.GetAccount(addr); err != nil { + log.Error("account prefetching failed", "address", addr, "err", err) + } + }) +} + +// GetStorage is the storage equivalent of [subfetcher.GetAccount]. +func (sf *subfetcher) GetStorage(addr common.Address, key []byte) { + sf.execute(func(t Trie) { + if _, err := t.GetStorage(addr, key); err != nil { + log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) + } + }) +} diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index e6b8d730e1..a9ee06c24c 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -132,9 +132,6 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { require.NoError(err) snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root) db := NewDatabaseWithConfig(levelDB, tdbConfig) - if prefetchers > 0 { - db = WithPrefetcher(db, prefetchers) - } getMetric := func(metric string) int64 { meter := metrics.GetOrRegisterMeter(triePrefetchMetricsPrefix+namespace+"/storage/"+metric, nil) return meter.Snapshot().Count() @@ -170,7 +167,7 @@ func addKVs( return nil, common.Hash{}, err } if prefetchers > 0 { - statedb.StartPrefetcher(namespace) + statedb.StartPrefetcher(namespace, WorkerOpt(prefetchers)) defer statedb.StopPrefetcher() } statedb.SetNonce(address1, 1) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index 7348dba0eb..12c912d012 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,8 +59,7 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) - prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WorkerOpt(maxConcurrency)) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -85,8 +84,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) - prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) a := prefetcher.trie(common.Hash{}, db.originalRoot) @@ -102,8 +100,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - prefetchDb := newPrefetcherDatabase(db.db, maxConcurrency) - prefetcher := newTriePrefetcher(prefetchDb, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) cpy := prefetcher.copy() diff --git a/libevm/options/options.go b/libevm/options/options.go new file mode 100644 index 0000000000..af7bc751a9 --- /dev/null +++ b/libevm/options/options.go @@ -0,0 +1,42 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +// Package options provides a generic mechanism for defining configuration of +// arbitrary types. +package options + +// An Option configures values of arbitrary type. +type Option[T any] interface { + Configure(*T) +} + +// As applies Options to a zero-value T, which it then returns. +func As[T any](opts ...Option[T]) *T { + var t T + for _, o := range opts { + o.Configure(&t) + } + return &t +} + +// A Func converts a function into an [Option], using itself as the Configure +// method. +type Func[T any] func(*T) + +var _ Option[struct{}] = Func[struct{}](nil) + +// Configure implements the [Option] interface. +func (f Func[T]) Configure(t *T) { f(t) } diff --git a/miner/worker.go b/miner/worker.go index 4fa159a175..e7eb69f355 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -280,14 +280,14 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte } func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.PredicateContext, parent *types.Header, header *types.Header, tstart time.Time) (*environment, error) { - state, err := w.chain.StateAt(parent.Root) + currentState, err := w.chain.StateAt(parent.Root) if err != nil { return nil, err } - state.StartPrefetcher("miner") + currentState.StartPrefetcher("miner", state.WorkerOpt(w.chain.CacheConfig().TriePrefetcherParallelism)) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), - state: state, + state: currentState, parent: parent, header: header, tcount: 0, From 3405b88b32430948642bba1af57677a9068312ff Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 21 Nov 2024 17:58:36 -0800 Subject: [PATCH 03/18] fixes --- core/blockchain.go | 26 ++++++++------------- core/state/statedb.go | 29 +++++++++++++++++++----- core/state/trie_prefetcher.libevm.go | 10 +++++--- core/state/trie_prefetcher_extra_test.go | 5 +++- core/state/trie_prefetcher_test.go | 4 +++- miner/worker.go | 10 ++++++-- 6 files changed, 55 insertions(+), 29 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 96fb597fc8..197b9de495 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1349,16 +1349,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockContentValidationTimer.Inc(time.Since(substart).Milliseconds()) // No validation errors for the block - var activeState *state.StateDB - defer func() { - // The chain importer is starting and stopping trie prefetchers. If a bad - // block or other error is hit however, an early return may not properly - // terminate the background threads. This defer ensures that we clean up - // and dangling prefetcher, without deferring each and holding on live refs. - if activeState != nil { - activeState.StopPrefetcher() - } - }() // Retrieve the parent block to determine which root to build state on substart = time.Now() @@ -1377,8 +1367,11 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism)) - activeState = statedb + opt, cleanup := state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism) + defer cleanup() + + statedb.StartPrefetcher("chain", opt) + defer statedb.StopPrefetcher() // Process block using the parent state as reference point pstart := time.Now() @@ -1736,10 +1729,11 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism)) - defer func() { - statedb.StopPrefetcher() - }() + opt, cleanup := state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism) + defer cleanup() + + statedb.StartPrefetcher("chain", opt) + defer statedb.StopPrefetcher() // Process previously stored block receipts, _, usedGas, err := bc.processor.Process(current, parent.Header(), statedb, vm.Config{}) diff --git a/core/state/statedb.go b/core/state/statedb.go index 7252eda741..c69a94eefa 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -30,6 +30,7 @@ package state import ( "fmt" "sort" + "sync" "time" "github.com/ava-labs/subnet-evm/core/rawdb" @@ -201,15 +202,31 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St return sdb, nil } -type workerPool struct{ *utils.BoundedWorkers } +type workerPool struct { + *utils.BoundedWorkers + wg sync.WaitGroup +} -func (wp workerPool) Wait() { - _ = wp.BoundedWorkers.Wait() +func (wp *workerPool) Wait() { + wp.wg.Wait() } -func WorkerOpt(prefetchers int) PrefetcherOption { - pool := workerPool{utils.NewBoundedWorkers(prefetchers)} - return WithWorkerPools(func() WorkerPool { return pool }) +func (wp *workerPool) Execute(f func()) { + wp.wg.Add(1) + wp.BoundedWorkers.Execute(func() { + f() + wp.wg.Done() + }) +} + +func WorkerOpt(prefetchers int) (PrefetcherOption, func()) { + pool := utils.NewBoundedWorkers(prefetchers) + cleanup := func() { _ = pool.Wait() } + return WithWorkerPools(func() WorkerPool { + return &workerPool{ + BoundedWorkers: pool, + } + }), cleanup } // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 5c680fb5fa..912374fc6e 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -79,13 +79,17 @@ func (sf *subfetcher) wait() { // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. func (sf *subfetcher) execute(fn func(Trie)) { - trie := sf.pool.tries.Get().(Trie) if w := sf.pool.workers; w != nil { - w.Execute(func() { fn(trie) }) + w.Execute(func() { + trie := sf.pool.tries.Get().(Trie) + fn(trie) + sf.pool.tries.Put(trie) + }) } else { + trie := sf.pool.tries.Get().(Trie) fn(trie) + sf.pool.tries.Put(trie) } - sf.pool.tries.Put(trie) } // GetAccount optimistically pre-fetches an account, dropping the returned value diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index a9ee06c24c..be6d14b42b 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -167,7 +167,10 @@ func addKVs( return nil, common.Hash{}, err } if prefetchers > 0 { - statedb.StartPrefetcher(namespace, WorkerOpt(prefetchers)) + opt, cleanup := WorkerOpt(prefetchers) + defer cleanup() + + statedb.StartPrefetcher(namespace, opt) defer statedb.StopPrefetcher() } statedb.SetNonce(address1, 1) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index 12c912d012..bcdc645ca1 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,7 +59,9 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WorkerOpt(maxConcurrency)) + opt, cleanup := WorkerOpt(maxConcurrency) + defer cleanup() + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) diff --git a/miner/worker.go b/miner/worker.go index e7eb69f355..152c95ae7e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -82,7 +82,8 @@ type environment struct { // way that the gas pool and state is reset. predicateResults *predicate.Results - start time.Time // Time that block building began + start time.Time // Time that block building began + cleanup func() // Cleanup function to be called when the environment is no longer needed } // worker is the main object which takes care of submitting new work to consensus engine @@ -225,6 +226,9 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte return } env.state.StopPrefetcher() + if env.cleanup != nil { + env.cleanup() + } }() // Configure any upgrades that should go into effect during this block. err = core.ApplyUpgrades(w.chainConfig, &parent.Time, types.NewBlockWithHeader(header), env.state) @@ -284,7 +288,8 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - currentState.StartPrefetcher("miner", state.WorkerOpt(w.chain.CacheConfig().TriePrefetcherParallelism)) + opt, cleanup := state.WorkerOpt(w.chain.CacheConfig().TriePrefetcherParallelism) + currentState.StartPrefetcher("miner", opt) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), state: currentState, @@ -296,6 +301,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre predicateContext: predicateContext, predicateResults: predicate.NewResults(), start: tstart, + cleanup: cleanup, }, nil } From a5160bde02277d18ed33b3d1a975594a0153d726 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 21 Nov 2024 18:00:45 -0800 Subject: [PATCH 04/18] fix uts --- core/state/trie_prefetcher_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index bcdc645ca1..4a48e3c58f 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -86,7 +86,9 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + opt, cleanup := WorkerOpt(maxConcurrency) + defer cleanup() + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) a := prefetcher.trie(common.Hash{}, db.originalRoot) @@ -102,7 +104,9 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + opt, cleanup := WorkerOpt(maxConcurrency) + defer cleanup() + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) cpy := prefetcher.copy() From 42db569fb39a6fe788c6280e3e60b1313ca4a601 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 21 Nov 2024 18:14:16 -0800 Subject: [PATCH 05/18] rename --- core/blockchain.go | 4 ++-- core/state/statedb.go | 2 +- core/state/trie_prefetcher_extra_test.go | 2 +- core/state/trie_prefetcher_test.go | 6 +++--- miner/worker.go | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 197b9de495..95b6f6f5df 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1367,7 +1367,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - opt, cleanup := state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism) + opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) defer cleanup() statedb.StartPrefetcher("chain", opt) @@ -1729,7 +1729,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - opt, cleanup := state.WorkerOpt(bc.cacheConfig.TriePrefetcherParallelism) + opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) defer cleanup() statedb.StartPrefetcher("chain", opt) diff --git a/core/state/statedb.go b/core/state/statedb.go index c69a94eefa..640a8a3228 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -219,7 +219,7 @@ func (wp *workerPool) Execute(f func()) { }) } -func WorkerOpt(prefetchers int) (PrefetcherOption, func()) { +func WithConcurrentWorkers(prefetchers int) (PrefetcherOption, func()) { pool := utils.NewBoundedWorkers(prefetchers) cleanup := func() { _ = pool.Wait() } return WithWorkerPools(func() WorkerPool { diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index be6d14b42b..e7560b12a2 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -167,7 +167,7 @@ func addKVs( return nil, common.Hash{}, err } if prefetchers > 0 { - opt, cleanup := WorkerOpt(prefetchers) + opt, cleanup := WithConcurrentWorkers(prefetchers) defer cleanup() statedb.StartPrefetcher(namespace, opt) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index 4a48e3c58f..a636336ae1 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,7 +59,7 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WorkerOpt(maxConcurrency) + opt, cleanup := WithConcurrentWorkers(maxConcurrency) defer cleanup() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") @@ -86,7 +86,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WorkerOpt(maxConcurrency) + opt, cleanup := WithConcurrentWorkers(maxConcurrency) defer cleanup() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") @@ -104,7 +104,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WorkerOpt(maxConcurrency) + opt, cleanup := WithConcurrentWorkers(maxConcurrency) defer cleanup() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") diff --git a/miner/worker.go b/miner/worker.go index 152c95ae7e..2cc36ce126 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -288,7 +288,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - opt, cleanup := state.WorkerOpt(w.chain.CacheConfig().TriePrefetcherParallelism) + opt, cleanup := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism) currentState.StartPrefetcher("miner", opt) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), From 799a01c952f831ad8a1cc16252ac96a699730ce4 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Thu, 21 Nov 2024 18:19:36 -0800 Subject: [PATCH 06/18] patch waiting --- core/state/trie_prefetcher.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index b09adb5223..4a714cdbe9 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -336,15 +336,8 @@ func (sf *subfetcher) loop() { sf.lock.Unlock() // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { + for _, task := range tasks { select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() - return - case ch := <-sf.copy: // Somebody wants a copy of the current trie, grant them ch <- sf.db.CopyTrie(sf.trie) From 958808fe4daa962d90e23b6e875e991d87e1a04d Mon Sep 17 00:00:00 2001 From: Arran Schlosberg <519948+ARR4N@users.noreply.github.com> Date: Mon, 25 Nov 2024 15:51:05 +0000 Subject: [PATCH 07/18] refactor: block on `StopPrefetchers()` instead of `cleanup` (#1396) --- core/blockchain.go | 8 +-- core/state/statedb.go | 29 +++------- core/state/trie_prefetcher.go | 11 ++-- core/state/trie_prefetcher.libevm.go | 70 ++++++++++++++---------- core/state/trie_prefetcher_extra_test.go | 4 +- core/state/trie_prefetcher_test.go | 9 +-- libevm/sync/sync.go | 52 ++++++++++++++++++ miner/worker.go | 9 +-- 8 files changed, 118 insertions(+), 74 deletions(-) create mode 100644 libevm/sync/sync.go diff --git a/core/blockchain.go b/core/blockchain.go index 95b6f6f5df..99d50f5226 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1367,9 +1367,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) - defer cleanup() - + opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) statedb.StartPrefetcher("chain", opt) defer statedb.StopPrefetcher() @@ -1729,9 +1727,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - opt, cleanup := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) - defer cleanup() - + opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) statedb.StartPrefetcher("chain", opt) defer statedb.StopPrefetcher() diff --git a/core/state/statedb.go b/core/state/statedb.go index 640a8a3228..b4c2da9566 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -30,7 +30,6 @@ package state import ( "fmt" "sort" - "sync" "time" "github.com/ava-labs/subnet-evm/core/rawdb" @@ -204,29 +203,19 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St type workerPool struct { *utils.BoundedWorkers - wg sync.WaitGroup } -func (wp *workerPool) Wait() { - wp.wg.Wait() +func (wp *workerPool) Done() { + // Done is guaranteed to only be called after all work is already complete, + // so Wait()ing is redundant, but it also releases resources. + wp.BoundedWorkers.Wait() } -func (wp *workerPool) Execute(f func()) { - wp.wg.Add(1) - wp.BoundedWorkers.Execute(func() { - f() - wp.wg.Done() - }) -} - -func WithConcurrentWorkers(prefetchers int) (PrefetcherOption, func()) { - pool := utils.NewBoundedWorkers(prefetchers) - cleanup := func() { _ = pool.Wait() } - return WithWorkerPools(func() WorkerPool { - return &workerPool{ - BoundedWorkers: pool, - } - }), cleanup +func WithConcurrentWorkers(prefetchers int) PrefetcherOption { + pool := &workerPool{ + BoundedWorkers: utils.NewBoundedWorkers(prefetchers), + } + return WithWorkerPools(func() WorkerPool { return pool }) } // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 4a714cdbe9..8465deae2a 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -104,6 +104,7 @@ func (p *triePrefetcher) close() { } } } + p.releaseWorkerPools() // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } @@ -303,9 +304,11 @@ func (sf *subfetcher) abort() { // loop waits for new tasks to be scheduled and keeps loading them until it runs // out of tasks or its underlying trie is retrieved for committing. func (sf *subfetcher) loop() { - defer sf.wait() // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) + defer func() { + sf.pool.wait() + close(sf.term) + }() // Start by opening the trie and stop processing if it fails if sf.owner == (common.Hash{}) { @@ -348,9 +351,9 @@ func (sf *subfetcher) loop() { sf.dups++ } else { if len(task) == common.AddressLength { - sf.GetAccount(common.BytesToAddress(task)) + sf.pool.GetAccount(common.BytesToAddress(task)) } else { - sf.GetStorage(sf.addr, task) + sf.pool.GetStorage(sf.addr, task) } sf.seen[string(task)] = struct{}{} } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go index 912374fc6e..d59bd4eb54 100644 --- a/core/state/trie_prefetcher.libevm.go +++ b/core/state/trie_prefetcher.libevm.go @@ -17,9 +17,8 @@ package state import ( - "sync" - "github.com/ava-labs/subnet-evm/libevm/options" + "github.com/ava-labs/subnet-evm/libevm/sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) @@ -31,14 +30,16 @@ type prefetcherConfig struct { newWorkers func() WorkerPool } -// A WorkerPool is responsible for executing functions, possibly asynchronously. +// A WorkerPool executes functions asynchronously. Done() is called to signal +// that the pool is no longer needed and that Execute() is guaranteed to not be +// called again. type WorkerPool interface { Execute(func()) - Wait() + Done() } // WithWorkerPools configures trie prefetching to execute asynchronously. The -// provided constructor is called once for each trie being fetched and it MAY +// provided constructor is called once for each trie being fetched but it MAY // return the same pool. func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { return options.Func[prefetcherConfig](func(c *prefetcherConfig) { @@ -48,17 +49,18 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { type subfetcherPool struct { workers WorkerPool - tries sync.Pool + tries sync.Pool[Trie] + wg sync.WaitGroup } // applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided // with a [PrefetcherOption]. func (c *prefetcherConfig) applyTo(sf *subfetcher) { sf.pool = &subfetcherPool{ - tries: sync.Pool{ + tries: sync.Pool[Trie]{ // Although the workers may be shared between all subfetchers, each // MUST have its own Trie pool. - New: func() any { + New: func() Trie { return sf.db.CopyTrie(sf.trie) }, }, @@ -68,43 +70,55 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) { } } -func (sf *subfetcher) wait() { - if w := sf.pool.workers; w != nil { - w.Wait() +// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be +// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed +// to be shared between them. This is because we guarantee in the public API +// that no further calls will be made to Execute() after a call to Done(). +func (p *triePrefetcher) releaseWorkerPools() { + for _, f := range p.fetchers { + if w := f.pool.workers; w != nil { + w.Done() + } } } +func (p *subfetcherPool) wait() { + p.wg.Wait() +} + // execute runs the provided function with a copy of the subfetcher's Trie. -// Copies are stored in a [sync.Pool] to reduce creation overhead. If sf was +// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was // configured with a [WorkerPool] then it is used for function execution, // otherwise `fn` is just called directly. -func (sf *subfetcher) execute(fn func(Trie)) { - if w := sf.pool.workers; w != nil { - w.Execute(func() { - trie := sf.pool.tries.Get().(Trie) - fn(trie) - sf.pool.tries.Put(trie) - }) +func (p *subfetcherPool) execute(fn func(Trie)) { + p.wg.Add(1) + do := func() { + t := p.tries.Get() + fn(t) + p.tries.Put(t) + p.wg.Done() + } + + if w := p.workers; w != nil { + w.Execute(do) } else { - trie := sf.pool.tries.Get().(Trie) - fn(trie) - sf.pool.tries.Put(trie) + do() } } // GetAccount optimistically pre-fetches an account, dropping the returned value -// and logging errors. See [subfetcher.execute] re worker pools. -func (sf *subfetcher) GetAccount(addr common.Address) { - sf.execute(func(t Trie) { +// and logging errors. See [subfetcherPool.execute] re worker pools. +func (p *subfetcherPool) GetAccount(addr common.Address) { + p.execute(func(t Trie) { if _, err := t.GetAccount(addr); err != nil { log.Error("account prefetching failed", "address", addr, "err", err) } }) } -// GetStorage is the storage equivalent of [subfetcher.GetAccount]. -func (sf *subfetcher) GetStorage(addr common.Address, key []byte) { - sf.execute(func(t Trie) { +// GetStorage is the storage equivalent of [subfetcherPool.GetAccount]. +func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) { + p.execute(func(t Trie) { if _, err := t.GetStorage(addr, key); err != nil { log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) } diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index e7560b12a2..4a7448b636 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -167,9 +167,7 @@ func addKVs( return nil, common.Hash{}, err } if prefetchers > 0 { - opt, cleanup := WithConcurrentWorkers(prefetchers) - defer cleanup() - + opt := WithConcurrentWorkers(prefetchers) statedb.StartPrefetcher(namespace, opt) defer statedb.StopPrefetcher() } diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index a636336ae1..9df6ad8820 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,8 +59,7 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WithConcurrentWorkers(maxConcurrency) - defer cleanup() + opt := WithConcurrentWorkers(maxConcurrency) prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -86,8 +85,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WithConcurrentWorkers(maxConcurrency) - defer cleanup() + opt := WithConcurrentWorkers(maxConcurrency) prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -104,8 +102,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - opt, cleanup := WithConcurrentWorkers(maxConcurrency) - defer cleanup() + opt := WithConcurrentWorkers(maxConcurrency) prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) diff --git a/libevm/sync/sync.go b/libevm/sync/sync.go new file mode 100644 index 0000000000..991a3a875e --- /dev/null +++ b/libevm/sync/sync.go @@ -0,0 +1,52 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +// Package sync extends the standard library's sync package. +package sync + +import "sync" + +// Aliases of stdlib sync's types to avoid having to import it alongside this +// package. +type ( + Cond = sync.Cond + Locker = sync.Locker + Map = sync.Map + Mutex = sync.Mutex + Once = sync.Once + RWMutex = sync.RWMutex + WaitGroup = sync.WaitGroup +) + +// A Pool is a type-safe wrapper around [sync.Pool]. +type Pool[T any] struct { + New func() T + pool sync.Pool + once Once +} + +// Get is equivalent to [sync.Pool.Get]. +func (p *Pool[T]) Get() T { + p.once.Do(func() { // Do() guarantees at least once, not just only once + p.pool.New = func() any { return p.New() } + }) + return p.pool.Get().(T) //nolint:forcetypeassert +} + +// Put is equivalent to [sync.Pool.Put]. +func (p *Pool[T]) Put(t T) { + p.pool.Put(t) +} diff --git a/miner/worker.go b/miner/worker.go index 2cc36ce126..7a508b2e57 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -82,8 +82,7 @@ type environment struct { // way that the gas pool and state is reset. predicateResults *predicate.Results - start time.Time // Time that block building began - cleanup func() // Cleanup function to be called when the environment is no longer needed + start time.Time // Time that block building began } // worker is the main object which takes care of submitting new work to consensus engine @@ -226,9 +225,6 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte return } env.state.StopPrefetcher() - if env.cleanup != nil { - env.cleanup() - } }() // Configure any upgrades that should go into effect during this block. err = core.ApplyUpgrades(w.chainConfig, &parent.Time, types.NewBlockWithHeader(header), env.state) @@ -288,7 +284,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - opt, cleanup := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism) + opt := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism) currentState.StartPrefetcher("miner", opt) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), @@ -301,7 +297,6 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre predicateContext: predicateContext, predicateResults: predicate.NewResults(), start: tstart, - cleanup: cleanup, }, nil } From 5ebae5e2a36fc21b132a24bc811fc699baa36c7e Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 25 Nov 2024 10:42:58 -0800 Subject: [PATCH 08/18] nits --- RELEASES.md | 2 ++ core/blockchain.go | 6 ++---- core/state/trie_prefetcher_extra_test.go | 3 +-- core/state/trie_prefetcher_test.go | 9 +++------ miner/worker.go | 4 ++-- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/RELEASES.md b/RELEASES.md index 9781320c7d..a28c0847bc 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -2,6 +2,8 @@ ## Pending Release +* Refactored trie_prefetcher.go to be structurally similar to upstream. + ## [v0.6.11](https://github.com/ava-labs/subnet-evm/releases/tag/v0.6.11) This release focuses on Standalone DB and database configs. diff --git a/core/blockchain.go b/core/blockchain.go index 99d50f5226..19615c80ba 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1367,8 +1367,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) - statedb.StartPrefetcher("chain", opt) + statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)) defer statedb.StopPrefetcher() // Process block using the parent state as reference point @@ -1727,8 +1726,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - opt := state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism) - statedb.StartPrefetcher("chain", opt) + statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism)) defer statedb.StopPrefetcher() // Process previously stored block diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 4a7448b636..960ed9c0e8 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -167,8 +167,7 @@ func addKVs( return nil, common.Hash{}, err } if prefetchers > 0 { - opt := WithConcurrentWorkers(prefetchers) - statedb.StartPrefetcher(namespace, opt) + statedb.StartPrefetcher(namespace, WithConcurrentWorkers(prefetchers)) defer statedb.StopPrefetcher() } statedb.SetNonce(address1, 1) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index 9df6ad8820..999fcfc789 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -59,8 +59,7 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - opt := WithConcurrentWorkers(maxConcurrency) - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency)) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -85,8 +84,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - opt := WithConcurrentWorkers(maxConcurrency) - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency)) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) a := prefetcher.trie(common.Hash{}, db.originalRoot) @@ -102,8 +100,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - opt := WithConcurrentWorkers(maxConcurrency) - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", opt) + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency)) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) cpy := prefetcher.copy() diff --git a/miner/worker.go b/miner/worker.go index 7a508b2e57..d0f275e169 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -284,8 +284,8 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - opt := state.WithConcurrentWorkers(w.chain.CacheConfig().TriePrefetcherParallelism) - currentState.StartPrefetcher("miner", opt) + numPrefetchers := w.chain.CacheConfig().TriePrefetcherParallelism + currentState.StartPrefetcher("miner", state.WithConcurrentWorkers(numPrefetchers)) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), state: currentState, From f6566801680b3fe68dcedcf537af852de97aa464 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Wed, 20 Nov 2024 07:48:56 -0800 Subject: [PATCH 09/18] review comments --- core/state/trie_prefetcher_extra_test.go | 54 +++++++++--------------- 1 file changed, 21 insertions(+), 33 deletions(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 960ed9c0e8..71bb5b54bd 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -54,28 +54,26 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { countKey := []byte("count") blockKey := []byte("block") got, err := levelDB.Get(rootKey) - if err == nil { + if err == nil { // on success root = common.BytesToHash(got) } got, err = levelDB.Get(countKey) - if err == nil { + if err == nil { // on success count = binary.BigEndian.Uint64(got) } got, err = levelDB.Get(blockKey) - if err == nil { + if err == nil { // on success block = binary.BigEndian.Uint64(got) } // Make a trie on the levelDB address1 := common.Address{42} address2 := common.Address{43} - addBlock := func(db Database, snaps *snapshot.Tree, kvsPerBlock int, prefetchers int) (statedb *StateDB) { - statedb, root, err = addKVs(db, snaps, address1, address2, root, block, kvsPerBlock, prefetchers) + addBlock := func(db Database, snaps *snapshot.Tree, kvsPerBlock int, prefetchers int) { + _, root, err = addKVs(db, snaps, address1, address2, root, block, kvsPerBlock, prefetchers) require.NoError(err) count += uint64(kvsPerBlock) block++ - - return statedb } lastCommit := block @@ -88,12 +86,9 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { lastCommit = block // update the tracking keys - err = levelDB.Put(rootKey, root.Bytes()) - require.NoError(err) - err = database.PutUInt64(levelDB, blockKey, block) - require.NoError(err) - err = database.PutUInt64(levelDB, countKey, count) - require.NoError(err) + require.NoError(levelDB.Put(rootKey, root.Bytes())) + require.NoError(database.PutUInt64(levelDB, blockKey, block)) + require.NoError(database.PutUInt64(levelDB, countKey, count)) } tdbConfig := &triedb.Config{ @@ -105,7 +100,7 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root) for count < uint64(wantKVs) { previous := root - _ = addBlock(db, snaps, 100_000, 0) // Note this updates root and count + addBlock(db, snaps, 100_000, 0) // Note this updates root and count b.Logf("Root: %v, kvs: %d, block: %d", root, count, block) // Commit every 10 blocks or on the last iteration @@ -116,7 +111,7 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { if previous != root { require.NoError(db.TrieDB().Dereference(previous)) } else { - panic("root did not change") + b.Fatal("root did not change") } } require.NoError(levelDB.Close()) @@ -138,7 +133,7 @@ func BenchmarkPrefetcherDatabase(b *testing.B) { } startLoads := getMetric("load") for i := 0; i < b.N; i++ { - _ = addBlock(db, snaps, updates, prefetchers) + addBlock(db, snaps, updates, prefetchers) } require.NoError(levelDB.Close()) b.ReportMetric(float64(getMetric("load")-startLoads)/float64(b.N), "loads") @@ -170,23 +165,16 @@ func addKVs( statedb.StartPrefetcher(namespace, WithConcurrentWorkers(prefetchers)) defer statedb.StopPrefetcher() } - statedb.SetNonce(address1, 1) - for i := 0; i < count/2; i++ { - key := make([]byte, 32) - value := make([]byte, 32) - rand.Read(key) - rand.Read(value) - - statedb.SetState(address1, common.BytesToHash(key), common.BytesToHash(value)) - } - statedb.SetNonce(address2, 1) - for i := 0; i < count/2; i++ { - key := make([]byte, 32) - value := make([]byte, 32) - rand.Read(key) - rand.Read(value) - - statedb.SetState(address2, common.BytesToHash(key), common.BytesToHash(value)) + for _, address := range []common.Address{address1, address2} { + statedb.SetNonce(address, 1) + for i := 0; i < count/2; i++ { + key := make([]byte, 32) + value := make([]byte, 32) + rand.Read(key) + rand.Read(value) + + statedb.SetState(address, common.BytesToHash(key), common.BytesToHash(value)) + } } root, err = statedb.CommitWithSnap(block+1, true, snaps, fakeHash(block+1), fakeHash(block)) if err != nil { From 0187c95b4750c8f724db9b332de5e8849f5bf69b Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 16 Dec 2024 10:10:54 -0800 Subject: [PATCH 10/18] Update RELEASES.md Signed-off-by: Darioush Jalali --- RELEASES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/RELEASES.md b/RELEASES.md index 5b69a2ef84..4368d17eda 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -4,6 +4,9 @@ * Refactored trie_prefetcher.go to be structurally similar to upstream. +## [v0.7.0](https://github.com/ava-labs/subnet-evm/releases/tag/v0.7.0) + +- Changed default write option from `Sync` to `NoSync` in PebbleDB ## [v0.6.11](https://github.com/ava-labs/subnet-evm/releases/tag/v0.6.11) From 9124c2963fd8d602349b5ba3c4d96a7fa79d1659 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 16 Dec 2024 10:11:32 -0800 Subject: [PATCH 11/18] Update RELEASES.md Signed-off-by: Darioush Jalali --- RELEASES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/RELEASES.md b/RELEASES.md index 4368d17eda..15d9444cd5 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -6,8 +6,14 @@ ## [v0.7.0](https://github.com/ava-labs/subnet-evm/releases/tag/v0.7.0) +## Updates + - Changed default write option from `Sync` to `NoSync` in PebbleDB +## Fixes + +- Fixed database close on shutdown + ## [v0.6.11](https://github.com/ava-labs/subnet-evm/releases/tag/v0.6.11) This release focuses on Standalone DB and database configs. From 6881c249390828db1c719e0a0ef3ac284d79f3ac Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 16 Dec 2024 10:12:10 -0800 Subject: [PATCH 12/18] Update RELEASES.md Signed-off-by: Darioush Jalali --- RELEASES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RELEASES.md b/RELEASES.md index 15d9444cd5..29febf1347 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -6,11 +6,11 @@ ## [v0.7.0](https://github.com/ava-labs/subnet-evm/releases/tag/v0.7.0) -## Updates +### Updates - Changed default write option from `Sync` to `NoSync` in PebbleDB -## Fixes +### Fixes - Fixed database close on shutdown From acd80b2d294917ffc24460ed3c6fdbf90290da90 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 10:11:11 -0800 Subject: [PATCH 13/18] Update libevm/sync/sync.go Co-authored-by: Quentin McGaw Signed-off-by: Darioush Jalali --- libevm/sync/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libevm/sync/sync.go b/libevm/sync/sync.go index 991a3a875e..7621e20d79 100644 --- a/libevm/sync/sync.go +++ b/libevm/sync/sync.go @@ -1,4 +1,4 @@ -// Copyright 2024 the libevm authors. +// Copyright 2024 the subnet-evm authors. // // The libevm additions to go-ethereum are free software: you can redistribute // them and/or modify them under the terms of the GNU Lesser General Public License From 64f2587d265ef801e104ba0a4cc12edb40558157 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 10:11:32 -0800 Subject: [PATCH 14/18] Update core/state/trie_prefetcher_extra_test.go Co-authored-by: Quentin McGaw Signed-off-by: Darioush Jalali --- core/state/trie_prefetcher_extra_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 71bb5b54bd..35bf98d7a6 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -178,7 +178,7 @@ func addKVs( } root, err = statedb.CommitWithSnap(block+1, true, snaps, fakeHash(block+1), fakeHash(block)) if err != nil { - return nil, common.Hash{}, err + return nil, common.Hash{}, fmt.Errorf("committing with snap: %w", err) } return statedb, root, err } From 86abc25d2f3a1236026e834a96feda7bcd9e1f4b Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 10:11:47 -0800 Subject: [PATCH 15/18] Update core/state/trie_prefetcher_extra_test.go Co-authored-by: Quentin McGaw Signed-off-by: Darioush Jalali --- core/state/trie_prefetcher_extra_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 35bf98d7a6..368223398a 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -159,7 +159,7 @@ func addKVs( } statedb, err := NewWithSnapshot(root, db, snap) if err != nil { - return nil, common.Hash{}, err + return nil, common.Hash{}, fmt.Errorf("creating state with snapshot: %w", err) } if prefetchers > 0 { statedb.StartPrefetcher(namespace, WithConcurrentWorkers(prefetchers)) From 8dbc22a4fb661b7ee9b12dfd25fa0d8670575f91 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 10:12:06 -0800 Subject: [PATCH 16/18] Update core/state/trie_prefetcher_extra_test.go Co-authored-by: Quentin McGaw Signed-off-by: Darioush Jalali --- core/state/trie_prefetcher_extra_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 368223398a..b9b02d56c4 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -180,5 +180,5 @@ func addKVs( if err != nil { return nil, common.Hash{}, fmt.Errorf("committing with snap: %w", err) } - return statedb, root, err + return statedb, root, nil } From 65b946cecf039e4d4b7974684cb12b2287da72ce Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 10:32:16 -0800 Subject: [PATCH 17/18] improve comment --- core/state/trie_prefetcher_extra_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index b9b02d56c4..689820d15b 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -26,9 +26,14 @@ import ( const namespace = "chain" -// Write a test to add 100m kvs to a leveldb so that we can test the prefetcher -// performance. - +// BenchmarkPrefetcherDatabase benchmarks the performance of the trie +// prefetcher. By default, a state with 100k storage keys is created and stored +// in a temporary directory. Setting the TEST_DB_KVS and TEST_DB_DIR environment +// variables modifies the defaults. The benchmark measures the time to update +// the trie after 100, 200, and 500 storage slot updates per iteration, +// simulating a block with that number of storage slot updates. For performance +// reasons, when making changes involving the trie prefetcher, this benchmark +// should be run against state including with around 100m storage entries. func BenchmarkPrefetcherDatabase(b *testing.B) { require := require.New(b) From b1ac029957843533b5ddb731c60700c68194c04c Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 17 Dec 2024 11:24:50 -0800 Subject: [PATCH 18/18] Update core/state/trie_prefetcher_extra_test.go Co-authored-by: Quentin McGaw Signed-off-by: Darioush Jalali --- core/state/trie_prefetcher_extra_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go index 689820d15b..5ac1d1c5c2 100644 --- a/core/state/trie_prefetcher_extra_test.go +++ b/core/state/trie_prefetcher_extra_test.go @@ -33,7 +33,7 @@ const namespace = "chain" // the trie after 100, 200, and 500 storage slot updates per iteration, // simulating a block with that number of storage slot updates. For performance // reasons, when making changes involving the trie prefetcher, this benchmark -// should be run against state including with around 100m storage entries. +// should be run against a state including around 100m storage entries. func BenchmarkPrefetcherDatabase(b *testing.B) { require := require.New(b)