Skip to content

Commit 8e6d98e

Browse files
gzliudanholiman
andauthored
eth/downloader: change intial download size ethereum#21366 (#1045)
This changes how the downloader works, a little bit. Previously, when block sync started, we immediately started filling up to 8192 blocks. Usually this is fine, blocks are small in the early numbers. The threshold then is lowered as we measure the size of the blocks that are filled. However, if the node is shut down and restarts syncing while we're in a heavy segment, that might be bad. This PR introduces a more conservative initial threshold of 2K blocks instead. Co-authored-by: Martin Holst Swende <[email protected]>
1 parent ea459d6 commit 8e6d98e

File tree

5 files changed

+26
-24
lines changed

5 files changed

+26
-24
lines changed

eth/downloader/downloader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai
213213
dl := &Downloader{
214214
stateDB: stateDb,
215215
mux: mux,
216-
queue: newQueue(blockCacheItems),
216+
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
217217
peers: newPeerSet(),
218218
rttEstimate: uint64(rttMaxEstimate),
219219
rttConfidence: uint64(1000000),
@@ -359,7 +359,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
359359
log.Info("Block synchronisation started")
360360
}
361361
// Reset the queue, peer set and wake channels to clean any internal leftover state
362-
d.queue.Reset(blockCacheItems)
362+
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
363363
d.peers.Reset()
364364

365365
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {

eth/downloader/downloader_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
// Reduce some of the parameters to make the tester faster.
4040
func init() {
4141
MaxForkAncestry = uint64(10000)
42-
blockCacheItems = 1024
42+
blockCacheMaxItems = 1024
4343
fsHeaderContCheck = 500 * time.Millisecond
4444
}
4545

@@ -469,7 +469,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
469469
defer tester.terminate()
470470

471471
// Create a small enough block chain to download
472-
chain := testChainBase.shorten(blockCacheItems - 15)
472+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
473473
tester.newPeer("peer", protocol, chain)
474474

475475
// Synchronise with the peer and make sure all relevant data was retrieved
@@ -531,8 +531,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
531531
}
532532
tester.lock.Unlock()
533533

534-
if cached == blockCacheItems ||
535-
cached == blockCacheItems-reorgProtHeaderDelay ||
534+
if cached == blockCacheMaxItems ||
535+
cached == blockCacheMaxItems-reorgProtHeaderDelay ||
536536
retrieved+cached+frozen == targetBlocks+1 ||
537537
retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
538538
break
@@ -543,8 +543,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
543543
tester.lock.RLock()
544544
retrieved = len(tester.ownBlocks)
545545
tester.lock.RUnlock()
546-
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
547-
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
546+
if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
547+
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
548548
}
549549

550550
// Permit the blocked blocks to import
@@ -807,7 +807,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
807807
defer tester.terminate()
808808

809809
// Create a small enough block chain to download
810-
chain := testChainBase.shorten(blockCacheItems - 15)
810+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
811811

812812
// Create peers of every type
813813
tester.newPeer("peer 62", 62, chain)
@@ -897,7 +897,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
897897
tester := newTester()
898898
defer tester.terminate()
899899

900-
chain := testChainBase.shorten(blockCacheItems - 15)
900+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
901901
brokenChain := chain.shorten(chain.len())
902902
delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
903903
tester.newPeer("attack", protocol, brokenChain)
@@ -928,7 +928,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
928928
tester := newTester()
929929
defer tester.terminate()
930930

931-
chain := testChainBase.shorten(blockCacheItems - 15)
931+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
932932

933933
// Attempt a full sync with an attacker feeding shifted headers
934934
brokenChain := chain.shorten(chain.len())
@@ -1129,7 +1129,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
11291129

11301130
tester := newTester()
11311131
defer tester.terminate()
1132-
chain := testChainBase.shorten(blockCacheItems - 15)
1132+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
11331133

11341134
// Set a sync init hook to catch progress changes
11351135
starting := make(chan struct{})
@@ -1290,7 +1290,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
12901290

12911291
tester := newTester()
12921292
defer tester.terminate()
1293-
chain := testChainBase.shorten(blockCacheItems - 15)
1293+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
12941294

12951295
// Set a sync init hook to catch progress changes
12961296
starting := make(chan struct{})
@@ -1362,7 +1362,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
13621362

13631363
tester := newTester()
13641364
defer tester.terminate()
1365-
chain := testChainBase.shorten(blockCacheItems - 15)
1365+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
13661366

13671367
// Set a sync init hook to catch progress changes
13681368
starting := make(chan struct{})

eth/downloader/queue.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ const (
4040
)
4141

4242
var (
43-
blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
44-
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
45-
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
43+
blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download
44+
blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks
45+
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
46+
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
4647
)
4748

4849
var (
@@ -142,7 +143,7 @@ type queue struct {
142143
}
143144

144145
// newQueue creates a new download queue for scheduling block retrieval.
145-
func newQueue(blockCacheLimit int) *queue {
146+
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
146147
lock := new(sync.RWMutex)
147148
q := &queue{
148149
headerContCh: make(chan bool),
@@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue {
151152
active: sync.NewCond(lock),
152153
lock: lock,
153154
}
154-
q.Reset(blockCacheLimit)
155+
q.Reset(blockCacheLimit, thresholdInitialSize)
155156
return q
156157
}
157158

158159
// Reset clears out the queue contents.
159-
func (q *queue) Reset(blockCacheLimit int) {
160+
func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
160161
q.lock.Lock()
161162
defer q.lock.Unlock()
162163

@@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) {
175176
q.receiptPendPool = make(map[string]*fetchRequest)
176177

177178
q.resultCache = newResultStore(blockCacheLimit)
179+
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
178180
}
179181

180182
// Close marks the end of the sync, unblocking Results.

eth/downloader/queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func dummyPeer(id string) *peerConnection {
9999
}
100100

101101
func TestBasics(t *testing.T) {
102-
q := newQueue(10)
102+
q := newQueue(10, 10)
103103
if !q.Idle() {
104104
t.Errorf("new queue should be idle")
105105
}
@@ -176,7 +176,7 @@ func TestBasics(t *testing.T) {
176176
}
177177

178178
func TestEmptyBlocks(t *testing.T) {
179-
q := newQueue(10)
179+
q := newQueue(10, 10)
180180

181181
q.Prepare(1, FastSync)
182182
// Schedule a batch of headers
@@ -245,7 +245,7 @@ func XTestDelivery(t *testing.T) {
245245
if false {
246246
log.SetDefault(log.NewLogger(slog.NewTextHandler(os.Stdout, nil)))
247247
}
248-
q := newQueue(10)
248+
q := newQueue(10, 10)
249249
var wg sync.WaitGroup
250250
q.Prepare(1, FastSync)
251251
wg.Add(1)

eth/downloader/testchain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ var (
3939
)
4040

4141
// The common prefix of all test chains:
42-
var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
42+
var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis)
4343

4444
// Different forks on top of the base chain:
4545
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain

0 commit comments

Comments
 (0)