diff --git a/eth/backend.go b/eth/backend.go index 15243ad5c985..762ecf518b65 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "context" "encoding/json" "fmt" "math/big" @@ -62,6 +63,26 @@ import ( gethversion "github.com/ethereum/go-ethereum/version" ) +const ( + // This is the fairness knob for the discovery mixer. When looking for peers, we'll + // wait this long for a single source of candidates before moving on and trying other + // sources. If this timeout expires, the source will be skipped in this round, but it + // will continue to fetch in the background and will have a chance with a new timeout + // in the next rounds, giving it overall more time but a proportionally smaller share. + // We expect a normal source to produce ~10 candidates per second. + discmixTimeout = 100 * time.Millisecond + + // discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery + // source. It is useful to avoid the negative effects of potential longer timeouts + // in the discovery, keeping dial progress while waiting for the next batch of + // candidates. + discoveryPrefetchBuffer = 32 + + // maxParallelENRRequests is the maximum number of parallel ENR requests that can be + // performed by a disc/v4 source. + maxParallelENRRequests = 16 +) + // Config contains the configuration options of the ETH protocol. // Deprecated: use ethconfig.Config instead. type Config = ethconfig.Config @@ -169,7 +190,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { networkID: networkID, gasPrice: config.Miner.GasPrice, p2pServer: stack.Server(), - discmix: enode.NewFairMix(0), + discmix: enode.NewFairMix(discmixTimeout), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -487,10 +508,27 @@ func (s *Ethereum) setupDiscovery() error { s.discmix.AddSource(iter) } + // Add DHT nodes from discv4. + if s.p2pServer.DiscoveryV4() != nil { + iter := s.p2pServer.DiscoveryV4().RandomNodes() + resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node { + // RequestENR does not yet support context. It will simply time out. + // If the ENR can't be resolved, RequestENR will return nil. We don't + // care about the specific error here, so we ignore it. + nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr) + return nn + } + iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests) + iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain)) + iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) + s.discmix.AddSource(iter) + } + // Add DHT nodes from discv5. if s.p2pServer.DiscoveryV5() != nil { filter := eth.NewNodeFilter(s.blockchain) iter := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter) + iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) s.discmix.AddSource(iter) } diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go index 4b7e28929eeb..f8f79a943661 100644 --- a/p2p/enode/iter.go +++ b/p2p/enode/iter.go @@ -17,6 +17,7 @@ package enode import ( + "context" "sync" "time" ) @@ -59,6 +60,11 @@ func (it sourceIter) NodeSource() string { return it.name } +type iteratorItem struct { + n *Node + source string +} + // ReadNodes reads at most n nodes from the given iterator. The return value contains no // duplicates and no nil values. To prevent looping indefinitely for small repeating node // sequences, this function calls Next at most n times. @@ -152,6 +158,149 @@ func (f *filterIter) Next() bool { return false } +// asyncFilterIter wraps an iterator such that Next only returns nodes for which +// the 'check' function returns a (possibly modified) node. +type asyncFilterIter struct { + it SourceIterator // the iterator to filter + slots chan struct{} // the slots for parallel checking + passed chan iteratorItem // channel to collect passed nodes + cur iteratorItem // buffer to serve the Node call + cancel context.CancelFunc + closeOnce sync.Once +} + +type AsyncFilterFunc func(context.Context, *Node) *Node + +// AsyncFilter creates an iterator which checks nodes in parallel. +// The 'check' function is called on multiple goroutines to filter each node +// from the upstream iterator. When check returns nil, the node will be skipped. +// It can also return a new node to be returned by the iterator instead of the . +func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator { + f := &asyncFilterIter{ + it: ensureSourceIter(it), + slots: make(chan struct{}, workers+1), + passed: make(chan iteratorItem), + } + for range cap(f.slots) { + f.slots <- struct{}{} + } + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + go func() { + select { + case <-ctx.Done(): + return + case <-f.slots: + } + // read from the iterator and start checking nodes in parallel + // when a node is checked, it will be sent to the passed channel + // and the slot will be released + for f.it.Next() { + node := f.it.Node() + nodeSource := f.it.NodeSource() + + // check the node async, in a separate goroutine + <-f.slots + go func() { + if nn := check(ctx, node); nn != nil { + item := iteratorItem{nn, nodeSource} + select { + case f.passed <- item: + case <-ctx.Done(): // bale out if downstream is already closed and not calling Next + } + } + f.slots <- struct{}{} + }() + } + // the iterator has ended + f.slots <- struct{}{} + }() + + return f +} + +// Next blocks until a node is available or the iterator is closed. +func (f *asyncFilterIter) Next() bool { + var ok bool + f.cur, ok = <-f.passed + return ok +} + +// Node returns the current node. +func (f *asyncFilterIter) Node() *Node { + return f.cur.n +} + +// NodeSource implements IteratorSource. +func (f *asyncFilterIter) NodeSource() string { + return f.cur.source +} + +// Close ends the iterator, also closing the wrapped iterator. +func (f *asyncFilterIter) Close() { + f.closeOnce.Do(func() { + f.it.Close() + f.cancel() + for range cap(f.slots) { + <-f.slots + } + close(f.slots) + close(f.passed) + }) +} + +// bufferIter wraps an iterator and buffers the nodes it returns. +// The buffer is pre-filled with the given size from the wrapped iterator. +type bufferIter struct { + it SourceIterator + buffer chan iteratorItem + head iteratorItem + closeOnce sync.Once +} + +// NewBufferIter creates a new pre-fetch buffer of a given size. +func NewBufferIter(it Iterator, size int) Iterator { + b := bufferIter{ + it: ensureSourceIter(it), + buffer: make(chan iteratorItem, size), + } + + go func() { + // if the wrapped iterator ends, the buffer content will still be served. + defer close(b.buffer) + // If instead the bufferIterator is closed, we bail out of the loop. + for b.it.Next() { + item := iteratorItem{b.it.Node(), b.it.NodeSource()} + b.buffer <- item + } + }() + return &b +} + +func (b *bufferIter) Next() bool { + var ok bool + b.head, ok = <-b.buffer + return ok +} + +func (b *bufferIter) Node() *Node { + return b.head.n +} + +func (b *bufferIter) NodeSource() string { + return b.head.source +} + +func (b *bufferIter) Close() { + b.closeOnce.Do(func() { + b.it.Close() + // Drain buffer and wait for the goroutine to end. + for range b.buffer { + } + }) +} + // FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends // only when Close is called. Source iterators added via AddSource are removed from the // mix when they end. @@ -164,9 +313,9 @@ func (f *filterIter) Next() bool { // It's safe to call AddSource and Close concurrently with Next. type FairMix struct { wg sync.WaitGroup - fromAny chan mixItem + fromAny chan iteratorItem timeout time.Duration - cur mixItem + cur iteratorItem mu sync.Mutex closed chan struct{} @@ -176,15 +325,10 @@ type FairMix struct { type mixSource struct { it SourceIterator - next chan mixItem + next chan iteratorItem timeout time.Duration } -type mixItem struct { - n *Node - source string -} - // NewFairMix creates a mixer. // // The timeout specifies how long the mixer will wait for the next fairly-chosen source @@ -193,7 +337,7 @@ type mixItem struct { // timeout makes the mixer completely fair. func NewFairMix(timeout time.Duration) *FairMix { m := &FairMix{ - fromAny: make(chan mixItem), + fromAny: make(chan iteratorItem), closed: make(chan struct{}), timeout: timeout, } @@ -211,7 +355,7 @@ func (m *FairMix) AddSource(it Iterator) { m.wg.Add(1) source := &mixSource{ it: ensureSourceIter(it), - next: make(chan mixItem), + next: make(chan iteratorItem), timeout: m.timeout, } m.sources = append(m.sources, source) @@ -239,7 +383,7 @@ func (m *FairMix) Close() { // Next returns a node from a random source. func (m *FairMix) Next() bool { - m.cur = mixItem{} + m.cur = iteratorItem{} for { source := m.pickSource() @@ -327,7 +471,7 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) { defer m.wg.Done() defer close(s.next) for s.it.Next() { - item := mixItem{s.it.Node(), s.it.NodeSource()} + item := iteratorItem{s.it.Node(), s.it.NodeSource()} select { case s.next <- item: case m.fromAny <- item: diff --git a/p2p/server.go b/p2p/server.go index f3a58bba2991..1f859089af18 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -45,11 +45,6 @@ import ( const ( defaultDialTimeout = 15 * time.Second - // This is the fairness knob for the discovery mixer. When looking for peers, we'll - // wait this long for a single source of candidates before moving on and trying other - // sources. - discmixTimeout = 5 * time.Second - // Connectivity defaults. defaultMaxPendingPeers = 50 defaultDialRatio = 3 @@ -447,7 +442,9 @@ func (srv *Server) setupLocalNode() error { } func (srv *Server) setupDiscovery() error { - srv.discmix = enode.NewFairMix(discmixTimeout) + // Set up the discovery source mixer. Here, we don't care about the + // fairness of the mix, it's just for putting the + srv.discmix = enode.NewFairMix(0) // Don't listen on UDP endpoint if DHT is disabled. if srv.NoDiscovery { @@ -483,7 +480,6 @@ func (srv *Server) setupDiscovery() error { return err } srv.discv4 = ntab - srv.discmix.AddSource(ntab.RandomNodes()) } if srv.Config.DiscoveryV5 { cfg := discover.Config{ @@ -506,6 +502,19 @@ func (srv *Server) setupDiscovery() error { added[proto.Name] = true } } + + // Set up default non-protocol-specific discovery feeds if no protocol + // has configured discovery. + if len(added) == 0 { + if srv.discv4 != nil { + it := srv.discv4.RandomNodes() + srv.discmix.AddSource(enode.WithSourceName("discv4-default", it)) + } + if srv.discv5 != nil { + it := srv.discv5.RandomNodes() + srv.discmix.AddSource(enode.WithSourceName("discv5-default", it)) + } + } return nil }