Skip to content

Commit 51f4757

Browse files
eth/traces: add state limit ethereum#25812 (#1300)
This PR introduces a new mechanism in the chain tracer that prevents it from creating too many trace states. The workflow of the chain tracer can be divided into several parts: - the state creator generates trace states in one thread - the state tracer retrieves a trace state and applies the tracing on top of it in another thread - the state collector gathers all results from the state tracer and streams them to users. It is basically a producer-consumer model: if the state producer generates states too fast, lots of unused states accumulate in memory. Even worse, the path-based state scheme only keeps the latest 128 states in memory, and a newly generated state will invalidate the oldest one by marking it as stale. The fix is to limit the speed of state generation: if there are over 128 un-consumed states in memory, state creation is paused until the pending states have been consumed. Co-authored-by: rjl493456442 <[email protected]>
1 parent 0988b4b commit 51f4757

File tree

3 files changed

+309
-39
lines changed

3 files changed

+309
-39
lines changed

eth/tracers/api.go

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ const (
6161
// For non-archive nodes, this limit _will_ be overblown, as disk-backed tries
6262
// will only be found every ~15K blocks or so.
6363
defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024)
64+
65+
// maximumPendingTraceStates is the maximum number of states allowed waiting
66+
// for tracing. The creation of trace state will be paused if the unused
67+
// trace states exceed this limit.
68+
maximumPendingTraceStates = 128
6469
)
6570

6671
// StateReleaseFunc is used to deallocate resources held by constructing a
@@ -245,30 +250,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf
245250
return sub, nil
246251
}
247252

248-
// releaser is a helper tool responsible for caching the release
249-
// callbacks of tracing state.
250-
type releaser struct {
251-
releases []StateReleaseFunc
252-
lock sync.Mutex
253-
}
254-
255-
func (r *releaser) add(release StateReleaseFunc) {
256-
r.lock.Lock()
257-
defer r.lock.Unlock()
258-
259-
r.releases = append(r.releases, release)
260-
}
261-
262-
func (r *releaser) call() {
263-
r.lock.Lock()
264-
defer r.lock.Unlock()
265-
266-
for _, release := range r.releases {
267-
release()
268-
}
269-
r.releases = r.releases[:0]
270-
}
271-
272253
// traceChain configures a new tracer according to the provided configuration, and
273254
// executes all the transactions contained within. The tracing chain range includes
274255
// the end block but excludes the start one. The return value will be one item per
@@ -285,11 +266,11 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
285266
threads = blocks
286267
}
287268
var (
288-
pend = new(sync.WaitGroup)
289-
ctx = context.Background()
290-
taskCh = make(chan *blockTraceTask, threads)
291-
resCh = make(chan *blockTraceTask, threads)
292-
reler = new(releaser)
269+
pend = new(sync.WaitGroup)
270+
ctx = context.Background()
271+
taskCh = make(chan *blockTraceTask, threads)
272+
resCh = make(chan *blockTraceTask, threads)
273+
tracker = newStateTracker(maximumPendingTraceStates, start.NumberU64())
293274
)
294275
for th := 0; th < threads; th++ {
295276
pend.Add(1)
@@ -328,8 +309,10 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
328309
task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number()))
329310
task.results[i] = &txTraceResult{Result: res}
330311
}
331-
// Tracing state is used up, queue it for de-referencing
332-
reler.add(task.release)
312+
// Tracing state is used up, queue it for de-referencing. Note the
313+
// state is the parent state of trace block, use block.number-1 as
314+
// the state number.
315+
tracker.releaseState(task.block.NumberU64()-1, task.release)
333316

334317
// Stream the result back to the result catcher or abort on teardown
335318
select {
@@ -356,8 +339,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
356339
close(taskCh)
357340
pend.Wait()
358341

359-
// Clean out any pending derefs.
360-
reler.call()
342+
// Clean out any pending release functions of trace states.
343+
tracker.callReleases()
361344

362345
// Log the chain result
363346
switch {
@@ -394,6 +377,13 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
394377
failed = err
395378
break
396379
}
380+
// Make sure the state creator doesn't go too far. Too many unprocessed
381+
// trace state may cause the oldest state to become stale(e.g. in
382+
// path-based scheme).
383+
if err = tracker.wait(number); err != nil {
384+
failed = err
385+
break
386+
}
397387
// Prepare the statedb for tracing. Don't use the live database for
398388
// tracing to avoid persisting state junks into the database. Switch
399389
// over to `preferDisk` mode only if the memory usage exceeds the
@@ -409,18 +399,18 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
409399
failed = err
410400
break
411401
}
412-
// Clean out any pending derefs. Note this step must be done after
413-
// constructing tracing state, because the tracing state of block
414-
// next depends on the parent state and construction may fail if
415-
// we release too early.
416-
reler.call()
402+
// Clean out any pending release functions of trace state. Note this
403+
// step must be done after constructing tracing state, because the
404+
// tracing state of block next depends on the parent state and construction
405+
// may fail if we release too early.
406+
tracker.callReleases()
417407

418408
// Send the block over to the concurrent tracers (if not in the fast-forward phase)
419409
txs := next.Transactions()
420410
select {
421411
case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}:
422412
case <-closed:
423-
reler.add(release)
413+
tracker.releaseState(number, release)
424414
return
425415
}
426416
traced += uint64(len(txs))

eth/tracers/tracker.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package tracers
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
)
23+
24+
// stateTracker is an auxiliary tool used to cache the release functions of all
25+
// used trace states, and to determine whether the creation of trace state needs
26+
// to be paused in case there are too many states waiting for tracing.
27+
type stateTracker struct {
28+
limit int // Maximum number of states allowed waiting for tracing
29+
oldest uint64 // The number of the oldest state which is still using for trace
30+
used []bool // List of flags indicating whether the trace state has been used up
31+
releases []StateReleaseFunc // List of trace state release functions waiting to be called
32+
cond *sync.Cond
33+
lock *sync.RWMutex
34+
}
35+
36+
// newStateTracker initializes the tracker with provided state limits and
37+
// the number of the first state that will be used.
38+
func newStateTracker(limit int, oldest uint64) *stateTracker {
39+
lock := new(sync.RWMutex)
40+
return &stateTracker{
41+
limit: limit,
42+
oldest: oldest,
43+
used: make([]bool, limit),
44+
cond: sync.NewCond(lock),
45+
lock: lock,
46+
}
47+
}
48+
49+
// releaseState marks the state specified by the number as released and caches
50+
// the corresponding release functions internally.
51+
func (t *stateTracker) releaseState(number uint64, release StateReleaseFunc) {
52+
t.lock.Lock()
53+
defer t.lock.Unlock()
54+
55+
// Set the state as used, the corresponding flag is indexed by
56+
// the distance between the specified state and the oldest state
57+
// which is still using for trace.
58+
t.used[int(number-t.oldest)] = true
59+
60+
// If the oldest state is used up, update the oldest marker by moving
61+
// it to the next state which is not used up.
62+
if number == t.oldest {
63+
var count int
64+
for _, used := range t.used {
65+
if !used {
66+
break
67+
}
68+
count += 1
69+
}
70+
t.oldest += uint64(count)
71+
copy(t.used, t.used[count:])
72+
73+
// Clean up the array tail since they are useless now.
74+
for i := t.limit - count; i < t.limit; i++ {
75+
t.used[i] = false
76+
}
77+
// Fire the signal to all waiters that oldest marker is updated.
78+
t.cond.Broadcast()
79+
}
80+
t.releases = append(t.releases, release)
81+
}
82+
83+
// callReleases invokes all cached release functions.
84+
func (t *stateTracker) callReleases() {
85+
t.lock.Lock()
86+
defer t.lock.Unlock()
87+
88+
for _, release := range t.releases {
89+
release()
90+
}
91+
t.releases = t.releases[:0]
92+
}
93+
94+
// wait blocks until the accumulated trace states are less than the limit.
95+
func (t *stateTracker) wait(number uint64) error {
96+
t.lock.Lock()
97+
defer t.lock.Unlock()
98+
99+
for {
100+
if number < t.oldest {
101+
return fmt.Errorf("invalid state number %d head %d", number, t.oldest)
102+
}
103+
if number < t.oldest+uint64(t.limit) {
104+
// number is now within limit, wait over
105+
return nil
106+
}
107+
t.cond.Wait()
108+
}
109+
}

0 commit comments

Comments
 (0)