Skip to content

Commit 878891a

Browse files
fjlholiman
authored andcommitted
p2p/discover: improved node revalidation (ethereum#29572)
Node discovery periodically revalidates the nodes in its table by sending PING, checking if they are still alive. I recently noticed some issues with the implementation of this process, which can cause strange results such as nodes dropping unexpectedly, certain nodes not getting revalidated often enough, and bad results being returned to incoming FINDNODE queries. In this change, the revalidation process is improved with the following logic: - We maintain two 'revalidation lists' containing the table nodes, named 'fast' and 'slow'. - The process chooses random nodes from each list on a randomized interval, the interval being faster for the 'fast' list, and performs revalidation for the chosen node. - Whenever a node is newly inserted into the table, it goes into the 'fast' list. Once validation passes, it transfers to the 'slow' list. If a request fails, or the node changes endpoint, it transfers back into 'fast'. - livenessChecks is incremented by one for successful checks. Unlike the old implementation, we will not drop the node on the first failing check. We instead quickly decay the livenessChecks give it another chance. - Order of nodes in bucket doesn't matter anymore. I am also adding a debug API endpoint to dump the node table content. Co-authored-by: Martin HS <[email protected]>
1 parent 68efcf9 commit 878891a

File tree

15 files changed

+931
-524
lines changed

15 files changed

+931
-524
lines changed

cmd/devp2p/discv4cmd.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"net"
23+
"net/http"
2324
"strconv"
2425
"strings"
2526
"time"
@@ -28,9 +29,11 @@ import (
2829
"github.com/ethereum/go-ethereum/common"
2930
"github.com/ethereum/go-ethereum/crypto"
3031
"github.com/ethereum/go-ethereum/internal/flags"
32+
"github.com/ethereum/go-ethereum/log"
3133
"github.com/ethereum/go-ethereum/p2p/discover"
3234
"github.com/ethereum/go-ethereum/p2p/enode"
3335
"github.com/ethereum/go-ethereum/params"
36+
"github.com/ethereum/go-ethereum/rpc"
3437
"github.com/urfave/cli/v2"
3538
)
3639

@@ -45,6 +48,7 @@ var (
4548
discv4ResolveJSONCommand,
4649
discv4CrawlCommand,
4750
discv4TestCommand,
51+
discv4ListenCommand,
4852
},
4953
}
5054
discv4PingCommand = &cli.Command{
@@ -75,6 +79,14 @@ var (
7579
Flags: discoveryNodeFlags,
7680
ArgsUsage: "<nodes.json file>",
7781
}
82+
discv4ListenCommand = &cli.Command{
83+
Name: "listen",
84+
Usage: "Runs a discovery node",
85+
Action: discv4Listen,
86+
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{
87+
httpAddrFlag,
88+
}),
89+
}
7890
discv4CrawlCommand = &cli.Command{
7991
Name: "crawl",
8092
Usage: "Updates a nodes.json file with random nodes found in the DHT",
@@ -131,6 +143,10 @@ var (
131143
Usage: "Enode of the remote node under test",
132144
EnvVars: []string{"REMOTE_ENODE"},
133145
}
146+
httpAddrFlag = &cli.StringFlag{
147+
Name: "rpc",
148+
Usage: "HTTP server listening address",
149+
}
134150
)
135151

136152
var discoveryNodeFlags = []cli.Flag{
@@ -154,6 +170,27 @@ func discv4Ping(ctx *cli.Context) error {
154170
return nil
155171
}
156172

173+
func discv4Listen(ctx *cli.Context) error {
174+
disc, _ := startV4(ctx)
175+
defer disc.Close()
176+
177+
fmt.Println(disc.Self())
178+
179+
httpAddr := ctx.String(httpAddrFlag.Name)
180+
if httpAddr == "" {
181+
// Non-HTTP mode.
182+
select {}
183+
}
184+
185+
api := &discv4API{disc}
186+
log.Info("Starting RPC API server", "addr", httpAddr)
187+
srv := rpc.NewServer()
188+
srv.RegisterName("discv4", api)
189+
http.DefaultServeMux.Handle("/", srv)
190+
httpsrv := http.Server{Addr: httpAddr, Handler: http.DefaultServeMux}
191+
return httpsrv.ListenAndServe()
192+
}
193+
157194
func discv4RequestRecord(ctx *cli.Context) error {
158195
n := getNodeArg(ctx)
159196
disc, _ := startV4(ctx)
@@ -362,3 +399,23 @@ func parseBootnodes(ctx *cli.Context) ([]*enode.Node, error) {
362399
}
363400
return nodes, nil
364401
}
402+
403+
type discv4API struct {
404+
host *discover.UDPv4
405+
}
406+
407+
func (api *discv4API) LookupRandom(n int) (ns []*enode.Node) {
408+
it := api.host.RandomNodes()
409+
for len(ns) < n && it.Next() {
410+
ns = append(ns, it.Node())
411+
}
412+
return ns
413+
}
414+
415+
func (api *discv4API) Buckets() [][]discover.BucketNode {
416+
return api.host.TableBuckets()
417+
}
418+
419+
func (api *discv4API) Self() *enode.Node {
420+
return api.host.Self()
421+
}

internal/testlog/testlog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (h *bufHandler) Handle(_ context.Context, r slog.Record) error {
5858
}
5959

6060
func (h *bufHandler) Enabled(_ context.Context, lvl slog.Level) bool {
61-
return lvl <= h.level
61+
return lvl >= h.level
6262
}
6363

6464
func (h *bufHandler) WithAttrs(attrs []slog.Attr) slog.Handler {

node/api.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/ethereum/go-ethereum/internal/debug"
2727
"github.com/ethereum/go-ethereum/log"
2828
"github.com/ethereum/go-ethereum/p2p"
29+
"github.com/ethereum/go-ethereum/p2p/discover"
2930
"github.com/ethereum/go-ethereum/p2p/enode"
3031
"github.com/ethereum/go-ethereum/rpc"
3132
)
@@ -39,6 +40,9 @@ func (n *Node) apis() []rpc.API {
3940
}, {
4041
Namespace: "debug",
4142
Service: debug.Handler,
43+
}, {
44+
Namespace: "debug",
45+
Service: &p2pDebugAPI{n},
4246
}, {
4347
Namespace: "web3",
4448
Service: &web3API{n},
@@ -333,3 +337,16 @@ func (s *web3API) ClientVersion() string {
333337
func (s *web3API) Sha3(input hexutil.Bytes) hexutil.Bytes {
334338
return crypto.Keccak256(input)
335339
}
340+
341+
// p2pDebugAPI provides access to p2p internals for debugging.
342+
type p2pDebugAPI struct {
343+
stack *Node
344+
}
345+
346+
func (s *p2pDebugAPI) DiscoveryV4Table() [][]discover.BucketNode {
347+
disc := s.stack.server.DiscoveryV4()
348+
if disc != nil {
349+
return disc.TableBuckets()
350+
}
351+
return nil
352+
}

p2p/discover/common.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ package discover
1818

1919
import (
2020
"crypto/ecdsa"
21+
crand "crypto/rand"
22+
"encoding/binary"
23+
"math/rand"
2124
"net"
25+
"sync"
2226
"time"
2327

2428
"github.com/ethereum/go-ethereum/common/mclock"
@@ -62,7 +66,7 @@ type Config struct {
6266
func (cfg Config) withDefaults() Config {
6367
// Node table configuration:
6468
if cfg.PingInterval == 0 {
65-
cfg.PingInterval = 10 * time.Second
69+
cfg.PingInterval = 3 * time.Second
6670
}
6771
if cfg.RefreshInterval == 0 {
6872
cfg.RefreshInterval = 30 * time.Minute
@@ -92,3 +96,44 @@ type ReadPacket struct {
9296
Data []byte
9397
Addr *net.UDPAddr
9498
}
99+
100+
type randomSource interface {
101+
Intn(int) int
102+
Int63n(int64) int64
103+
Shuffle(int, func(int, int))
104+
}
105+
106+
// reseedingRandom is a random number generator that tracks when it was last re-seeded.
107+
type reseedingRandom struct {
108+
mu sync.Mutex
109+
cur *rand.Rand
110+
}
111+
112+
func (r *reseedingRandom) seed() {
113+
var b [8]byte
114+
crand.Read(b[:])
115+
seed := binary.BigEndian.Uint64(b[:])
116+
new := rand.New(rand.NewSource(int64(seed)))
117+
118+
r.mu.Lock()
119+
r.cur = new
120+
r.mu.Unlock()
121+
}
122+
123+
func (r *reseedingRandom) Intn(n int) int {
124+
r.mu.Lock()
125+
defer r.mu.Unlock()
126+
return r.cur.Intn(n)
127+
}
128+
129+
func (r *reseedingRandom) Int63n(n int64) int64 {
130+
r.mu.Lock()
131+
defer r.mu.Unlock()
132+
return r.cur.Int63n(n)
133+
}
134+
135+
func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) {
136+
r.mu.Lock()
137+
defer r.mu.Unlock()
138+
r.cur.Shuffle(n, swap)
139+
}

p2p/discover/lookup.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,32 +140,13 @@ func (it *lookup) slowdown() {
140140
}
141141

142142
func (it *lookup) query(n *node, reply chan<- []*node) {
143-
fails := it.tab.db.FindFails(n.ID(), n.IP())
144143
r, err := it.queryfunc(n)
145-
if errors.Is(err, errClosed) {
146-
// Avoid recording failures on shutdown.
147-
reply <- nil
148-
return
149-
} else if len(r) == 0 {
150-
fails++
151-
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
152-
// Remove the node from the local table if it fails to return anything useful too
153-
// many times, but only if there are enough other nodes in the bucket.
154-
dropped := false
155-
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
156-
dropped = true
157-
it.tab.delete(n)
144+
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.
145+
success := len(r) > 0
146+
it.tab.trackRequest(n, success, r)
147+
if err != nil {
148+
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err)
158149
}
159-
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
160-
} else if fails > 0 {
161-
// Reset failure counter because it counts _consecutive_ failures.
162-
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
163-
}
164-
165-
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
166-
// just remove those again during revalidation.
167-
for _, n := range r {
168-
it.tab.addSeenNode(n)
169150
}
170151
reply <- r
171152
}

p2p/discover/node.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,22 @@ import (
2929
"github.com/ethereum/go-ethereum/p2p/enode"
3030
)
3131

32+
type BucketNode struct {
33+
Node *enode.Node `json:"node"`
34+
AddedToTable time.Time `json:"addedToTable"`
35+
AddedToBucket time.Time `json:"addedToBucket"`
36+
Checks int `json:"checks"`
37+
Live bool `json:"live"`
38+
}
39+
3240
// node represents a host on the network.
3341
// The fields of Node may not be modified.
3442
type node struct {
35-
enode.Node
36-
addedAt time.Time // time when the node was added to the table
37-
livenessChecks uint // how often liveness was checked
43+
*enode.Node
44+
addedToTable time.Time // first time node was added to bucket or replacement list
45+
addedToBucket time.Time // time it was added in the actual bucket
46+
livenessChecks uint // how often liveness was checked
47+
isValidatedLive bool // true if existence of node is considered validated right now
3848
}
3949

4050
type encPubkey [64]byte
@@ -65,7 +75,7 @@ func (e encPubkey) id() enode.ID {
6575
}
6676

6777
func wrapNode(n *enode.Node) *node {
68-
return &node{Node: *n}
78+
return &node{Node: n}
6979
}
7080

7181
func wrapNodes(ns []*enode.Node) []*node {
@@ -77,7 +87,7 @@ func wrapNodes(ns []*enode.Node) []*node {
7787
}
7888

7989
func unwrapNode(n *node) *enode.Node {
80-
return &n.Node
90+
return n.Node
8191
}
8292

8393
func unwrapNodes(ns []*node) []*enode.Node {

0 commit comments

Comments
 (0)