Skip to content

Commit 9b59c75

Browse files
rjl493456442karalabe
authored andcommitted
miner: fix data race in tests (#20310)
* miner: fix data race in tests miner: fix linter * miner: address comment
1 parent f71e85b commit 9b59c75

File tree

3 files changed

+28
-104
lines changed

3 files changed

+28
-104
lines changed

miner/miner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
7272
mux: mux,
7373
engine: engine,
7474
exitCh: make(chan struct{}),
75-
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock),
75+
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
7676
canStart: 1,
7777
}
7878
go miner.update()

miner/worker.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ type worker struct {
176176
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
177177
}
178178

179-
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool) *worker {
179+
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
180180
worker := &worker{
181181
config: config,
182182
chainConfig: chainConfig,
@@ -219,8 +219,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
219219
go worker.taskLoop()
220220

221221
// Submit first work to initialize pending state.
222-
worker.startCh <- struct{}{}
223-
222+
if init {
223+
worker.startCh <- struct{}{}
224+
}
224225
return worker
225226
}
226227

miner/worker_test.go

Lines changed: 23 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package miner
1919
import (
2020
"math/big"
2121
"math/rand"
22+
"sync/atomic"
2223
"testing"
2324
"time"
2425

@@ -180,7 +181,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {
180181
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
181182
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
182183
backend.txPool.AddLocals(pendingTxs)
183-
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil)
184+
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
184185
w.setEtherbase(testBankAddress)
185186
return w, backend
186187
}
@@ -230,32 +231,13 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
230231
newBlock <- struct{}{}
231232
}
232233
}
233-
234-
// Ensure worker has finished initialization
235-
for {
236-
b := w.pendingBlock()
237-
if b != nil && b.NumberU64() == 1 {
238-
break
239-
}
240-
}
241-
w.start() // Start mining!
242-
243-
// Ignore first 2 commits caused by start operation
244-
ignored := make(chan struct{}, 2)
245-
w.skipSealHook = func(task *task) bool {
246-
ignored <- struct{}{}
247-
return true
248-
}
249-
for i := 0; i < 2; i++ {
250-
<-ignored
251-
}
252-
253-
go listenNewBlock()
254-
255234
// Ignore empty commit here for less noise
256235
w.skipSealHook = func(task *task) bool {
257236
return len(task.receipts) == 0
258237
}
238+
w.start() // Start mining!
239+
go listenNewBlock()
240+
259241
for i := 0; i < 5; i++ {
260242
b.txPool.AddLocal(b.newRandomTx(true))
261243
b.txPool.AddLocal(b.newRandomTx(false))
@@ -269,38 +251,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
269251
}
270252
}
271253

272-
func TestPendingStateAndBlockEthash(t *testing.T) {
273-
testPendingStateAndBlock(t, ethashChainConfig, ethash.NewFaker())
274-
}
275-
func TestPendingStateAndBlockClique(t *testing.T) {
276-
testPendingStateAndBlock(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
277-
}
278-
279-
func testPendingStateAndBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
280-
defer engine.Close()
281-
282-
w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
283-
defer w.close()
284-
285-
// Ensure snapshot has been updated.
286-
time.Sleep(100 * time.Millisecond)
287-
block, state := w.pending()
288-
if block.NumberU64() != 1 {
289-
t.Errorf("block number mismatch: have %d, want %d", block.NumberU64(), 1)
290-
}
291-
if balance := state.GetBalance(testUserAddress); balance.Cmp(big.NewInt(1000)) != 0 {
292-
t.Errorf("account balance mismatch: have %d, want %d", balance, 1000)
293-
}
294-
b.txPool.AddLocals(newTxs)
295-
296-
// Ensure the new tx events has been processed
297-
time.Sleep(100 * time.Millisecond)
298-
block, state = w.pending()
299-
if balance := state.GetBalance(testUserAddress); balance.Cmp(big.NewInt(2000)) != 0 {
300-
t.Errorf("account balance mismatch: have %d, want %d", balance, 2000)
301-
}
302-
}
303-
304254
func TestEmptyWorkEthash(t *testing.T) {
305255
testEmptyWork(t, ethashChainConfig, ethash.NewFaker())
306256
}
@@ -315,49 +265,41 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
315265
defer w.close()
316266

317267
var (
318-
taskCh = make(chan struct{}, 2)
319268
taskIndex int
269+
taskCh = make(chan struct{}, 2)
320270
)
321-
322271
checkEqual := func(t *testing.T, task *task, index int) {
272+
// The first empty work without any txs included
323273
receiptLen, balance := 0, big.NewInt(0)
324274
if index == 1 {
275+
// The second full work with 1 tx included
325276
receiptLen, balance = 1, big.NewInt(1000)
326277
}
327278
if len(task.receipts) != receiptLen {
328-
t.Errorf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen)
279+
t.Fatalf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen)
329280
}
330281
if task.state.GetBalance(testUserAddress).Cmp(balance) != 0 {
331-
t.Errorf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance)
282+
t.Fatalf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance)
332283
}
333284
}
334-
335285
w.newTaskHook = func(task *task) {
336286
if task.block.NumberU64() == 1 {
337287
checkEqual(t, task, taskIndex)
338288
taskIndex += 1
339289
taskCh <- struct{}{}
340290
}
341291
}
292+
w.skipSealHook = func(task *task) bool { return true }
342293
w.fullTaskHook = func() {
343294
// Aarch64 unit tests are running in a VM on travis, they must
344295
// be given more time to execute.
345296
time.Sleep(time.Second)
346297
}
347-
348-
// Ensure worker has finished initialization
349-
for {
350-
b := w.pendingBlock()
351-
if b != nil && b.NumberU64() == 1 {
352-
break
353-
}
354-
}
355-
356-
w.start()
298+
w.start() // Start mining!
357299
for i := 0; i < 2; i += 1 {
358300
select {
359301
case <-taskCh:
360-
case <-time.NewTimer(30 * time.Second).C:
302+
case <-time.NewTimer(3 * time.Second).C:
361303
t.Error("new task timeout")
362304
}
363305
}
@@ -375,6 +317,9 @@ func TestStreamUncleBlock(t *testing.T) {
375317
taskIndex := 0
376318
w.newTaskHook = func(task *task) {
377319
if task.block.NumberU64() == 2 {
320+
// The first task is an empty task, the second
321+
// one has 1 pending tx, the third one has 1 tx
322+
// and 1 uncle.
378323
if taskIndex == 2 {
379324
have := task.block.Header().UncleHash
380325
want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()})
@@ -392,26 +337,17 @@ func TestStreamUncleBlock(t *testing.T) {
392337
w.fullTaskHook = func() {
393338
time.Sleep(100 * time.Millisecond)
394339
}
395-
396-
// Ensure worker has finished initialization
397-
for {
398-
b := w.pendingBlock()
399-
if b != nil && b.NumberU64() == 2 {
400-
break
401-
}
402-
}
403340
w.start()
404341

405-
// Ignore the first two works
406342
for i := 0; i < 2; i += 1 {
407343
select {
408344
case <-taskCh:
409345
case <-time.NewTimer(time.Second).C:
410346
t.Error("new task timeout")
411347
}
412348
}
413-
b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}})
414349

350+
b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}})
415351
select {
416352
case <-taskCh:
417353
case <-time.NewTimer(time.Second).C:
@@ -438,6 +374,8 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
438374
taskIndex := 0
439375
w.newTaskHook = func(task *task) {
440376
if task.block.NumberU64() == 1 {
377+
// The first task is an empty task, the second
378+
// one has 1 pending tx, the third one has 2 txs
441379
if taskIndex == 2 {
442380
receiptLen, balance := 2, big.NewInt(2000)
443381
if len(task.receipts) != receiptLen {
@@ -457,13 +395,6 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
457395
w.fullTaskHook = func() {
458396
time.Sleep(100 * time.Millisecond)
459397
}
460-
// Ensure worker has finished initialization
461-
for {
462-
b := w.pendingBlock()
463-
if b != nil && b.NumberU64() == 1 {
464-
break
465-
}
466-
}
467398

468399
w.start()
469400
// Ignore the first two works
@@ -508,11 +439,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
508439
progress = make(chan struct{}, 10)
509440
result = make([]float64, 0, 10)
510441
index = 0
511-
start = false
442+
start uint32
512443
)
513444
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
514445
// Short circuit if interval checking hasn't started.
515-
if !start {
446+
if atomic.LoadUint32(&start) == 0 {
516447
return
517448
}
518449
var wantMinInterval, wantRecommitInterval time.Duration
@@ -544,19 +475,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
544475
index += 1
545476
progress <- struct{}{}
546477
}
547-
// Ensure worker has finished initialization
548-
for {
549-
b := w.pendingBlock()
550-
if b != nil && b.NumberU64() == 1 {
551-
break
552-
}
553-
}
554-
555478
w.start()
556479

557-
time.Sleep(time.Second)
480+
time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt
481+
atomic.StoreUint32(&start, 1)
558482

559-
start = true
560483
w.setRecommitInterval(3 * time.Second)
561484
select {
562485
case <-progress:

0 commit comments

Comments
 (0)