From 48d15eb679554f76a75da9e71f610494e763ca90 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 12 Jan 2018 14:50:45 +0800 Subject: [PATCH 01/12] consensus/ethash: start remote ggoroutine to handle remote mining --- consensus/ethash/ethash.go | 47 +++++++++++++- consensus/ethash/sealer.go | 121 ++++++++++++++++++++++++++++++++++++- 2 files changed, 164 insertions(+), 4 deletions(-) diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index f79dd6c36b94..1cb50addf98c 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -33,7 +33,9 @@ import ( "unsafe" mmap "github.com/edsrzf/mmap-go" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" @@ -389,6 +391,29 @@ type Config struct { PowMode Mode } +// remoteOpType defines all the types of operations related to remote sealer. +type remoteOpType uint + +const ( + getWork remoteOpType = iota + submitWork +) + +// sealResult wraps the pow solution parameters for the specified block. +type sealResult struct { + nonce types.BlockNonce + mixDigest common.Hash + hash common.Hash +} + +// remoteOp wraps a mining operation sent by remote miner. +type remoteOp struct { + typ remoteOpType + result sealResult + errCh chan error + workCh chan [3]string +} + // Ethash is a consensus engine based on proof-of-work implementing the ethash // algorithm. type Ethash struct { @@ -397,12 +422,19 @@ type Ethash struct { caches *lru // In memory caches to avoid regenerating too often datasets *lru // In memory datasets to avoid regenerating too often + exitCh chan struct{} // Notification channel to exiting backend threads + // Mining related fields rand *rand.Rand // Properly seeded random source for nonces threads int // Number of threads to mine on if mining update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate + // Remote sealer related fields + workCh chan *types.Block // Notification channel to push new work to remote sealer + resultCh chan *types.Block // Channel used by mining threads to return result + remoteOp chan *remoteOp // Channel used to receive all mining operations from api + // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode @@ -423,13 +455,20 @@ func New(config Config) *Ethash { if config.DatasetDir != "" && config.DatasetsOnDisk > 0 { log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) } - return &Ethash{ + ethash := &Ethash{ config: config, caches: newlru("cache", config.CachesInMem, newCache), datasets: newlru("dataset", config.DatasetsInMem, newDataset), update: make(chan struct{}), hashrate: metrics.NewMeter(), + exitCh: make(chan struct{}), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + remoteOp: make(chan *remoteOp), } + go ethash.remote(ethash.resultCh) + runtime.SetFinalizer(ethash, (*Ethash).close) + return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing @@ -489,6 +528,12 @@ func NewShared() *Ethash { return &Ethash{shared: sharedEthash} } +// close closes the exit channel to notify all backend threads exiting. +func (ethash *Ethash) close() { + runtime.SetFinalizer(ethash, nil) + close(ethash.exitCh) +} + // cache tries to retrieve a verification cache for the specified block number // by first checking against a list of in-memory caches, then against caches // stored on disk, and finally generating one if none can be found. diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index b5e742d8bbba..5d3cd8e94c47 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -24,12 +24,19 @@ import ( "runtime" "sync" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) +var ( + errNoSealWork = errors.New("No work available yet, don't panic.") + errInvalidSealResult = errors.New("Invalid proof-of-work solution.") + errNonDefinedRemoteOp = errors.New("Non-defined remote mining operation.") +) + // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { @@ -43,9 +50,9 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if ethash.shared != nil { return ethash.shared.Seal(chain, block, stop) } + // Create a runner and the multiple search threads it directs abort := make(chan struct{}) - found := make(chan *types.Block) ethash.lock.Lock() threads := ethash.threads @@ -64,12 +71,25 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if threads < 0 { threads = 0 // Allows disabling local mining without extra logic around local/remote } + + // Clear up result channel before new round mining start + for empty := false; !empty; { + select { + case <-ethash.resultCh: + default: + empty = true + } + } + + // Push new work to remote sealer + ethash.workCh <- block + var pend sync.WaitGroup for i := 0; i < threads; i++ { pend.Add(1) go func(id int, nonce uint64) { defer pend.Done() - ethash.mine(block, id, nonce, abort, found) + ethash.mine(block, id, nonce, abort, ethash.resultCh) }(i, uint64(ethash.rand.Int63())) } // Wait until sealing is terminated or a nonce is found @@ -78,7 +98,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop case <-stop: // Outside abort, stop all miner threads close(abort) - case result = <-found: + case result = <-ethash.resultCh: // One of the threads found a block, abort all others close(abort) case <-ethash.update: @@ -150,3 +170,98 @@ search: // during sealing so it's not unmapped while being read. runtime.KeepAlive(dataset) } + +// remote starts a standalone goroutine to handle remote mining related stuff. +func (ethash *Ethash) remote(resultCh chan *types.Block) { + var ( + works = make(map[common.Hash]*types.Block) + currentWork *types.Block + ) + + // makeWork returns a work package for external sealer. The work package consists of 3 strings + // result[0], 32 bytes hex encoded current block header pow-hash + // result[1], 32 bytes hex encoded seed hash used for DAG + // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty + makeWork := func() ([3]string, error) { + var res [3]string + if currentWork == nil { + return res, errNoSealWork + } + res[0] = currentWork.HashNoNonce().Hex() + res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() + // Calculate the "target" to be returned to the external sealer + n := big.NewInt(1) + n.Lsh(n, 255) + n.Div(n, currentWork.Difficulty()) + n.Lsh(n, 1) + res[2] = common.BytesToHash(n.Bytes()).Hex() + works[currentWork.HashNoNonce()] = currentWork + return res, nil + } + + // verifyWork verifies the submitted pow solution, returning + // whether the solution was accepted or not (not can be both a bad pow as well as + // any other error, like no work pending). + verifyWork := func(result sealResult) bool { + // Make sure the work submitted is present + block := works[result.hash] + if block == nil { + log.Info("Work submitted but none pending", "hash", result.hash) + return false + } + // Make sure the Engine solutions is indeed valid + header := block.Header() + header.Nonce = result.nonce + header.MixDigest = result.mixDigest + + if err := ethash.VerifySeal(nil, header); err != nil { + log.Warn("Invalid proof-of-work submitted", "hash", result.hash, "err", err) + return false + } + + // Solutions seems to be valid, return to the miner and notify acceptance + resultCh <- block.WithSeal(header) + delete(works, result.hash) + return true + } + +running: + for { + select { + case block := <-ethash.workCh: + if block.ParentHash() != currentWork.ParentHash() { + // Start new round mining, throw out all previous work. + works = make(map[common.Hash]*types.Block) + } + // Update current work with new received block + currentWork = block + + case op := <-ethash.remoteOp: + switch op.typ { + case getWork: + // Push current mining work to return channel + work, err := makeWork() + if err != nil { + op.errCh <- err + } else { + op.workCh <- work + close(op.errCh) + } + case submitWork: + // Verify submitted PoW solution based on maintained mining blocks + if verifyWork(op.result) { + close(op.errCh) + } else { + op.errCh <- errInvalidSealResult + } + default: + op.errCh <- errNonDefinedRemoteOp + } + + case <-ethash.exitCh: + // Exit remote loop if ethash object is cleared by GC + break running + } + } + log.Trace("Ethash remote sealer is exiting") +} From 9833ccd3f6a44cdd059733a44415dcc41ca4c9ef Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 12 Jan 2018 15:54:48 +0800 Subject: [PATCH 02/12] consensus/ethash: expose remote miner api --- consensus/ethash/api.go | 71 ++++++++++++++++++++++++++++++++++++++ consensus/ethash/ethash.go | 10 ++++-- 2 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 consensus/ethash/api.go diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go new file mode 100644 index 000000000000..3b7aea3ce2b0 --- /dev/null +++ b/consensus/ethash/api.go @@ -0,0 +1,71 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . +package ethash + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// API exposes ethash related methods for the RPC interface +type API struct { + ethash *Ethash +} + +// GetWork returns a work package for external miner. The work package consists of 3 strings +// result[0], 32 bytes hex encoded current block header pow-hash +// result[1], 32 bytes hex encoded seed hash used for DAG +// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +func (s *API) GetWork() ([3]string, error) { + var ( + workCh = make(chan [3]string) + errCh = make(chan error) + ) + op := &remoteOp{ + typ: getWork, + workCh: workCh, + errCh: errCh, + } + s.ethash.remoteOp <- op + err := <-errCh + if err == nil { + return <-workCh, nil + } else { + return [3]string{}, err + } +} + +// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was +// accepted. Note, this is not an indication if the provided work was valid! +func (s *API) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool { + var errCh = make(chan error) + result := sealResult{ + nonce: nonce, + mixDigest: digest, + hash: solution, + } + op := &remoteOp{ + typ: submitWork, + result: result, + errCh: errCh, + } + s.ethash.remoteOp <- op + if err := <-errCh; err == nil { + return true + } else { + return false + } +} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 1cb50addf98c..0c2bccb9a2a3 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -610,10 +610,14 @@ func (ethash *Ethash) Hashrate() float64 { return ethash.hashrate.Rate1() } -// APIs implements consensus.Engine, returning the user facing RPC APIs. Currently -// that is empty. +// APIs implements consensus.Engine, returning the user facing RPC APIs. func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { - return nil + return []rpc.API{{ + Namespace: "ethash", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }} } // SeedHash is the seed to use for generating a verification cache and the mining From d2d3b84fa7e61f16ae689e39fa58f29ec3bc059a Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 12 Jan 2018 17:54:34 +0800 Subject: [PATCH 03/12] consensus/ethash: expose submitHashrate api --- consensus/ethash/api.go | 43 +++++-- consensus/ethash/ethash.go | 63 ++++++++-- consensus/ethash/ethash_test.go | 48 +++++++- consensus/ethash/sealer.go | 74 +++++++----- eth/api.go | 40 +------ miner/agent.go | 8 -- miner/miner.go | 8 -- miner/remote_agent.go | 202 -------------------------------- miner/worker.go | 1 - 9 files changed, 179 insertions(+), 308 deletions(-) delete mode 100644 miner/remote_agent.go diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index 3b7aea3ce2b0..fb64b85d0718 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -1,4 +1,4 @@ -// Copyright 2017 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -16,7 +16,10 @@ package ethash import ( + "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) @@ -35,13 +38,12 @@ func (s *API) GetWork() ([3]string, error) { errCh = make(chan error) ) op := &remoteOp{ - typ: getWork, + typ: opGetWork, workCh: workCh, errCh: errCh, } s.ethash.remoteOp <- op - err := <-errCh - if err == nil { + if err := <-errCh; err == nil { return <-workCh, nil } else { return [3]string{}, err @@ -52,15 +54,14 @@ func (s *API) GetWork() ([3]string, error) { // accepted. Note, this is not an indication if the provided work was valid! func (s *API) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool { var errCh = make(chan error) - result := sealResult{ - nonce: nonce, - mixDigest: digest, - hash: solution, - } op := &remoteOp{ - typ: submitWork, - result: result, - errCh: errCh, + typ: opSubmitWork, + result: mineResult{ + nonce: nonce, + mixDigest: digest, + hash: solution, + }, + errCh: errCh, } s.ethash.remoteOp <- op if err := <-errCh; err == nil { @@ -69,3 +70,21 @@ func (s *API) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) b return false } } + +// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined +// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which +// must be unique between nodes. +func (s *API) SubmitHashRate(r hexutil.Uint64, id common.Hash) bool { + submit := func(rate map[common.Hash]hashrate) { + rate[id] = hashrate{rate: uint64(r), ping: time.Now()} + } + errCh := make(chan error) + op := &remoteOp{ + typ: opHashrate, + rateOp: submit, + errCh: errCh, + } + s.ethash.remoteOp <- op + <-errCh + return true +} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 0c2bccb9a2a3..2fccc5773463 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -395,23 +395,34 @@ type Config struct { type remoteOpType uint const ( - getWork remoteOpType = iota - submitWork + opGetWork remoteOpType = iota + opSubmitWork + opHashrate ) -// sealResult wraps the pow solution parameters for the specified block. -type sealResult struct { +// mineResult wraps the pow solution parameters for the specified block. +type mineResult struct { nonce types.BlockNonce mixDigest common.Hash hash common.Hash } +type hashrate struct { + id common.Hash + ping time.Time + rate uint64 +} + +type hashrateOp func(map[common.Hash]hashrate) + // remoteOp wraps a mining operation sent by remote miner. type remoteOp struct { typ remoteOpType - result sealResult + result mineResult errCh chan error workCh chan [3]string + + rateOp hashrateOp } // Ethash is a consensus engine based on proof-of-work implementing the ethash @@ -434,7 +445,6 @@ type Ethash struct { workCh chan *types.Block // Notification channel to push new work to remote sealer resultCh chan *types.Block // Channel used by mining threads to return result remoteOp chan *remoteOp // Channel used to receive all mining operations from api - // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode @@ -474,7 +484,20 @@ func New(config Config) *Ethash { // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. func NewTester() *Ethash { - return New(Config{CachesInMem: 1, PowMode: ModeTest}) + ethash := &Ethash{ + config: Config{PowMode: ModeTest}, + caches: newlru("cache", 1, newCache), + datasets: newlru("dataset", 1, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + exitCh: make(chan struct{}), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + remoteOp: make(chan *remoteOp), + } + go ethash.remote(ethash.resultCh) + runtime.SetFinalizer(ethash, (*Ethash).close) + return ethash } // NewFaker creates a ethash consensus engine with a fake PoW scheme that accepts @@ -606,14 +629,36 @@ func (ethash *Ethash) SetThreads(threads int) { // Hashrate implements PoW, returning the measured rate of the search invocations // per second over the last minute. +// Note the returned hashrate includes local hashrate, but also includes the total +// hashrate of all remote miner. func (ethash *Ethash) Hashrate() float64 { - return ethash.hashrate.Rate1() + var ( + total uint64 + errCh = make(chan error) + ) + // getHashrate gathers all remote miner's hashrate. + getHashrate := func(hashrate map[common.Hash]hashrate) { + for _, rate := range hashrate { + // this could overflow + total += rate.rate + } + return + } + op := &remoteOp{ + typ: opHashrate, + rateOp: getHashrate, + errCh: errCh, + } + ethash.remoteOp <- op + <-errCh + + return ethash.hashrate.Rate1() + float64(total) } // APIs implements consensus.Engine, returning the user facing RPC APIs. func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { return []rpc.API{{ - Namespace: "ethash", + Namespace: "eth", Version: "1.0", Service: &API{ethash}, Public: true, diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index 31116da437dd..b5d8ae902210 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -24,6 +24,8 @@ import ( "sync" "testing" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) @@ -69,7 +71,7 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { const wiggle = 4 * epochLength r := rand.New(rand.NewSource(int64(workerIndex))) for epoch := 0; epoch < epochs; epoch++ { - block := int64(epoch)*epochLength - wiggle/2 + r.Int63n(wiggle) + block := int64(epoch) * epochLength - wiggle / 2 + r.Int63n(wiggle) if block < 0 { block = 0 } @@ -77,3 +79,47 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { e.VerifySeal(nil, head) } } + +func TestRemoteSealer(t *testing.T) { + ethash := NewTester() + api := &API{ethash} + if _, err := api.GetWork(); err != errNoMineWork { + t.Error("expected to return an error indicate there is no mining work") + } + + head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(head) + // Push new work + ethash.Seal(nil, block, nil) + + var ( + work [3]string + err error + ) + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expected to return a mining work has same hash") + } + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expected to return false when submit a fake solution") + } +} + +func TestHashRate(t *testing.T) { + var ( + ethash = NewTester() + api = &API{ethash} + hashrate = []hexutil.Uint64{100, 200, 300} + expect uint64 + ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} + ) + for i := 0; i < len(hashrate); i += 1 { + if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { + t.Error("remote miner submit hashrate failed") + } + expect += uint64(hashrate[i]) + } + if tot := ethash.Hashrate(); tot != float64(expect) { + t.Error("expect total hashrate should be same") + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 5d3cd8e94c47..4d34f78cf7cc 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -18,13 +18,14 @@ package ethash import ( crand "crypto/rand" + "errors" "math" "math/big" "math/rand" "runtime" "sync" + "time" - "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/types" @@ -32,9 +33,9 @@ import ( ) var ( - errNoSealWork = errors.New("No work available yet, don't panic.") - errInvalidSealResult = errors.New("Invalid proof-of-work solution.") - errNonDefinedRemoteOp = errors.New("Non-defined remote mining operation.") + errNoMineWork = errors.New("No mining work available yet, don't panic.") + errInvalidSealResult = errors.New("Invalid or stale proof-of-work solution.") + errUndefinedRemoteOp = errors.New("Undefined remote mining operation.") ) // Seal implements consensus.Engine, attempting to find a nonce that satisfies @@ -72,15 +73,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop threads = 0 // Allows disabling local mining without extra logic around local/remote } - // Clear up result channel before new round mining start - for empty := false; !empty; { - select { - case <-ethash.resultCh: - default: - empty = true - } - } - // Push new work to remote sealer ethash.workCh <- block @@ -175,17 +167,18 @@ search: func (ethash *Ethash) remote(resultCh chan *types.Block) { var ( works = make(map[common.Hash]*types.Block) + hashrate = make(map[common.Hash]hashrate) currentWork *types.Block ) - // makeWork returns a work package for external sealer. The work package consists of 3 strings + // getWork returns a work package for external miner. The work package consists of 3 strings // result[0], 32 bytes hex encoded current block header pow-hash // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty - makeWork := func() ([3]string, error) { + getWork := func() ([3]string, error) { var res [3]string if currentWork == nil { - return res, errNoSealWork + return res, errNoMineWork } res[0] = currentWork.HashNoNonce().Hex() res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() @@ -199,10 +192,10 @@ func (ethash *Ethash) remote(resultCh chan *types.Block) { return res, nil } - // verifyWork verifies the submitted pow solution, returning + // submitWork verifies the submitted pow solution, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no work pending). - verifyWork := func(result sealResult) bool { + submitWork := func(result mineResult) bool { // Make sure the work submitted is present block := works[result.hash] if block == nil { @@ -220,42 +213,65 @@ func (ethash *Ethash) remote(resultCh chan *types.Block) { } // Solutions seems to be valid, return to the miner and notify acceptance - resultCh <- block.WithSeal(header) - delete(works, result.hash) - return true + select { + case resultCh <- block.WithSeal(header): + delete(works, result.hash) + return true + default: + log.Info("Work submitted is stale", "hash", result.hash) + return false + } } + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + running: for { select { case block := <-ethash.workCh: - if block.ParentHash() != currentWork.ParentHash() { + if currentWork != nil && block.ParentHash() != currentWork.ParentHash() { // Start new round mining, throw out all previous work. works = make(map[common.Hash]*types.Block) } // Update current work with new received block + // Note same work can be past twice, happens when changing CPU threads. currentWork = block case op := <-ethash.remoteOp: switch op.typ { - case getWork: - // Push current mining work to return channel - work, err := makeWork() + case opGetWork: + // Return current mining work to remote miner + work, err := getWork() if err != nil { op.errCh <- err } else { - op.workCh <- work close(op.errCh) + op.workCh <- work } - case submitWork: + case opSubmitWork: // Verify submitted PoW solution based on maintained mining blocks - if verifyWork(op.result) { + if submitWork(op.result) { close(op.errCh) } else { op.errCh <- errInvalidSealResult } + case opHashrate: + // This case is used by remote miner hashrate operation + if op.rateOp != nil { + op.rateOp(hashrate) + close(op.errCh) + } default: - op.errCh <- errNonDefinedRemoteOp + op.errCh <- errUndefinedRemoteOp + } + + case <-ticker.C: + // Clear stale submited hashrate + for id, rate := range hashrate { + if time.Since(rate.ping) > 10*time.Second { + delete(hashrate, id) + } } case <-ethash.exitCh: diff --git a/eth/api.go b/eth/api.go index 0b6da456f608..ef66e99cbf0e 100644 --- a/eth/api.go +++ b/eth/api.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -70,16 +69,12 @@ func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 { // PublicMinerAPI provides an API to control the miner. // It offers only methods that operate on data that pose no security risk when it is publicly accessible. type PublicMinerAPI struct { - e *Ethereum - agent *miner.RemoteAgent + e *Ethereum } // NewPublicMinerAPI create a new PublicMinerAPI instance. func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI { - agent := miner.NewRemoteAgent(e.BlockChain(), e.Engine()) - e.Miner().Register(agent) - - return &PublicMinerAPI{e, agent} + return &PublicMinerAPI{e} } // Mining returns an indication if this node is currently mining. @@ -87,37 +82,6 @@ func (api *PublicMinerAPI) Mining() bool { return api.e.IsMining() } -// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was -// accepted. Note, this is not an indication if the provided work was valid! -func (api *PublicMinerAPI) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool { - return api.agent.SubmitWork(nonce, digest, solution) -} - -// GetWork returns a work package for external miner. The work package consists of 3 strings -// result[0], 32 bytes hex encoded current block header pow-hash -// result[1], 32 bytes hex encoded seed hash used for DAG -// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty -func (api *PublicMinerAPI) GetWork() ([3]string, error) { - if !api.e.IsMining() { - if err := api.e.StartMining(false); err != nil { - return [3]string{}, err - } - } - work, err := api.agent.GetWork() - if err != nil { - return work, fmt.Errorf("mining not ready: %v", err) - } - return work, nil -} - -// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined -// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which -// must be unique between nodes. -func (api *PublicMinerAPI) SubmitHashrate(hashrate hexutil.Uint64, id common.Hash) bool { - api.agent.SubmitHashrate(id, uint64(hashrate)) - return true -} - // PrivateMinerAPI provides private RPC methods to control the miner. // These methods can be abused by external users and must be considered insecure for use by untrusted users. type PrivateMinerAPI struct { diff --git a/miner/agent.go b/miner/agent.go index e3cebbd2e2a9..bb062c3ecb3d 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -18,7 +18,6 @@ package miner import ( "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -110,10 +109,3 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { self.returnCh <- nil } } - -func (self *CpuAgent) GetHashRate() int64 { - if pow, ok := self.engine.(consensus.PoW); ok { - return int64(pow.Hashrate()) - } - return 0 -} diff --git a/miner/miner.go b/miner/miner.go index d9256e9787c0..3a744e846ad9 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -143,14 +143,6 @@ func (self *Miner) HashRate() (tot int64) { if pow, ok := self.engine.(consensus.PoW); ok { tot += int64(pow.Hashrate()) } - // do we care this might race? is it worth we're rewriting some - // aspects of the worker/locking up agents so we can get an accurate - // hashrate? - for agent := range self.worker.agents { - if _, ok := agent.(*CpuAgent); !ok { - tot += agent.GetHashRate() - } - } return } diff --git a/miner/remote_agent.go b/miner/remote_agent.go deleted file mode 100644 index 287e7530c3e8..000000000000 --- a/miner/remote_agent.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package miner - -import ( - "errors" - "math/big" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/ethash" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -type hashrate struct { - ping time.Time - rate uint64 -} - -type RemoteAgent struct { - mu sync.Mutex - - quitCh chan struct{} - workCh chan *Work - returnCh chan<- *Result - - chain consensus.ChainReader - engine consensus.Engine - currentWork *Work - work map[common.Hash]*Work - - hashrateMu sync.RWMutex - hashrate map[common.Hash]hashrate - - running int32 // running indicates whether the agent is active. Call atomically -} - -func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent { - return &RemoteAgent{ - chain: chain, - engine: engine, - work: make(map[common.Hash]*Work), - hashrate: make(map[common.Hash]hashrate), - } -} - -func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { - a.hashrateMu.Lock() - defer a.hashrateMu.Unlock() - - a.hashrate[id] = hashrate{time.Now(), rate} -} - -func (a *RemoteAgent) Work() chan<- *Work { - return a.workCh -} - -func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) { - a.returnCh = returnCh -} - -func (a *RemoteAgent) Start() { - if !atomic.CompareAndSwapInt32(&a.running, 0, 1) { - return - } - a.quitCh = make(chan struct{}) - a.workCh = make(chan *Work, 1) - go a.loop(a.workCh, a.quitCh) -} - -func (a *RemoteAgent) Stop() { - if !atomic.CompareAndSwapInt32(&a.running, 1, 0) { - return - } - close(a.quitCh) - close(a.workCh) -} - -// GetHashRate returns the accumulated hashrate of all identifier combined -func (a *RemoteAgent) GetHashRate() (tot int64) { - a.hashrateMu.RLock() - defer a.hashrateMu.RUnlock() - - // this could overflow - for _, hashrate := range a.hashrate { - tot += int64(hashrate.rate) - } - return -} - -func (a *RemoteAgent) GetWork() ([3]string, error) { - a.mu.Lock() - defer a.mu.Unlock() - - var res [3]string - - if a.currentWork != nil { - block := a.currentWork.Block - - res[0] = block.HashNoNonce().Hex() - seedHash := ethash.SeedHash(block.NumberU64()) - res[1] = common.BytesToHash(seedHash).Hex() - // Calculate the "target" to be returned to the external miner - n := big.NewInt(1) - n.Lsh(n, 255) - n.Div(n, block.Difficulty()) - n.Lsh(n, 1) - res[2] = common.BytesToHash(n.Bytes()).Hex() - - a.work[block.HashNoNonce()] = a.currentWork - return res, nil - } - return res, errors.New("No work available yet, don't panic.") -} - -// SubmitWork tries to inject a pow solution into the remote agent, returning -// whether the solution was accepted or not (not can be both a bad pow as well as -// any other error, like no work pending). -func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool { - a.mu.Lock() - defer a.mu.Unlock() - - // Make sure the work submitted is present - work := a.work[hash] - if work == nil { - log.Info("Work submitted but none pending", "hash", hash) - return false - } - // Make sure the Engine solutions is indeed valid - result := work.Block.Header() - result.Nonce = nonce - result.MixDigest = mixDigest - - if err := a.engine.VerifySeal(a.chain, result); err != nil { - log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) - return false - } - block := work.Block.WithSeal(result) - - // Solutions seems to be valid, return to the miner and notify acceptance - a.returnCh <- &Result{work, block} - delete(a.work, hash) - - return true -} - -// loop monitors mining events on the work and quit channels, updating the internal -// state of the remote miner until a termination is requested. -// -// Note, the reason the work and quit channels are passed as parameters is because -// RemoteAgent.Start() constantly recreates these channels, so the loop code cannot -// assume data stability in these member fields. -func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-quitCh: - return - case work := <-workCh: - a.mu.Lock() - a.currentWork = work - a.mu.Unlock() - case <-ticker.C: - // cleanup - a.mu.Lock() - for hash, work := range a.work { - if time.Since(work.createdAt) > 7*(12*time.Second) { - delete(a.work, hash) - } - } - a.mu.Unlock() - - a.hashrateMu.Lock() - for id, hashrate := range a.hashrate { - if time.Since(hashrate.ping) > 10*time.Second { - delete(a.hashrate, id) - } - } - a.hashrateMu.Unlock() - } - } -} diff --git a/miner/worker.go b/miner/worker.go index 34329f84990e..624be1d0e6e7 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -57,7 +57,6 @@ type Agent interface { SetReturnCh(chan<- *Result) Stop() Start() - GetHashRate() int64 } // Work is the workers current environment and holds From dd83e3d40a301764f293a0260292703523eb658b Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 31 Jan 2018 14:31:36 +0800 Subject: [PATCH 04/12] miner, ethash: push empty block to sealer without waiting execution --- cmd/geth/consolecmd_test.go | 2 +- consensus/clique/clique.go | 5 + consensus/consensus.go | 3 + consensus/ethash/algorithm_test.go | 1 + consensus/ethash/api.go | 105 ++++++++++-------- consensus/ethash/ethash.go | 171 ++++++++++++++++------------- consensus/ethash/ethash_test.go | 58 +++++++++- consensus/ethash/sealer.go | 134 ++++++++++++---------- eth/backend.go | 2 +- internal/web3ext/web3ext.go | 24 ++++ les/backend.go | 1 + miner/worker.go | 36 ++++-- 12 files changed, 339 insertions(+), 203 deletions(-) diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index 8d8b10f8fae7..34ba877020c0 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -31,7 +31,7 @@ import ( ) const ( - ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0" + ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 ethash:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0" httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0" ) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 8968f500f116..5963900c9723 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -672,6 +672,11 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } +// Close implements consensus.Engine, returning internal error and close the clique. +func (c *Clique) Close() error { + return nil +} + // APIs implements consensus.Engine, returning the user facing RPC API to allow // controlling the signer voting. func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API { diff --git a/consensus/consensus.go b/consensus/consensus.go index 5774af1a78eb..ae0fefb490d1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -96,6 +96,9 @@ type Engine interface { // APIs returns the RPC APIs this consensus engine provides. APIs(chain ChainReader) []rpc.API + + // Close closes the consensus engine. + Close() error } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index f0c6465fdb6b..e7625f7c00ce 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -730,6 +730,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) + defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) } diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index fb64b85d0718..2e5c2c8f2d03 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -13,78 +13,87 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . + package ethash import ( - "time" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) -// API exposes ethash related methods for the RPC interface +var errEthashStopped = errors.New("ethash stopped") + +// API exposes ethash related methods for the RPC interface. type API struct { ethash *Ethash } -// GetWork returns a work package for external miner. The work package consists of 3 strings -// result[0], 32 bytes hex encoded current block header pow-hash -// result[1], 32 bytes hex encoded seed hash used for DAG -// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty -func (s *API) GetWork() ([3]string, error) { +// GetWork returns a work package for external miner. +// +// The work package consists of 3 strings: +// result[0] - 32 bytes hex encoded current block header pow-hash +// result[1] - 32 bytes hex encoded seed hash used for DAG +// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +func (api *API) GetWork() ([3]string, error) { var ( - workCh = make(chan [3]string) - errCh = make(chan error) + workCh = make(chan [3]string, 1) + errCh = make(chan error, 1) + err error ) - op := &remoteOp{ - typ: opGetWork, - workCh: workCh, - errCh: errCh, + + select { + case api.ethash.fetchWorkCh <- &sealWork{errCh: errCh, resCh: workCh}: + case <-api.ethash.exitCh: + return [3]string{}, errEthashStopped } - s.ethash.remoteOp <- op - if err := <-errCh; err == nil { + + if err = <-errCh; err == nil { return <-workCh, nil - } else { - return [3]string{}, err } + return [3]string{}, err } -// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was -// accepted. Note, this is not an indication if the provided work was valid! -func (s *API) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool { - var errCh = make(chan error) - op := &remoteOp{ - typ: opSubmitWork, - result: mineResult{ - nonce: nonce, - mixDigest: digest, - hash: solution, - }, - errCh: errCh, - } - s.ethash.remoteOp <- op - if err := <-errCh; err == nil { - return true - } else { +// SubmitWork can be used by external miner to submit their POW solution. +// It returns an indication if the work was accepted. +// Note either an invalid solution, a stale work a non-existent work will return false. +func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool { + var errCh = make(chan error, 1) + + select { + case api.ethash.submitWorkCh <- &mineResult{ + nonce: nonce, + mixDigest: digest, + hash: hash, + errCh: errCh, + }: + case <-api.ethash.exitCh: return false } + + err := <-errCh + return err == nil } -// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined -// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which -// must be unique between nodes. -func (s *API) SubmitHashRate(r hexutil.Uint64, id common.Hash) bool { - submit := func(rate map[common.Hash]hashrate) { - rate[id] = hashrate{rate: uint64(r), ping: time.Now()} - } - errCh := make(chan error) - op := &remoteOp{ - typ: opHashrate, - rateOp: submit, - errCh: errCh, +// SubmitHashrate can be used for remote miners to submit their hash rate. +// This enables the node to report the combined hash rate of all miners +// which submit work through this node. +// +// It accepts the miner hash rate and an identifier which must be unique +// between nodes. +func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { + var doneCh = make(chan struct{}, 1) + + select { + case api.ethash.submitRateCh <- &hashrate{done: doneCh, rate: uint64(rate), id: id}: + case <-api.ethash.exitCh: + return false } - s.ethash.remoteOp <- op - <-errCh + + // Block until hash rate submitted successfully. + <-doneCh + return true } diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 2fccc5773463..424f7799178d 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -391,38 +391,28 @@ type Config struct { PowMode Mode } -// remoteOpType defines all the types of operations related to remote sealer. -type remoteOpType uint - -const ( - opGetWork remoteOpType = iota - opSubmitWork - opHashrate -) - // mineResult wraps the pow solution parameters for the specified block. type mineResult struct { nonce types.BlockNonce mixDigest common.Hash hash common.Hash + + errCh chan error } +// hashrate wraps the hash rate submitted by the remote sealer. type hashrate struct { id common.Hash ping time.Time rate uint64 -} -type hashrateOp func(map[common.Hash]hashrate) - -// remoteOp wraps a mining operation sent by remote miner. -type remoteOp struct { - typ remoteOpType - result mineResult - errCh chan error - workCh chan [3]string + done chan struct{} +} - rateOp hashrateOp +// sealWork wraps a seal work package for remote sealer. +type sealWork struct { + errCh chan error + resCh chan [3]string } // Ethash is a consensus engine based on proof-of-work implementing the ethash @@ -433,8 +423,6 @@ type Ethash struct { caches *lru // In memory caches to avoid regenerating too often datasets *lru // In memory datasets to avoid regenerating too often - exitCh chan struct{} // Notification channel to exiting backend threads - // Mining related fields rand *rand.Rand // Properly seeded random source for nonces threads int // Number of threads to mine on if mining @@ -442,15 +430,21 @@ type Ethash struct { hashrate metrics.Meter // Meter tracking the average hashrate // Remote sealer related fields - workCh chan *types.Block // Notification channel to push new work to remote sealer - resultCh chan *types.Block // Channel used by mining threads to return result - remoteOp chan *remoteOp // Channel used to receive all mining operations from api + workCh chan *types.Block // Notification channel to push new work to remote sealer + resultCh chan *types.Block // Channel used by mining threads to return result + fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work + submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result + fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. + submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode fakeDelay time.Duration // Time delay to sleep for before returning from verify - lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + closeOnce sync.Once // Ensures exit channel will not be closed twice. + exitCh chan chan error // Notification channel to exiting backend threads } // New creates a full sized ethash PoW scheme. @@ -466,18 +460,20 @@ func New(config Config) *Ethash { log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) } ethash := &Ethash{ - config: config, - caches: newlru("cache", config.CachesInMem, newCache), - datasets: newlru("dataset", config.DatasetsInMem, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeter(), - exitCh: make(chan struct{}), - workCh: make(chan *types.Block), - resultCh: make(chan *types.Block), - remoteOp: make(chan *remoteOp), - } - go ethash.remote(ethash.resultCh) - runtime.SetFinalizer(ethash, (*Ethash).close) + config: config, + caches: newlru("cache", config.CachesInMem, newCache), + datasets: newlru("dataset", config.DatasetsInMem, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), + } + go ethash.remote() return ethash } @@ -485,18 +481,20 @@ func New(config Config) *Ethash { // purposes. func NewTester() *Ethash { ethash := &Ethash{ - config: Config{PowMode: ModeTest}, - caches: newlru("cache", 1, newCache), - datasets: newlru("dataset", 1, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeter(), - exitCh: make(chan struct{}), - workCh: make(chan *types.Block), - resultCh: make(chan *types.Block), - remoteOp: make(chan *remoteOp), - } - go ethash.remote(ethash.resultCh) - runtime.SetFinalizer(ethash, (*Ethash).close) + config: Config{PowMode: ModeTest}, + caches: newlru("cache", 1, newCache), + datasets: newlru("dataset", 1, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), + } + go ethash.remote() return ethash } @@ -508,6 +506,7 @@ func NewFaker() *Ethash { config: Config{ PowMode: ModeFake, }, + exitCh: make(chan chan error), } } @@ -520,6 +519,7 @@ func NewFakeFailer(fail uint64) *Ethash { PowMode: ModeFake, }, fakeFail: fail, + exitCh: make(chan chan error), } } @@ -532,6 +532,7 @@ func NewFakeDelayer(delay time.Duration) *Ethash { PowMode: ModeFake, }, fakeDelay: delay, + exitCh: make(chan chan error), } } @@ -542,19 +543,32 @@ func NewFullFaker() *Ethash { config: Config{ PowMode: ModeFullFake, }, + exitCh: make(chan chan error), } } // NewShared creates a full sized ethash PoW shared between all requesters running // in the same process. func NewShared() *Ethash { - return &Ethash{shared: sharedEthash} + return &Ethash{ + shared: sharedEthash, + exitCh: make(chan chan error), + } } -// close closes the exit channel to notify all backend threads exiting. -func (ethash *Ethash) close() { - runtime.SetFinalizer(ethash, nil) - close(ethash.exitCh) +// Close closes the exit channel to notify all backend threads exiting. +func (ethash *Ethash) Close() error { + var err error + ethash.closeOnce.Do(func() { + var errCh = make(chan error) + select { + case ethash.exitCh <- errCh: + err = <-errCh + close(ethash.exitCh) + default: + } + }) + return err } // cache tries to retrieve a verification cache for the specified block number @@ -632,37 +646,38 @@ func (ethash *Ethash) SetThreads(threads int) { // Note the returned hashrate includes local hashrate, but also includes the total // hashrate of all remote miner. func (ethash *Ethash) Hashrate() float64 { - var ( - total uint64 - errCh = make(chan error) - ) - // getHashrate gathers all remote miner's hashrate. - getHashrate := func(hashrate map[common.Hash]hashrate) { - for _, rate := range hashrate { - // this could overflow - total += rate.rate - } - return - } - op := &remoteOp{ - typ: opHashrate, - rateOp: getHashrate, - errCh: errCh, + var resCh = make(chan uint64, 1) + + select { + case ethash.fetchRateCh <- resCh: + case <-ethash.exitCh: + // Return local hashrate only if ethash is stopped. + return ethash.hashrate.Rate1() } - ethash.remoteOp <- op - <-errCh + // Gather total submitted hash rate of remote sealers. + total := <-resCh return ethash.hashrate.Rate1() + float64(total) } // APIs implements consensus.Engine, returning the user facing RPC APIs. func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { - return []rpc.API{{ - Namespace: "eth", - Version: "1.0", - Service: &API{ethash}, - Public: true, - }} + // In order to ensure backward compatibility, we exposes ethash RPC APIs + // to both eth and ethash namespaces. + return []rpc.API{ + { + Namespace: "eth", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + { + Namespace: "ethash", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + } } // SeedHash is the seed to use for generating a verification cache and the mining diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index b5d8ae902210..ccdd30fb0f99 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -23,6 +23,7 @@ import ( "os" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -34,6 +35,7 @@ func TestTestMode(t *testing.T) { head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} ethash := NewTester() + defer ethash.Close() block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) if err != nil { t.Fatalf("failed to seal block: %v", err) @@ -54,6 +56,7 @@ func TestCacheFileEvict(t *testing.T) { } defer os.RemoveAll(tmpdir) e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) + defer e.Close() workers := 8 epochs := 100 @@ -71,7 +74,7 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { const wiggle = 4 * epochLength r := rand.New(rand.NewSource(int64(workerIndex))) for epoch := 0; epoch < epochs; epoch++ { - block := int64(epoch) * epochLength - wiggle / 2 + r.Int63n(wiggle) + block := int64(epoch)*epochLength - wiggle/2 + r.Int63n(wiggle) if block < 0 { block = 0 } @@ -82,14 +85,16 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { func TestRemoteSealer(t *testing.T) { ethash := NewTester() + defer ethash.Close() api := &API{ethash} - if _, err := api.GetWork(); err != errNoMineWork { - t.Error("expected to return an error indicate there is no mining work") + if _, err := api.GetWork(); err != errNoMiningWork { + t.Error("expect to return an error indicate there is no mining work") } head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(head) - // Push new work + + // Push new work. ethash.Seal(nil, block, nil) var ( @@ -97,11 +102,29 @@ func TestRemoteSealer(t *testing.T) { err error ) if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { - t.Error("expected to return a mining work has same hash") + t.Error("expect to return a mining work has same hash") } if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { - t.Error("expected to return false when submit a fake solution") + t.Error("expect to return false when submit a fake solution") + } + + // Push new block with same block number to replace the original one. + head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} + block = types.NewBlockWithHeader(head) + ethash.Seal(nil, block, nil) + + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expect to return the latest pushed work") + } + + // Push block with higher block number. + newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} + newBlock := types.NewBlockWithHeader(newHead) + ethash.Seal(nil, newBlock, nil) + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expect to return false when submit a stale solution") } } @@ -113,6 +136,13 @@ func TestHashRate(t *testing.T) { expect uint64 ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} ) + + defer ethash.Close() + + if tot := ethash.Hashrate(); tot != 0 { + t.Error("expect the result should be zero") + } + for i := 0; i < len(hashrate); i += 1 { if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { t.Error("remote miner submit hashrate failed") @@ -123,3 +153,19 @@ func TestHashRate(t *testing.T) { t.Error("expect total hashrate should be same") } } + +func TestClosedRemoteSealer(t *testing.T) { + ethash := NewTester() + // Make sure exit channel has been listened + time.Sleep(1 * time.Second) + ethash.Close() + + api := &API{ethash} + if _, err := api.GetWork(); err != errEthashStopped { + t.Error("expect to return an error to indicate ethash is stopped") + } + + if res := api.SubmitHashRate(hexutil.Uint64(100), common.HexToHash("a")); res { + t.Error("expect to return false when submit hashrate to a stopped ethash") + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 4d34f78cf7cc..a843fab672aa 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -33,9 +33,8 @@ import ( ) var ( - errNoMineWork = errors.New("No mining work available yet, don't panic.") - errInvalidSealResult = errors.New("Invalid or stale proof-of-work solution.") - errUndefinedRemoteOp = errors.New("Undefined remote mining operation.") + errNoMiningWork = errors.New("no mining work available yet, don't panic") + errInvalidSealResult = errors.New("invalid or stale proof-of-work solution") ) // Seal implements consensus.Engine, attempting to find a nonce that satisfies @@ -51,7 +50,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if ethash.shared != nil { return ethash.shared.Seal(chain, block, stop) } - // Create a runner and the multiple search threads it directs abort := make(chan struct{}) @@ -72,10 +70,10 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if threads < 0 { threads = 0 // Allows disabling local mining without extra logic around local/remote } - // Push new work to remote sealer - ethash.workCh <- block - + if ethash.workCh != nil { + ethash.workCh <- block + } var pend sync.WaitGroup for i := 0; i < threads; i++ { pend.Add(1) @@ -164,61 +162,72 @@ search: } // remote starts a standalone goroutine to handle remote mining related stuff. -func (ethash *Ethash) remote(resultCh chan *types.Block) { +func (ethash *Ethash) remote() { var ( works = make(map[common.Hash]*types.Block) - hashrate = make(map[common.Hash]hashrate) + rates = make(map[common.Hash]hashrate) currentWork *types.Block ) - // getWork returns a work package for external miner. The work package consists of 3 strings - // result[0], 32 bytes hex encoded current block header pow-hash - // result[1], 32 bytes hex encoded seed hash used for DAG - // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty + // getWork returns a work package for external miner. + // + // The work package consists of 3 strings: + // result[0], 32 bytes hex encoded current block header pow-hash + // result[1], 32 bytes hex encoded seed hash used for DAG + // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty getWork := func() ([3]string, error) { var res [3]string if currentWork == nil { - return res, errNoMineWork + return res, errNoMiningWork } res[0] = currentWork.HashNoNonce().Hex() res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() - // Calculate the "target" to be returned to the external sealer + + // Calculate the "target" to be returned to the external sealer. n := big.NewInt(1) n.Lsh(n, 255) n.Div(n, currentWork.Difficulty()) n.Lsh(n, 1) res[2] = common.BytesToHash(n.Bytes()).Hex() + + // Trace the seal work fetched by remote sealer. works[currentWork.HashNoNonce()] = currentWork return res, nil } // submitWork verifies the submitted pow solution, returning // whether the solution was accepted or not (not can be both a bad pow as well as - // any other error, like no work pending). - submitWork := func(result mineResult) bool { + // any other error, like no pending work or stale mining result). + submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool { // Make sure the work submitted is present - block := works[result.hash] + block := works[hash] if block == nil { - log.Info("Work submitted but none pending", "hash", result.hash) + log.Info("Work submitted but none pending", "hash", hash) return false } - // Make sure the Engine solutions is indeed valid - header := block.Header() - header.Nonce = result.nonce - header.MixDigest = result.mixDigest + // Verify the correctness of submitted result. + header := block.Header() + header.Nonce = nonce + header.MixDigest = mixDigest if err := ethash.VerifySeal(nil, header); err != nil { - log.Warn("Invalid proof-of-work submitted", "hash", result.hash, "err", err) + log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) return false } - // Solutions seems to be valid, return to the miner and notify acceptance + // Make sure the result channel is created. + if ethash.resultCh == nil { + log.Warn("Ethash result channel is empty, submitted mining result is rejected") + return false + } + + // Solutions seems to be valid, return to the miner and notify acceptance. select { - case resultCh <- block.WithSeal(header): - delete(works, result.hash) + case ethash.resultCh <- block.WithSeal(header): + delete(works, hash) return true default: - log.Info("Work submitted is stale", "hash", result.hash) + log.Info("Work submitted is stale", "hash", hash) return false } } @@ -234,48 +243,53 @@ running: // Start new round mining, throw out all previous work. works = make(map[common.Hash]*types.Block) } - // Update current work with new received block + // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. currentWork = block - case op := <-ethash.remoteOp: - switch op.typ { - case opGetWork: - // Return current mining work to remote miner - work, err := getWork() - if err != nil { - op.errCh <- err - } else { - close(op.errCh) - op.workCh <- work - } - case opSubmitWork: - // Verify submitted PoW solution based on maintained mining blocks - if submitWork(op.result) { - close(op.errCh) - } else { - op.errCh <- errInvalidSealResult - } - case opHashrate: - // This case is used by remote miner hashrate operation - if op.rateOp != nil { - op.rateOp(hashrate) - close(op.errCh) - } - default: - op.errCh <- errUndefinedRemoteOp + case work := <-ethash.fetchWorkCh: + // Return current mining work to remote miner. + miningWork, err := getWork() + if err != nil { + work.errCh <- err + } else { + close(work.errCh) + work.resCh <- miningWork + } + + case result := <-ethash.submitWorkCh: + // Verify submitted PoW solution based on maintained mining blocks. + if submitWork(result.nonce, result.mixDigest, result.hash) { + close(result.errCh) + } else { + result.errCh <- errInvalidSealResult + } + + case result := <-ethash.submitRateCh: + // Trace remote sealer's hash rate by submitted value. + rates[result.id] = hashrate{rate: result.rate, ping: time.Now()} + close(result.done) + + case req := <-ethash.fetchRateCh: + // Gather all hash rate submitted by remote sealer. + var total uint64 + for _, rate := range rates { + // this could overflow + total += rate.rate } + req <- total case <-ticker.C: - // Clear stale submited hashrate - for id, rate := range hashrate { + // Clear stale submitted hash rate. + for id, rate := range rates { if time.Since(rate.ping) > 10*time.Second { - delete(hashrate, id) + delete(rates, id) } } - case <-ethash.exitCh: - // Exit remote loop if ethash object is cleared by GC + case errCh := <-ethash.exitCh: + // Exit remote loop if ethash is closed and return relevant error. + errCh <- nil break running } } diff --git a/eth/backend.go b/eth/backend.go index a18abdfb564a..62a85f3fcbc1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -411,6 +411,7 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { func (s *Ethereum) Stop() error { s.bloomIndexer.Close() s.blockchain.Stop() + s.engine.Close() s.protocolManager.Stop() if s.lesServer != nil { s.lesServer.Stop() @@ -421,6 +422,5 @@ func (s *Ethereum) Stop() error { s.chainDb.Close() close(s.shutdownChan) - return nil } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 89ebceec7c00..3364f41eb7bb 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -21,6 +21,7 @@ var Modules = map[string]string{ "admin": Admin_JS, "chequebook": Chequebook_JS, "clique": Clique_JS, + "ethash": Ethash_JS, "debug": Debug_JS, "eth": Eth_JS, "miner": Miner_JS, @@ -109,6 +110,29 @@ web3._extend({ }); ` +const Ethash_JS = ` +web3._extend({ + property: 'ethash', + methods: [ + new web3._extend.Method({ + name: 'getWork', + call: 'ethash_getWork', + params: 0 + }), + new web3._extend.Method({ + name: 'submitWork', + call: 'ethash_submitWork', + params: 3, + }), + new web3._extend.Method({ + name: 'submitHashRate', + call: 'ethash_submitHashRate', + params: 2, + }), + ] +}); +` + const Admin_JS = ` web3._extend({ property: 'admin', diff --git a/les/backend.go b/les/backend.go index 35f67f29f867..952d92cc2a22 100644 --- a/les/backend.go +++ b/les/backend.go @@ -248,6 +248,7 @@ func (s *LightEthereum) Stop() error { s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() + s.engine.Close() s.eventMux.Stop() diff --git a/miner/worker.go b/miner/worker.go index 624be1d0e6e7..210d22d7e9ac 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -447,13 +447,6 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(work.state) } - pending, err := self.eth.TxPool().Pending() - if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) - return - } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // compute uncles for the new block. var ( @@ -477,16 +470,41 @@ func (self *worker) commitNewWork() { for _, hash := range badUncles { delete(self.possibleUncles, hash) } - // Create the new block to seal with the consensus engine + + // Create an empty block based on temporary copied state for sealing in advance without waiting block + // execution finished. + if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil { + log.Error("Failed to finalize block for temporary sealing", "err", err) + } else { + // Push empty work in advance without applying pending transaction. + // The reason is transactions execution can cost a lot and sealer need to + // take advantage of this part time. + if atomic.LoadInt32(&self.mining) == 1 { + log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) + } + self.push(work) + } + + // Fill the block with all available pending transactions. + pending, err := self.eth.TxPool().Pending() + if err != nil { + log.Error("Failed to fetch pending transactions", "err", err) + return + } + txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) + work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + + // Create the full block to seal with the consensus engine if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. if atomic.LoadInt32(&self.mining) == 1 { - log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) } + // Push full work to sealer, which will replace the empty work sent before automatically. self.push(work) self.updateSnapshot() } From 31bcd52d4b63dbb54465604526a88d6abbb85f44 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 26 Apr 2018 16:56:33 +0800 Subject: [PATCH 05/12] consensus, internal: add getHashrate API for ethash --- consensus/ethash/api.go | 5 +++++ consensus/ethash/sealer.go | 2 +- internal/web3ext/web3ext.go | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index 2e5c2c8f2d03..1d648b82e27d 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -97,3 +97,8 @@ func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { return true } + +// GetHashrate returns the current hashrate for local CPU miner and remote miner. +func (api *API) GetHashrate() uint64 { + return uint64(api.ethash.Hashrate()) +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index a843fab672aa..539e783a6c64 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -33,7 +33,7 @@ import ( ) var ( - errNoMiningWork = errors.New("no mining work available yet, don't panic") + errNoMiningWork = errors.New("no mining work available yet") errInvalidSealResult = errors.New("invalid or stale proof-of-work solution") ) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 3364f41eb7bb..c2e0cd3f5d1e 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -119,6 +119,11 @@ web3._extend({ call: 'ethash_getWork', params: 0 }), + new web3._extend.Method({ + name: 'getHashrate', + call: 'ethash_getHashrate', + params: 0 + }), new web3._extend.Method({ name: 'submitWork', call: 'ethash_submitWork', From 32555698dd3e75b02197732ac9e51258b7fa2001 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 2 May 2018 19:26:32 +0800 Subject: [PATCH 06/12] consensus: add three method for consensus interface --- consensus/clique/clique.go | 25 +++++++++++++++++++++++-- consensus/consensus.go | 9 +++++++++ consensus/errors.go | 3 +++ consensus/ethash/api.go | 6 ++++++ consensus/ethash/consensus.go | 16 ++++++++++++++++ consensus/ethash/ethash.go | 16 ++++++++++++++++ consensus/ethash/sealer.go | 4 ++++ 7 files changed, 77 insertions(+), 2 deletions(-) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 5963900c9723..4f637c3aa5c1 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -23,6 +23,7 @@ import ( "math/big" "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/accounts" @@ -194,8 +195,9 @@ func ecrecover(header *types.Header, sigcache *lru.ARCCache) (common.Address, er // Clique is the proof-of-authority consensus engine proposed to support the // Ethereum testnet following the Ropsten attacks. type Clique struct { - config *params.CliqueConfig // Consensus engine configuration parameters - db ethdb.Database // Database to store and retrieve snapshot checkpoints + config *params.CliqueConfig // Consensus engine configuration parameters + db ethdb.Database // Database to store and retrieve snapshot checkpoints + running int32 // Indicator whether clique engine is running or not. recents *lru.ARCCache // Snapshots for recent block to speed up reorgs signatures *lru.ARCCache // Signatures of recent blocks to speed up mining @@ -601,6 +603,10 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch if c.config.Period == 0 && len(block.Transactions()) == 0 { return nil, errWaitTransactions } + // Make sure clique engine is started. + if !c.IsRunning() { + return nil, consensus.ErrEngineNotStart + } // Don't hold the signer fields for the entire sealing procedure c.lock.RLock() signer, signFn := c.signer, c.signFn @@ -672,6 +678,21 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } +// Start implements consensus.Engine, starting the clique consensus engine. +func (c *Clique) Start() { + atomic.StoreInt32(&c.running, 1) +} + +// Stop implements consensus.Engine, stopping the clique consensus engine. +func (c *Clique) Stop() { + atomic.StoreInt32(&c.running, 0) +} + +// IsRunning implements consensus.Engine, returning an indication if the clique engine is currently mining. +func (c *Clique) IsRunning() bool { + return atomic.LoadInt32(&c.running) > 0 +} + // Close implements consensus.Engine, returning internal error and close the clique. func (c *Clique) Close() error { return nil diff --git a/consensus/consensus.go b/consensus/consensus.go index ae0fefb490d1..3562388d516e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -97,6 +97,15 @@ type Engine interface { // APIs returns the RPC APIs this consensus engine provides. APIs(chain ChainReader) []rpc.API + // Start starts the consensus engine. + Start() + + // Stop stops the consensus engine. + Stop() + + // IsRunning returns an indication whether consensus engine is running or not. + IsRunning() bool + // Close closes the consensus engine. Close() error } diff --git a/consensus/errors.go b/consensus/errors.go index a005c5f63de8..c7d7073e1932 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -34,4 +34,7 @@ var ( // ErrInvalidNumber is returned if a block's number doesn't equal it's parent's // plus one. ErrInvalidNumber = errors.New("invalid block number") + + // ErrEngineNotStart is returned if the consensus engine is not started. + ErrEngineNotStart = errors.New("consensus engine is not started") ) diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index 1d648b82e27d..6a1e01550687 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -44,6 +44,12 @@ func (api *API) GetWork() ([3]string, error) { err error ) + // Trigger ethash to start in remote mining mode(local/cpu mining is disabled) + // if ethash is not running. + if !api.ethash.IsRunning() { + api.ethash.StartMining(new(int)) + } + select { case api.ethash.fetchWorkCh <- &sealWork{errCh: errCh, resCh: workCh}: case <-api.ethash.exitCh: diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index eb0f73d98bb3..094085aa8c66 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -22,6 +22,7 @@ import ( "fmt" "math/big" "runtime" + "sync/atomic" "time" mapset "github.com/deckarep/golang-set" @@ -552,3 +553,18 @@ func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header } state.AddBalance(header.Coinbase, reward) } + +// Start implements consensus.Engine, starting the ethash engine. +func (ethash *Ethash) Start() { + atomic.StoreInt32(ðash.running, 1) +} + +// Stop implements consensus.Engine, stopping the ethash engine. +func (ethash *Ethash) Stop() { + atomic.StoreInt32(ðash.running, 0) +} + +// IsRunning implements consensus.Engine, returning an indication if the ethash engine is currently mining. +func (ethash *Ethash) IsRunning() bool { + return atomic.LoadInt32(ðash.running) > 0 +} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 424f7799178d..7e9e661f78c2 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -428,6 +428,7 @@ type Ethash struct { threads int // Number of threads to mine on if mining update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate + running int32 // Indicator whether ethash engine is running or not. // Remote sealer related fields workCh chan *types.Block // Notification channel to push new work to remote sealer @@ -486,6 +487,7 @@ func NewTester() *Ethash { datasets: newlru("dataset", 1, newDataset), update: make(chan struct{}), hashrate: metrics.NewMeter(), + running: 1, // enable local mining by default workCh: make(chan *types.Block), resultCh: make(chan *types.Block), fetchWorkCh: make(chan *sealWork), @@ -680,6 +682,20 @@ func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { } } +// StartMining starts the ethash engine with the given number of threads. +// If threads is nil the number of workers started is equal to the number of logical CPUs +// that are usable by this process. If threads is 0, than local/cpu mining will be disabled. +// If mining is already running, this method adjust the number of threads allowed to use. +func (ethash *Ethash) StartMining(threads *int) { + if threads == nil { + threads = new(int) + } else if *threads == 0 { + *threads = -1 // Disable local/cpu mining. + } + ethash.Start() + ethash.SetThreads(*threads) +} + // SeedHash is the seed to use for generating a verification cache and the mining // dataset. func SeedHash(block uint64) []byte { diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 539e783a6c64..4c164b779bdd 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -50,6 +50,10 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if ethash.shared != nil { return ethash.shared.Seal(chain, block, stop) } + // Make sure ethash engine is started. + if !ethash.IsRunning() { + return nil, consensus.ErrEngineNotStart + } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) From ecafffbb3588d9b29e91482ebe98ea6b99ed77cf Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 2 May 2018 20:39:53 +0800 Subject: [PATCH 07/12] miner: expose consensus engine running status to miner --- miner/agent.go | 33 +++++++++++++++++++-------------- miner/miner.go | 17 +++++------------ miner/worker.go | 14 ++++++-------- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/miner/agent.go b/miner/agent.go index bb062c3ecb3d..4f1ea3f65c00 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -18,10 +18,10 @@ package miner import ( "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/log" + "sync/atomic" ) type CpuAgent struct { @@ -35,27 +35,39 @@ type CpuAgent struct { chain consensus.ChainReader engine consensus.Engine - isMining int32 // isMining indicates whether the agent is currently mining + started int32 // started indicates whether the agent is currently started } func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { - miner := &CpuAgent{ + agent := &CpuAgent{ chain: chain, engine: engine, stop: make(chan struct{}, 1), workCh: make(chan *Work, 1), } - return miner + return agent } func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } +func (self *CpuAgent) Start() { + if !atomic.CompareAndSwapInt32(&self.started, 0, 1) { + return // agent already started + } + go self.update() +} + func (self *CpuAgent) Stop() { - if !atomic.CompareAndSwapInt32(&self.isMining, 1, 0) { + if !atomic.CompareAndSwapInt32(&self.started, 1, 0) { return // agent already stopped } - self.stop <- struct{}{} + // Close the pending routines. + select { + case self.stop <- struct{}{}: + default: + } + done: // Empty work channel for { @@ -67,13 +79,6 @@ done: } } -func (self *CpuAgent) Start() { - if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) { - return // agent already started - } - go self.update() -} - func (self *CpuAgent) update() { out: for { @@ -103,7 +108,7 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) self.returnCh <- &Result{work, result} } else { - if err != nil { + if err != nil && err != consensus.ErrEngineNotStart { log.Warn("Block sealing failed", "err", err) } self.returnCh <- nil diff --git a/miner/miner.go b/miner/miner.go index 3a744e846ad9..9cfc8fd64fa1 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -44,12 +44,9 @@ type Backend interface { // Miner creates blocks and searches for proof-of-work values. type Miner struct { - mux *event.TypeMux - - worker *worker - + mux *event.TypeMux + worker *worker coinbase common.Address - mining int32 eth Backend engine consensus.Engine @@ -111,23 +108,19 @@ func (self *Miner) Start(coinbase common.Address) { log.Info("Network syncing, will start miner afterwards") return } - atomic.StoreInt32(&self.mining, 1) - log.Info("Starting mining operation") + self.engine.Start() self.worker.start() self.worker.commitNewWork() } func (self *Miner) Stop() { + self.engine.Stop() self.worker.stop() - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.shouldStart, 0) } func (self *Miner) Register(agent Agent) { - if self.Mining() { - agent.Start() - } self.worker.register(agent) } @@ -136,7 +129,7 @@ func (self *Miner) Unregister(agent Agent) { } func (self *Miner) Mining() bool { - return atomic.LoadInt32(&self.mining) > 0 + return self.engine.IsRunning() } func (self *Miner) HashRate() (tot int64) { diff --git a/miner/worker.go b/miner/worker.go index 210d22d7e9ac..3d2a99a5be2a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -55,8 +55,8 @@ const ( type Agent interface { Work() chan<- *Work SetReturnCh(chan<- *Result) - Stop() Start() + Stop() } // Work is the workers current environment and holds @@ -231,6 +231,7 @@ func (self *worker) register(agent Agent) { defer self.mu.Unlock() self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) + agent.Start() } func (self *worker) unregister(agent Agent) { @@ -342,9 +343,6 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { - if atomic.LoadInt32(&self.mining) != 1 { - return - } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -415,8 +413,8 @@ func (self *worker) commitNewWork() { Extra: self.extra, Time: big.NewInt(tstamp), } - // Only set the coinbase if we are mining (avoid spurious block rewards) - if atomic.LoadInt32(&self.mining) == 1 { + // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) + if self.engine.IsRunning() { header.Coinbase = self.coinbase } if err := self.engine.Prepare(self.chain, header); err != nil { @@ -479,7 +477,7 @@ func (self *worker) commitNewWork() { // Push empty work in advance without applying pending transaction. // The reason is transactions execution can cost a lot and sealer need to // take advantage of this part time. - if atomic.LoadInt32(&self.mining) == 1 { + if self.engine.IsRunning() { log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) } self.push(work) @@ -500,7 +498,7 @@ func (self *worker) commitNewWork() { return } // We only care about logging if we're actually mining. - if atomic.LoadInt32(&self.mining) == 1 { + if self.engine.IsRunning() { log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) } From 41a32a9475f51905a1d467253d1abc9fb33a42dd Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 2 May 2018 21:38:09 +0800 Subject: [PATCH 08/12] eth, miner: specify etherbase when miner created --- consensus/clique/clique.go | 2 ++ consensus/ethash/consensus.go | 3 +++ core/tx_pool.go | 3 +++ eth/api.go | 22 ++++++++++------------ eth/backend.go | 10 ++++++++-- miner/miner.go | 28 ++++++++++++++++++---------- miner/worker.go | 21 +++++++++++++-------- 7 files changed, 57 insertions(+), 32 deletions(-) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 4f637c3aa5c1..3b89e71560e6 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -680,11 +680,13 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { // Start implements consensus.Engine, starting the clique consensus engine. func (c *Clique) Start() { + log.Info("Start clique consensus engine") atomic.StoreInt32(&c.running, 1) } // Stop implements consensus.Engine, stopping the clique consensus engine. func (c *Clique) Stop() { + log.Info("Stop clique consensus engine") atomic.StoreInt32(&c.running, 0) } diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 094085aa8c66..2cb72676e948 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -556,11 +557,13 @@ func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header // Start implements consensus.Engine, starting the ethash engine. func (ethash *Ethash) Start() { + log.Info("Start ethash consensus engine") atomic.StoreInt32(ðash.running, 1) } // Stop implements consensus.Engine, stopping the ethash engine. func (ethash *Ethash) Stop() { + log.Info("Stop ethash consensus engine") atomic.StoreInt32(ðash.running, 0) } diff --git a/core/tx_pool.go b/core/tx_pool.go index 7007f85ddfd1..0fb5ca52b2ca 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -465,6 +465,9 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { pool.mu.Lock() defer pool.mu.Unlock() + if pool.gasPrice == price { + return + } pool.gasPrice = price for _, tx := range pool.priced.Cap(price, pool.locals) { pool.removeTx(tx.Hash(), false) diff --git a/eth/api.go b/eth/api.go index ef66e99cbf0e..a6b43198726c 100644 --- a/eth/api.go +++ b/eth/api.go @@ -96,7 +96,8 @@ func NewPrivateMinerAPI(e *Ethereum) *PrivateMinerAPI { // Start the miner with the given number of threads. If threads is nil the number // of workers started is equal to the number of logical CPUs that are usable by // this process. If mining is already running, this method adjust the number of -// threads allowed to use. +// threads allowed to use and updates the minimum price required by the transaction +// pool. func (api *PrivateMinerAPI) Start(threads *int) error { // Set the number of threads if the seal engine supports it if threads == nil { @@ -111,17 +112,14 @@ func (api *PrivateMinerAPI) Start(threads *int) error { log.Info("Updated mining threads", "threads", *threads) th.SetThreads(*threads) } + // Propagate the initial price point to the transaction pool + api.e.lock.RLock() + price := api.e.gasPrice + api.e.lock.RUnlock() + api.e.txPool.SetGasPrice(price) + // Start the miner and return - if !api.e.IsMining() { - // Propagate the initial price point to the transaction pool - api.e.lock.RLock() - price := api.e.gasPrice - api.e.lock.RUnlock() - - api.e.txPool.SetGasPrice(price) - return api.e.StartMining(true) - } - return nil + return api.e.StartMining(true) } // Stop the miner @@ -162,7 +160,7 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool { // GetHashrate returns the current hashrate of the miner. func (api *PrivateMinerAPI) GetHashrate() uint64 { - return uint64(api.e.miner.HashRate()) + return api.e.miner.HashRate() } // PrivateAdminAPI is the collection of Ethereum full node-related APIs diff --git a/eth/backend.go b/eth/backend.go index 62a85f3fcbc1..a41adebcfd9a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -166,7 +166,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) + + // Specify etherbase explicitly. + var etherbase common.Address + if addr, err := eth.Etherbase(); err == nil { + etherbase = addr + } + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, etherbase) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} @@ -338,7 +344,7 @@ func (s *Ethereum) StartMining(local bool) error { log.Error("Cannot start mining without etherbase", "err", err) return fmt.Errorf("etherbase missing: %v", err) } - if clique, ok := s.engine.(*clique.Clique); ok { + if clique, ok := s.engine.(*clique.Clique); ok && !clique.IsRunning() { wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) if wallet == nil || err != nil { log.Error("Etherbase account unavailable locally", "err", err) diff --git a/miner/miner.go b/miner/miner.go index 9cfc8fd64fa1..0da233dd74af 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -54,12 +54,13 @@ type Miner struct { shouldStart int32 // should start indicates whether we should start after sync } -func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { +func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, coinbase common.Address) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, - worker: newWorker(config, engine, common.Address{}, eth, mux), + coinbase: coinbase, + worker: newWorker(config, engine, coinbase, eth, mux), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) @@ -108,15 +109,22 @@ func (self *Miner) Start(coinbase common.Address) { log.Info("Network syncing, will start miner afterwards") return } - log.Info("Starting mining operation") - self.engine.Start() - self.worker.start() - self.worker.commitNewWork() + if !self.engine.IsRunning() { + self.engine.Start() + } + if !self.worker.isRunning() { + self.worker.start() + self.worker.commitNewWork() + } } func (self *Miner) Stop() { - self.engine.Stop() - self.worker.stop() + if self.engine.IsRunning() { + self.engine.Stop() + } + if self.worker.isRunning() { + self.worker.stop() + } atomic.StoreInt32(&self.shouldStart, 0) } @@ -132,9 +140,9 @@ func (self *Miner) Mining() bool { return self.engine.IsRunning() } -func (self *Miner) HashRate() (tot int64) { +func (self *Miner) HashRate() (tot uint64) { if pow, ok := self.engine.(consensus.PoW); ok { - tot += int64(pow.Hashrate()) + tot += uint64(pow.Hashrate()) } return } diff --git a/miner/worker.go b/miner/worker.go index 3d2a99a5be2a..1dcf2c9b1e4f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -127,8 +127,8 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - mining int32 - atWork int32 + running int32 + atWork int32 } func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { @@ -175,7 +175,7 @@ func (self *worker) setExtra(extra []byte) { } func (self *worker) pending() (*types.Block, *state.StateDB) { - if atomic.LoadInt32(&self.mining) == 0 { + if atomic.LoadInt32(&self.running) == 0 { // return a snapshot to avoid contention on currentMu mutex self.snapshotMu.RLock() defer self.snapshotMu.RUnlock() @@ -188,7 +188,7 @@ func (self *worker) pending() (*types.Block, *state.StateDB) { } func (self *worker) pendingBlock() *types.Block { - if atomic.LoadInt32(&self.mining) == 0 { + if atomic.LoadInt32(&self.running) == 0 { // return a snapshot to avoid contention on currentMu mutex self.snapshotMu.RLock() defer self.snapshotMu.RUnlock() @@ -204,7 +204,7 @@ func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - atomic.StoreInt32(&self.mining, 1) + atomic.StoreInt32(&self.running, 1) // spin up agents for agent := range self.agents { @@ -217,15 +217,20 @@ func (self *worker) stop() { self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.mining) == 1 { + if atomic.LoadInt32(&self.running) == 1 { for agent := range self.agents { agent.Stop() } } - atomic.StoreInt32(&self.mining, 0) + atomic.StoreInt32(&self.running, 0) atomic.StoreInt32(&self.atWork, 0) } +// isRunning returns an indicator whether worker is currently running or not. +func (self *worker) isRunning() bool { + return atomic.LoadInt32(&self.running) > 0 +} + func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() @@ -266,7 +271,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if atomic.LoadInt32(&self.mining) == 0 { + if atomic.LoadInt32(&self.running) == 0 { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { From a218f37beca3d66ce61b98ff7207a1d0cc10684c Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 3 May 2018 14:16:34 +0800 Subject: [PATCH 09/12] miner: commit new work when consensus engine is started --- miner/agent.go | 2 +- miner/miner.go | 1 - miner/worker.go | 16 ++++++++++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/miner/agent.go b/miner/agent.go index 4f1ea3f65c00..beaf30556af2 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -18,10 +18,10 @@ package miner import ( "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/log" - "sync/atomic" ) type CpuAgent struct { diff --git a/miner/miner.go b/miner/miner.go index 0da233dd74af..e7add025deee 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -114,7 +114,6 @@ func (self *Miner) Start(coinbase common.Address) { } if !self.worker.isRunning() { self.worker.start() - self.worker.commitNewWork() } } diff --git a/miner/worker.go b/miner/worker.go index 1dcf2c9b1e4f..ba39cb83ddc2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -251,6 +251,11 @@ func (self *worker) update() { defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + var started bool // Indication whether consensus engine is started + for { // A real event arrived, process interesting content select { @@ -289,6 +294,17 @@ func (self *worker) update() { } } + // Commit new work when consensus engine is started. + case <-ticker.C: + if self.engine.IsRunning() { + if !started { + self.commitNewWork() + started = true + } + } else { + started = false + } + // System stopped case <-self.txsSub.Err(): return From 64abec019648eaa01d7f6165e52a3eeba1b725f7 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 20 Jun 2018 16:04:24 +0800 Subject: [PATCH 10/12] consensus, miner: fix some logics --- consensus/clique/clique.go | 10 +++-- consensus/ethash/api.go | 19 +++++++++- consensus/ethash/consensus.go | 10 +++-- consensus/ethash/ethash.go | 26 ++++++------- miner/agent.go | 5 +-- miner/miner.go | 20 +++------- miner/worker.go | 70 +++++++++++++++-------------------- 7 files changed, 77 insertions(+), 83 deletions(-) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 3b89e71560e6..6a8d8ce619dc 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -680,14 +680,16 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { // Start implements consensus.Engine, starting the clique consensus engine. func (c *Clique) Start() { - log.Info("Start clique consensus engine") - atomic.StoreInt32(&c.running, 1) + if atomic.CompareAndSwapInt32(&c.running, 0, 1) { + log.Info("Start clique consensus engine") + } } // Stop implements consensus.Engine, stopping the clique consensus engine. func (c *Clique) Stop() { - log.Info("Stop clique consensus engine") - atomic.StoreInt32(&c.running, 0) + if atomic.CompareAndSwapInt32(&c.running, 1, 0) { + log.Info("Stop clique consensus engine") + } } // IsRunning implements consensus.Engine, returning an indication if the clique engine is currently mining. diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index 6a1e01550687..eae6c169a8b9 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -24,11 +24,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var errEthashStopped = errors.New("ethash stopped") +var ( + errEthashStopped = errors.New("ethash stopped") + errAPINotSupported = errors.New("the current ethash running mode does not support this API") +) // API exposes ethash related methods for the RPC interface. type API struct { - ethash *Ethash + ethash *Ethash // Make sure the mode of ethash is normal. } // GetWork returns a work package for external miner. @@ -38,6 +41,10 @@ type API struct { // result[1] - 32 bytes hex encoded seed hash used for DAG // result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty func (api *API) GetWork() ([3]string, error) { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return [3]string{}, errAPINotSupported + } + var ( workCh = make(chan [3]string, 1) errCh = make(chan error, 1) @@ -66,6 +73,10 @@ func (api *API) GetWork() ([3]string, error) { // It returns an indication if the work was accepted. // Note either an invalid solution, a stale work a non-existent work will return false. func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + var errCh = make(chan error, 1) select { @@ -90,6 +101,10 @@ func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) boo // It accepts the miner hash rate and an identifier which must be unique // between nodes. func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + var doneCh = make(chan struct{}, 1) select { diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 2cb72676e948..bee4efe9e6b7 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -557,14 +557,16 @@ func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header // Start implements consensus.Engine, starting the ethash engine. func (ethash *Ethash) Start() { - log.Info("Start ethash consensus engine") - atomic.StoreInt32(ðash.running, 1) + if atomic.CompareAndSwapInt32(ðash.running, 0, 1) { + log.Info("Start ethash consensus engine") + } } // Stop implements consensus.Engine, stopping the ethash engine. func (ethash *Ethash) Stop() { - log.Info("Stop ethash consensus engine") - atomic.StoreInt32(ðash.running, 0) + if atomic.CompareAndSwapInt32(ðash.running, 1, 0) { + log.Info("Stop ethash consensus engine") + } } // IsRunning implements consensus.Engine, returning an indication if the ethash engine is currently mining. diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 7e9e661f78c2..caf272ab0765 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -508,7 +508,6 @@ func NewFaker() *Ethash { config: Config{ PowMode: ModeFake, }, - exitCh: make(chan chan error), } } @@ -521,7 +520,6 @@ func NewFakeFailer(fail uint64) *Ethash { PowMode: ModeFake, }, fakeFail: fail, - exitCh: make(chan chan error), } } @@ -534,7 +532,6 @@ func NewFakeDelayer(delay time.Duration) *Ethash { PowMode: ModeFake, }, fakeDelay: delay, - exitCh: make(chan chan error), } } @@ -545,30 +542,27 @@ func NewFullFaker() *Ethash { config: Config{ PowMode: ModeFullFake, }, - exitCh: make(chan chan error), } } // NewShared creates a full sized ethash PoW shared between all requesters running // in the same process. func NewShared() *Ethash { - return &Ethash{ - shared: sharedEthash, - exitCh: make(chan chan error), - } + return &Ethash{shared: sharedEthash} } // Close closes the exit channel to notify all backend threads exiting. func (ethash *Ethash) Close() error { var err error ethash.closeOnce.Do(func() { - var errCh = make(chan error) - select { - case ethash.exitCh <- errCh: - err = <-errCh - close(ethash.exitCh) - default: + // Short circuit if the exit channel is not allocated. + if ethash.exitCh == nil { + return } + errCh := make(chan error) + ethash.exitCh <- errCh + err = <-errCh + close(ethash.exitCh) }) return err } @@ -648,6 +642,10 @@ func (ethash *Ethash) SetThreads(threads int) { // Note the returned hashrate includes local hashrate, but also includes the total // hashrate of all remote miner. func (ethash *Ethash) Hashrate() float64 { + // Short circuit if we are run the ethash in normal/test mode. + if ethash.config.PowMode != ModeNormal && ethash.config.PowMode != ModeTest { + return ethash.hashrate.Rate1() + } var resCh = make(chan uint64, 1) select { diff --git a/miner/agent.go b/miner/agent.go index beaf30556af2..8d37921ef520 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -63,10 +63,7 @@ func (self *CpuAgent) Stop() { return // agent already stopped } // Close the pending routines. - select { - case self.stop <- struct{}{}: - default: - } + close(self.stop) done: // Empty work channel diff --git a/miner/miner.go b/miner/miner.go index e7add025deee..949ce0f2b268 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -109,21 +109,11 @@ func (self *Miner) Start(coinbase common.Address) { log.Info("Network syncing, will start miner afterwards") return } - if !self.engine.IsRunning() { - self.engine.Start() - } - if !self.worker.isRunning() { - self.worker.start() - } + self.worker.start() } func (self *Miner) Stop() { - if self.engine.IsRunning() { - self.engine.Stop() - } - if self.worker.isRunning() { - self.worker.stop() - } + self.worker.stop() atomic.StoreInt32(&self.shouldStart, 0) } @@ -139,11 +129,11 @@ func (self *Miner) Mining() bool { return self.engine.IsRunning() } -func (self *Miner) HashRate() (tot uint64) { +func (self *Miner) HashRate() uint64 { if pow, ok := self.engine.(consensus.PoW); ok { - tot += uint64(pow.Hashrate()) + return uint64(pow.Hashrate()) } - return + return 0 } func (self *Miner) SetExtra(extra []byte) error { diff --git a/miner/worker.go b/miner/worker.go index ba39cb83ddc2..1cd666f17c4d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -101,7 +101,6 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -127,8 +126,7 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - running int32 - atWork int32 + atWork int32 // The number of in-flight consensus engine work. } func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { @@ -175,62 +173,40 @@ func (self *worker) setExtra(extra []byte) { } func (self *worker) pending() (*types.Block, *state.StateDB) { - if atomic.LoadInt32(&self.running) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block, self.current.state.Copy() + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock, self.snapshotState.Copy() } func (self *worker) pendingBlock() *types.Block { - if atomic.LoadInt32(&self.running) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock } func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - atomic.StoreInt32(&self.running, 1) - - // spin up agents + self.engine.Start() for agent := range self.agents { agent.Start() } } func (self *worker) stop() { - self.wg.Wait() - self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.running) == 1 { - for agent := range self.agents { - agent.Stop() - } + + self.engine.Stop() + for agent := range self.agents { + agent.Stop() } - atomic.StoreInt32(&self.running, 0) atomic.StoreInt32(&self.atWork, 0) } -// isRunning returns an indicator whether worker is currently running or not. -func (self *worker) isRunning() bool { - return atomic.LoadInt32(&self.running) > 0 -} - func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() @@ -276,7 +252,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if atomic.LoadInt32(&self.running) == 0 { + if !self.engine.IsRunning() { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -364,6 +340,11 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { + // Never send task to consensus engine if the etherbase is not specified. + if self.engine.IsRunning() && work.header.Coinbase == (common.Address{}) { + log.Info("Please explicitly specifies the etherbase") + return + } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -547,10 +528,19 @@ func (self *worker) updateSnapshot() { self.snapshotMu.Lock() defer self.snapshotMu.Unlock() + var uncles []*types.Header + self.current.uncles.Each(func(item interface{}) bool { + if header, ok := item.(*types.Header); ok { + uncles = append(uncles, header) + return true + } + return false + }) + self.snapshotBlock = types.NewBlock( self.current.header, self.current.txs, - nil, + uncles, self.current.receipts, ) self.snapshotState = self.current.state.Copy() From 1e844fe7a0fb6b0a4464ced6c16567e536026daa Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 3 Aug 2018 12:45:53 +0800 Subject: [PATCH 11/12] all: delete useless interfaces --- consensus/clique/clique.go | 29 ++---------------- consensus/consensus.go | 9 ------ consensus/errors.go | 3 -- consensus/ethash/api.go | 6 ---- consensus/ethash/consensus.go | 21 ------------- consensus/ethash/ethash.go | 16 ---------- consensus/ethash/sealer.go | 4 --- core/tx_pool.go | 3 -- eth/api.go | 16 +++++----- eth/backend.go | 9 ++---- miner/agent.go | 6 ++-- miner/miner.go | 8 ++--- miner/worker.go | 57 ++++++++++++++--------------------- 13 files changed, 41 insertions(+), 146 deletions(-) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 6a8d8ce619dc..5963900c9723 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -23,7 +23,6 @@ import ( "math/big" "math/rand" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/accounts" @@ -195,9 +194,8 @@ func ecrecover(header *types.Header, sigcache *lru.ARCCache) (common.Address, er // Clique is the proof-of-authority consensus engine proposed to support the // Ethereum testnet following the Ropsten attacks. type Clique struct { - config *params.CliqueConfig // Consensus engine configuration parameters - db ethdb.Database // Database to store and retrieve snapshot checkpoints - running int32 // Indicator whether clique engine is running or not. + config *params.CliqueConfig // Consensus engine configuration parameters + db ethdb.Database // Database to store and retrieve snapshot checkpoints recents *lru.ARCCache // Snapshots for recent block to speed up reorgs signatures *lru.ARCCache // Signatures of recent blocks to speed up mining @@ -603,10 +601,6 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch if c.config.Period == 0 && len(block.Transactions()) == 0 { return nil, errWaitTransactions } - // Make sure clique engine is started. - if !c.IsRunning() { - return nil, consensus.ErrEngineNotStart - } // Don't hold the signer fields for the entire sealing procedure c.lock.RLock() signer, signFn := c.signer, c.signFn @@ -678,25 +672,6 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } -// Start implements consensus.Engine, starting the clique consensus engine. -func (c *Clique) Start() { - if atomic.CompareAndSwapInt32(&c.running, 0, 1) { - log.Info("Start clique consensus engine") - } -} - -// Stop implements consensus.Engine, stopping the clique consensus engine. -func (c *Clique) Stop() { - if atomic.CompareAndSwapInt32(&c.running, 1, 0) { - log.Info("Stop clique consensus engine") - } -} - -// IsRunning implements consensus.Engine, returning an indication if the clique engine is currently mining. -func (c *Clique) IsRunning() bool { - return atomic.LoadInt32(&c.running) > 0 -} - // Close implements consensus.Engine, returning internal error and close the clique. func (c *Clique) Close() error { return nil diff --git a/consensus/consensus.go b/consensus/consensus.go index 3562388d516e..ae0fefb490d1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -97,15 +97,6 @@ type Engine interface { // APIs returns the RPC APIs this consensus engine provides. APIs(chain ChainReader) []rpc.API - // Start starts the consensus engine. - Start() - - // Stop stops the consensus engine. - Stop() - - // IsRunning returns an indication whether consensus engine is running or not. - IsRunning() bool - // Close closes the consensus engine. Close() error } diff --git a/consensus/errors.go b/consensus/errors.go index c7d7073e1932..a005c5f63de8 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -34,7 +34,4 @@ var ( // ErrInvalidNumber is returned if a block's number doesn't equal it's parent's // plus one. ErrInvalidNumber = errors.New("invalid block number") - - // ErrEngineNotStart is returned if the consensus engine is not started. - ErrEngineNotStart = errors.New("consensus engine is not started") ) diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index eae6c169a8b9..b950fbd92cbe 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -51,12 +51,6 @@ func (api *API) GetWork() ([3]string, error) { err error ) - // Trigger ethash to start in remote mining mode(local/cpu mining is disabled) - // if ethash is not running. - if !api.ethash.IsRunning() { - api.ethash.StartMining(new(int)) - } - select { case api.ethash.fetchWorkCh <- &sealWork{errCh: errCh, resCh: workCh}: case <-api.ethash.exitCh: diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index bee4efe9e6b7..eb0f73d98bb3 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -22,7 +22,6 @@ import ( "fmt" "math/big" "runtime" - "sync/atomic" "time" mapset "github.com/deckarep/golang-set" @@ -32,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -554,22 +552,3 @@ func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header } state.AddBalance(header.Coinbase, reward) } - -// Start implements consensus.Engine, starting the ethash engine. -func (ethash *Ethash) Start() { - if atomic.CompareAndSwapInt32(ðash.running, 0, 1) { - log.Info("Start ethash consensus engine") - } -} - -// Stop implements consensus.Engine, stopping the ethash engine. -func (ethash *Ethash) Stop() { - if atomic.CompareAndSwapInt32(ðash.running, 1, 0) { - log.Info("Stop ethash consensus engine") - } -} - -// IsRunning implements consensus.Engine, returning an indication if the ethash engine is currently mining. -func (ethash *Ethash) IsRunning() bool { - return atomic.LoadInt32(ðash.running) > 0 -} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index caf272ab0765..3fff266410c1 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -428,7 +428,6 @@ type Ethash struct { threads int // Number of threads to mine on if mining update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate - running int32 // Indicator whether ethash engine is running or not. // Remote sealer related fields workCh chan *types.Block // Notification channel to push new work to remote sealer @@ -487,7 +486,6 @@ func NewTester() *Ethash { datasets: newlru("dataset", 1, newDataset), update: make(chan struct{}), hashrate: metrics.NewMeter(), - running: 1, // enable local mining by default workCh: make(chan *types.Block), resultCh: make(chan *types.Block), fetchWorkCh: make(chan *sealWork), @@ -680,20 +678,6 @@ func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { } } -// StartMining starts the ethash engine with the given number of threads. -// If threads is nil the number of workers started is equal to the number of logical CPUs -// that are usable by this process. If threads is 0, than local/cpu mining will be disabled. -// If mining is already running, this method adjust the number of threads allowed to use. -func (ethash *Ethash) StartMining(threads *int) { - if threads == nil { - threads = new(int) - } else if *threads == 0 { - *threads = -1 // Disable local/cpu mining. - } - ethash.Start() - ethash.SetThreads(*threads) -} - // SeedHash is the seed to use for generating a verification cache and the mining // dataset. func SeedHash(block uint64) []byte { diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 4c164b779bdd..539e783a6c64 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -50,10 +50,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if ethash.shared != nil { return ethash.shared.Seal(chain, block, stop) } - // Make sure ethash engine is started. - if !ethash.IsRunning() { - return nil, consensus.ErrEngineNotStart - } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) diff --git a/core/tx_pool.go b/core/tx_pool.go index 0fb5ca52b2ca..7007f85ddfd1 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -465,9 +465,6 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { pool.mu.Lock() defer pool.mu.Unlock() - if pool.gasPrice == price { - return - } pool.gasPrice = price for _, tx := range pool.priced.Cap(price, pool.locals) { pool.removeTx(tx.Hash(), false) diff --git a/eth/api.go b/eth/api.go index a6b43198726c..c1fbcb6d406a 100644 --- a/eth/api.go +++ b/eth/api.go @@ -112,14 +112,16 @@ func (api *PrivateMinerAPI) Start(threads *int) error { log.Info("Updated mining threads", "threads", *threads) th.SetThreads(*threads) } - // Propagate the initial price point to the transaction pool - api.e.lock.RLock() - price := api.e.gasPrice - api.e.lock.RUnlock() - api.e.txPool.SetGasPrice(price) - // Start the miner and return - return api.e.StartMining(true) + if !api.e.IsMining() { + // Propagate the initial price point to the transaction pool + api.e.lock.RLock() + price := api.e.gasPrice + api.e.lock.RUnlock() + api.e.txPool.SetGasPrice(price) + return api.e.StartMining(true) + } + return nil } // Stop the miner diff --git a/eth/backend.go b/eth/backend.go index a41adebcfd9a..32946a0ab3bf 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -167,12 +167,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { return nil, err } - // Specify etherbase explicitly. - var etherbase common.Address - if addr, err := eth.Etherbase(); err == nil { - etherbase = addr - } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, etherbase) + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} @@ -344,7 +339,7 @@ func (s *Ethereum) StartMining(local bool) error { log.Error("Cannot start mining without etherbase", "err", err) return fmt.Errorf("etherbase missing: %v", err) } - if clique, ok := s.engine.(*clique.Clique); ok && !clique.IsRunning() { + if clique, ok := s.engine.(*clique.Clique); ok { wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) if wallet == nil || err != nil { log.Error("Etherbase account unavailable locally", "err", err) diff --git a/miner/agent.go b/miner/agent.go index 8d37921ef520..95d835bd78e7 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -62,9 +62,7 @@ func (self *CpuAgent) Stop() { if !atomic.CompareAndSwapInt32(&self.started, 1, 0) { return // agent already stopped } - // Close the pending routines. - close(self.stop) - + self.stop <- struct{}{} done: // Empty work channel for { @@ -105,7 +103,7 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) self.returnCh <- &Result{work, result} } else { - if err != nil && err != consensus.ErrEngineNotStart { + if err != nil { log.Warn("Block sealing failed", "err", err) } self.returnCh <- nil diff --git a/miner/miner.go b/miner/miner.go index 949ce0f2b268..4c5717c8ad60 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -54,13 +54,12 @@ type Miner struct { shouldStart int32 // should start indicates whether we should start after sync } -func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, coinbase common.Address) *Miner { +func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, - coinbase: coinbase, - worker: newWorker(config, engine, coinbase, eth, mux), + worker: newWorker(config, engine, eth, mux), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) @@ -110,6 +109,7 @@ func (self *Miner) Start(coinbase common.Address) { return } self.worker.start() + self.worker.commitNewWork() } func (self *Miner) Stop() { @@ -126,7 +126,7 @@ func (self *Miner) Unregister(agent Agent) { } func (self *Miner) Mining() bool { - return self.engine.IsRunning() + return self.worker.isRunning() } func (self *Miner) HashRate() uint64 { diff --git a/miner/worker.go b/miner/worker.go index 1cd666f17c4d..f1194fa18a8a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -126,10 +126,11 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - atWork int32 // The number of in-flight consensus engine work. + atWork int32 // The number of in-flight consensus engine work. + running int32 // The indicator whether the consensus engine is running or not. } -func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { worker := &worker{ config: config, engine: engine, @@ -143,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } @@ -189,8 +189,7 @@ func (self *worker) pendingBlock() *types.Block { func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - - self.engine.Start() + atomic.StoreInt32(&self.running, 1) for agent := range self.agents { agent.Start() } @@ -200,19 +199,25 @@ func (self *worker) stop() { self.mu.Lock() defer self.mu.Unlock() - self.engine.Stop() + atomic.StoreInt32(&self.running, 0) for agent := range self.agents { agent.Stop() } atomic.StoreInt32(&self.atWork, 0) } +func (self *worker) isRunning() bool { + return atomic.LoadInt32(&self.running) == 1 +} + func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) - agent.Start() + if self.isRunning() { + agent.Start() + } } func (self *worker) unregister(agent Agent) { @@ -227,11 +232,6 @@ func (self *worker) update() { defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - var started bool // Indication whether consensus engine is started - for { // A real event arrived, process interesting content select { @@ -252,7 +252,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if !self.engine.IsRunning() { + if !self.isRunning() { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -270,17 +270,6 @@ func (self *worker) update() { } } - // Commit new work when consensus engine is started. - case <-ticker.C: - if self.engine.IsRunning() { - if !started { - self.commitNewWork() - started = true - } - } else { - started = false - } - // System stopped case <-self.txsSub.Err(): return @@ -340,11 +329,6 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { - // Never send task to consensus engine if the etherbase is not specified. - if self.engine.IsRunning() && work.header.Coinbase == (common.Address{}) { - log.Info("Please explicitly specifies the etherbase") - return - } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -416,7 +400,11 @@ func (self *worker) commitNewWork() { Time: big.NewInt(tstamp), } // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) - if self.engine.IsRunning() { + if self.isRunning() { + if self.coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } header.Coinbase = self.coinbase } if err := self.engine.Prepare(self.chain, header); err != nil { @@ -479,10 +467,10 @@ func (self *worker) commitNewWork() { // Push empty work in advance without applying pending transaction. // The reason is transactions execution can cost a lot and sealer need to // take advantage of this part time. - if self.engine.IsRunning() { + if self.isRunning() { log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) + self.push(work) } - self.push(work) } // Fill the block with all available pending transactions. @@ -500,12 +488,11 @@ func (self *worker) commitNewWork() { return } // We only care about logging if we're actually mining. - if self.engine.IsRunning() { + if self.isRunning() { log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) + self.push(work) } - // Push full work to sealer, which will replace the empty work sent before automatically. - self.push(work) self.updateSnapshot() } From 2a08b49f4984184e5bc5912908df1bbd57029d3c Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 3 Aug 2018 15:44:37 +0800 Subject: [PATCH 12/12] consensus: polish a bit --- consensus/clique/clique.go | 2 +- consensus/consensus.go | 2 +- consensus/ethash/api.go | 32 +++++++++++++++----------------- consensus/ethash/ethash.go | 21 ++++++++++----------- consensus/ethash/sealer.go | 18 ++++++++---------- 5 files changed, 35 insertions(+), 40 deletions(-) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 5963900c9723..59bb3d40b42b 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -672,7 +672,7 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } -// Close implements consensus.Engine, returning internal error and close the clique. +// Close implements consensus.Engine. It's a noop for clique as there is are no background threads. func (c *Clique) Close() error { return nil } diff --git a/consensus/consensus.go b/consensus/consensus.go index ae0fefb490d1..82717544455c 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -97,7 +97,7 @@ type Engine interface { // APIs returns the RPC APIs this consensus engine provides. APIs(chain ChainReader) []rpc.API - // Close closes the consensus engine. + // Close terminates any background threads maintained by the consensus engine. Close() error } diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index b950fbd92cbe..a04ea235d925 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -24,10 +24,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var ( - errEthashStopped = errors.New("ethash stopped") - errAPINotSupported = errors.New("the current ethash running mode does not support this API") -) +var errEthashStopped = errors.New("ethash stopped") // API exposes ethash related methods for the RPC interface. type API struct { @@ -42,25 +39,26 @@ type API struct { // result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty func (api *API) GetWork() ([3]string, error) { if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { - return [3]string{}, errAPINotSupported + return [3]string{}, errors.New("not supported") } var ( workCh = make(chan [3]string, 1) - errCh = make(chan error, 1) - err error + errc = make(chan error, 1) ) select { - case api.ethash.fetchWorkCh <- &sealWork{errCh: errCh, resCh: workCh}: + case api.ethash.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: case <-api.ethash.exitCh: return [3]string{}, errEthashStopped } - if err = <-errCh; err == nil { - return <-workCh, nil + select { + case work := <-workCh: + return work, nil + case err := <-errc: + return [3]string{}, err } - return [3]string{}, err } // SubmitWork can be used by external miner to submit their POW solution. @@ -71,20 +69,20 @@ func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) boo return false } - var errCh = make(chan error, 1) + var errc = make(chan error, 1) select { case api.ethash.submitWorkCh <- &mineResult{ nonce: nonce, mixDigest: digest, hash: hash, - errCh: errCh, + errc: errc, }: case <-api.ethash.exitCh: return false } - err := <-errCh + err := <-errc return err == nil } @@ -99,16 +97,16 @@ func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { return false } - var doneCh = make(chan struct{}, 1) + var done = make(chan struct{}, 1) select { - case api.ethash.submitRateCh <- &hashrate{done: doneCh, rate: uint64(rate), id: id}: + case api.ethash.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}: case <-api.ethash.exitCh: return false } // Block until hash rate submitted successfully. - <-doneCh + <-done return true } diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 3fff266410c1..0cb3059b9dfd 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -397,7 +397,7 @@ type mineResult struct { mixDigest common.Hash hash common.Hash - errCh chan error + errc chan error } // hashrate wraps the hash rate submitted by the remote sealer. @@ -411,8 +411,8 @@ type hashrate struct { // sealWork wraps a seal work package for remote sealer. type sealWork struct { - errCh chan error - resCh chan [3]string + errc chan error + res chan [3]string } // Ethash is a consensus engine based on proof-of-work implementing the ethash @@ -447,7 +447,7 @@ type Ethash struct { exitCh chan chan error // Notification channel to exiting backend threads } -// New creates a full sized ethash PoW scheme. +// New creates a full sized ethash PoW scheme and starts a background thread for remote mining. func New(config Config) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) @@ -557,9 +557,9 @@ func (ethash *Ethash) Close() error { if ethash.exitCh == nil { return } - errCh := make(chan error) - ethash.exitCh <- errCh - err = <-errCh + errc := make(chan error) + ethash.exitCh <- errc + err = <-errc close(ethash.exitCh) }) return err @@ -644,18 +644,17 @@ func (ethash *Ethash) Hashrate() float64 { if ethash.config.PowMode != ModeNormal && ethash.config.PowMode != ModeTest { return ethash.hashrate.Rate1() } - var resCh = make(chan uint64, 1) + var res = make(chan uint64, 1) select { - case ethash.fetchRateCh <- resCh: + case ethash.fetchRateCh <- res: case <-ethash.exitCh: // Return local hashrate only if ethash is stopped. return ethash.hashrate.Rate1() } // Gather total submitted hash rate of remote sealers. - total := <-resCh - return ethash.hashrate.Rate1() + float64(total) + return ethash.hashrate.Rate1() + float64(<-res) } // APIs implements consensus.Engine, returning the user facing RPC APIs. diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 539e783a6c64..a9449d406079 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -235,7 +235,6 @@ func (ethash *Ethash) remote() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() -running: for { select { case block := <-ethash.workCh: @@ -251,18 +250,17 @@ running: // Return current mining work to remote miner. miningWork, err := getWork() if err != nil { - work.errCh <- err + work.errc <- err } else { - close(work.errCh) - work.resCh <- miningWork + work.res <- miningWork } case result := <-ethash.submitWorkCh: // Verify submitted PoW solution based on maintained mining blocks. if submitWork(result.nonce, result.mixDigest, result.hash) { - close(result.errCh) + result.errc <- nil } else { - result.errCh <- errInvalidSealResult + result.errc <- errInvalidSealResult } case result := <-ethash.submitRateCh: @@ -287,11 +285,11 @@ running: } } - case errCh := <-ethash.exitCh: + case errc := <-ethash.exitCh: // Exit remote loop if ethash is closed and return relevant error. - errCh <- nil - break running + errc <- nil + log.Trace("Ethash remote sealer is exiting") + return } } - log.Trace("Ethash remote sealer is exiting") }