Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package eth

import (
"context"
"encoding/json"
"fmt"
"math/big"
Expand Down Expand Up @@ -62,6 +63,26 @@ import (
gethversion "github.com/ethereum/go-ethereum/version"
)

const (
// This is the fairness knob for the discovery mixer. When looking for peers, we'll
// wait this long for a single source of candidates before moving on and trying other
// sources. If this timeout expires, the source will be skipped in this round, but it
// will continue to fetch in the background and will have a chance with a new timeout
// in the next rounds, giving it overall more time but a proportionally smaller share.
// We expect a normal source to produce ~10 candidates per second.
discmixTimeout = 100 * time.Millisecond

// discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery
// source. It is useful to avoid the negative effects of potential longer timeouts
// in the discovery, keeping dial progress while waiting for the next batch of
// candidates.
discoveryPrefetchBuffer = 32

// maxParallelENRRequests is the maximum number of parallel ENR requests that can be
// performed by a disc/v4 source.
maxParallelENRRequests = 16
)

// Config contains the configuration options of the ETH protocol.
// Deprecated: use ethconfig.Config instead.
type Config = ethconfig.Config
Expand Down Expand Up @@ -169,7 +190,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
networkID: networkID,
gasPrice: config.Miner.GasPrice,
p2pServer: stack.Server(),
discmix: enode.NewFairMix(0),
discmix: enode.NewFairMix(discmixTimeout),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
Expand Down Expand Up @@ -487,10 +508,27 @@ func (s *Ethereum) setupDiscovery() error {
s.discmix.AddSource(iter)
}

// Add DHT nodes from discv4.
if s.p2pServer.DiscoveryV4() != nil {
iter := s.p2pServer.DiscoveryV4().RandomNodes()
resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node {
// RequestENR does not yet support context. It will simply time out.
// If the ENR can't be resolved, RequestENR will return nil. We don't
// care about the specific error here, so we ignore it.
nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr)
return nn
}
iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests)
iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain))
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
s.discmix.AddSource(iter)
}

// Add DHT nodes from discv5.
if s.p2pServer.DiscoveryV5() != nil {
filter := eth.NewNodeFilter(s.blockchain)
iter := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter)
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
s.discmix.AddSource(iter)
}

Expand Down
168 changes: 156 additions & 12 deletions p2p/enode/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package enode

import (

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0xB7Bf0782C40380B73917C776dE24d2878cFd47df

"context"
"sync"
"time"
)
Expand Down Expand Up @@ -59,6 +60,11 @@ func (it sourceIter) NodeSource() string {
return it.name
}

type iteratorItem struct {
n *Node
source string
}

// ReadNodes reads at most n nodes from the given iterator. The return value contains no
// duplicates and no nil values. To prevent looping indefinitely for small repeating node
// sequences, this function calls Next at most n times.
Expand Down Expand Up @@ -152,6 +158,149 @@ func (f *filterIter) Next() bool {
return false
}

// asyncFilterIter wraps an iterator such that Next only returns nodes for which
// the 'check' function returns a (possibly modified) node.
type asyncFilterIter struct {
it SourceIterator // the iterator to filter
slots chan struct{} // the slots for parallel checking
passed chan iteratorItem // channel to collect passed nodes
cur iteratorItem // buffer to serve the Node call
cancel context.CancelFunc
closeOnce sync.Once
}

type AsyncFilterFunc func(context.Context, *Node) *Node

// AsyncFilter creates an iterator which checks nodes in parallel.
// The 'check' function is called on multiple goroutines to filter each node
// from the upstream iterator. When check returns nil, the node will be skipped.
// It can also return a new node to be returned by the iterator instead of the .
func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator {
f := &asyncFilterIter{
it: ensureSourceIter(it),
slots: make(chan struct{}, workers+1),
passed: make(chan iteratorItem),
}
for range cap(f.slots) {
f.slots <- struct{}{}
}
ctx, cancel := context.WithCancel(context.Background())
f.cancel = cancel

go func() {
select {
case <-ctx.Done():
return
case <-f.slots:
}
// read from the iterator and start checking nodes in parallel
// when a node is checked, it will be sent to the passed channel
// and the slot will be released
for f.it.Next() {
node := f.it.Node()
nodeSource := f.it.NodeSource()

// check the node async, in a separate goroutine
<-f.slots
go func() {
if nn := check(ctx, node); nn != nil {
item := iteratorItem{nn, nodeSource}
select {
case f.passed <- item:
case <-ctx.Done(): // bale out if downstream is already closed and not calling Next
}
}
f.slots <- struct{}{}
}()
}
// the iterator has ended
f.slots <- struct{}{}
}()

return f
}

// Next blocks until a node is available or the iterator is closed.
func (f *asyncFilterIter) Next() bool {
var ok bool
f.cur, ok = <-f.passed
return ok
}

// Node returns the current node.
func (f *asyncFilterIter) Node() *Node {
return f.cur.n
}

// NodeSource implements IteratorSource.
func (f *asyncFilterIter) NodeSource() string {
return f.cur.source
}

// Close ends the iterator, also closing the wrapped iterator.
func (f *asyncFilterIter) Close() {
f.closeOnce.Do(func() {
f.it.Close()
f.cancel()
for range cap(f.slots) {
<-f.slots
}
close(f.slots)
close(f.passed)
})
}

// bufferIter wraps an iterator and buffers the nodes it returns.
// The buffer is pre-filled with the given size from the wrapped iterator.
type bufferIter struct {
it SourceIterator
buffer chan iteratorItem
head iteratorItem
closeOnce sync.Once
}

// NewBufferIter creates a new pre-fetch buffer of a given size.
func NewBufferIter(it Iterator, size int) Iterator {
b := bufferIter{
it: ensureSourceIter(it),
buffer: make(chan iteratorItem, size),
}

go func() {
// if the wrapped iterator ends, the buffer content will still be served.
defer close(b.buffer)
// If instead the bufferIterator is closed, we bail out of the loop.
for b.it.Next() {
item := iteratorItem{b.it.Node(), b.it.NodeSource()}
b.buffer <- item
}
}()
return &b
}

func (b *bufferIter) Next() bool {
var ok bool
b.head, ok = <-b.buffer
return ok
}

func (b *bufferIter) Node() *Node {
return b.head.n
}

func (b *bufferIter) NodeSource() string {
return b.head.source
}

func (b *bufferIter) Close() {
b.closeOnce.Do(func() {
b.it.Close()
// Drain buffer and wait for the goroutine to end.
for range b.buffer {
}
})
}

// FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
// only when Close is called. Source iterators added via AddSource are removed from the
// mix when they end.
Expand All @@ -164,9 +313,9 @@ func (f *filterIter) Next() bool {
// It's safe to call AddSource and Close concurrently with Next.
type FairMix struct {
wg sync.WaitGroup
fromAny chan mixItem
fromAny chan iteratorItem
timeout time.Duration
cur mixItem
cur iteratorItem

mu sync.Mutex
closed chan struct{}
Expand All @@ -176,15 +325,10 @@ type FairMix struct {

type mixSource struct {
it SourceIterator
next chan mixItem
next chan iteratorItem
timeout time.Duration
}

type mixItem struct {
n *Node
source string
}

// NewFairMix creates a mixer.
//
// The timeout specifies how long the mixer will wait for the next fairly-chosen source
Expand All @@ -193,7 +337,7 @@ type mixItem struct {
// timeout makes the mixer completely fair.
func NewFairMix(timeout time.Duration) *FairMix {
m := &FairMix{
fromAny: make(chan mixItem),
fromAny: make(chan iteratorItem),
closed: make(chan struct{}),
timeout: timeout,
}
Expand All @@ -211,7 +355,7 @@ func (m *FairMix) AddSource(it Iterator) {
m.wg.Add(1)
source := &mixSource{
it: ensureSourceIter(it),
next: make(chan mixItem),
next: make(chan iteratorItem),
timeout: m.timeout,
}
m.sources = append(m.sources, source)
Expand Down Expand Up @@ -239,7 +383,7 @@ func (m *FairMix) Close() {

// Next returns a node from a random source.
func (m *FairMix) Next() bool {
m.cur = mixItem{}
m.cur = iteratorItem{}

for {
source := m.pickSource()
Expand Down Expand Up @@ -327,7 +471,7 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
defer m.wg.Done()
defer close(s.next)
for s.it.Next() {
item := mixItem{s.it.Node(), s.it.NodeSource()}
item := iteratorItem{s.it.Node(), s.it.NodeSource()}
select {
case s.next <- item:
case m.fromAny <- item:
Expand Down
23 changes: 16 additions & 7 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ import (
const (
defaultDialTimeout = 15 * time.Second

// This is the fairness knob for the discovery mixer. When looking for peers, we'll
// wait this long for a single source of candidates before moving on and trying other
// sources.
discmixTimeout = 5 * time.Second

// Connectivity defaults.
defaultMaxPendingPeers = 50
defaultDialRatio = 3
Expand Down Expand Up @@ -447,7 +442,9 @@ func (srv *Server) setupLocalNode() error {
}

func (srv *Server) setupDiscovery() error {
srv.discmix = enode.NewFairMix(discmixTimeout)
// Set up the discovery source mixer. Here, we don't care about the
// fairness of the mix, it's just for putting the
srv.discmix = enode.NewFairMix(0)

// Don't listen on UDP endpoint if DHT is disabled.
if srv.NoDiscovery {
Expand Down Expand Up @@ -483,7 +480,6 @@ func (srv *Server) setupDiscovery() error {
return err
}
srv.discv4 = ntab
srv.discmix.AddSource(ntab.RandomNodes())
}
if srv.Config.DiscoveryV5 {
cfg := discover.Config{
Expand All @@ -506,6 +502,19 @@ func (srv *Server) setupDiscovery() error {
added[proto.Name] = true
}
}

// Set up default non-protocol-specific discovery feeds if no protocol
// has configured discovery.
if len(added) == 0 {
if srv.discv4 != nil {
it := srv.discv4.RandomNodes()
srv.discmix.AddSource(enode.WithSourceName("discv4-default", it))
}
if srv.discv5 != nil {
it := srv.discv5.RandomNodes()
srv.discmix.AddSource(enode.WithSourceName("discv5-default", it))
}
}
return nil
}

Expand Down