Skip to content

Commit a417615

Browse files
fjlholiman
authored andcommitted
p2p/discover: improved node revalidation (ethereum#29572)
Node discovery periodically revalidates the nodes in its table by sending PING, checking if they are still alive. I recently noticed some issues with the implementation of this process, which can cause strange results such as nodes dropping unexpectedly, certain nodes not getting revalidated often enough, and bad results being returned to incoming FINDNODE queries. In this change, the revalidation process is improved with the following logic: - We maintain two 'revalidation lists' containing the table nodes, named 'fast' and 'slow'. - The process chooses random nodes from each list on a randomized interval, the interval being faster for the 'fast' list, and performs revalidation for the chosen node. - Whenever a node is newly inserted into the table, it goes into the 'fast' list. Once validation passes, it transfers to the 'slow' list. If a request fails, or the node changes endpoint, it transfers back into 'fast'. - livenessChecks is incremented by one for successful checks. Unlike the old implementation, we will not drop the node on the first failing check. We instead quickly decay the livenessChecks give it another chance. - Order of nodes in bucket doesn't matter anymore. I am also adding a debug API endpoint to dump the node table content. Co-authored-by: Martin HS <[email protected]>
1 parent 4333c49 commit a417615

File tree

16 files changed

+899
-542
lines changed

16 files changed

+899
-542
lines changed

les/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
218218
return nil
219219
}
220220
reqsEnc, _ := rlp.EncodeToBytes(&reqs)
221-
repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
221+
repliesEnc, _ := s.p2pServer.DiscoveryV5().TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
222222
var replies vflux.Replies
223223
if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil {
224224
return nil
@@ -234,7 +234,7 @@ func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
234234
if !s.udpEnabled {
235235
return 0
236236
}
237-
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
237+
if n, err = s.p2pServer.DiscoveryV5().RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
238238
s.serverPool.Persist(n)
239239
} else {
240240
return 0
@@ -351,7 +351,7 @@ func (s *LightEthereum) Start() error {
351351
// Regularly update shutdown marker
352352
s.shutdownTracker.Start()
353353

354-
if s.udpEnabled && s.p2pServer.DiscV5 == nil {
354+
if s.udpEnabled && s.p2pServer.DiscoveryV5() == nil {
355355
s.udpEnabled = false
356356
log.Error("Discovery v5 is not initialized")
357357
}

les/enr_entry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
5656

5757
// Enable DHT.
5858
if eth.udpEnabled {
59-
it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
59+
it.AddSource(eth.p2pServer.DiscoveryV5().RandomNodes())
6060
}
6161

6262
forkFilter := forkid.NewFilter(eth.blockchain)

les/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ func (s *LesServer) Start() error {
196196
s.handler.start()
197197
s.wg.Add(1)
198198
go s.capacityManagement()
199-
if s.p2pSrv.DiscV5 != nil {
200-
s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
199+
if discv5 := s.p2pSrv.DiscoveryV5(); discv5 != nil {
200+
discv5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
201201
}
202202
return nil
203203
}

node/api.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/ethereum/go-ethereum/internal/debug"
2727
"github.com/ethereum/go-ethereum/log"
2828
"github.com/ethereum/go-ethereum/p2p"
29+
"github.com/ethereum/go-ethereum/p2p/discover"
2930
"github.com/ethereum/go-ethereum/p2p/enode"
3031
"github.com/ethereum/go-ethereum/rpc"
3132
)
@@ -39,6 +40,9 @@ func (n *Node) apis() []rpc.API {
3940
}, {
4041
Namespace: "debug",
4142
Service: debug.Handler,
43+
}, {
44+
Namespace: "debug",
45+
Service: &p2pDebugAPI{n},
4246
}, {
4347
Namespace: "web3",
4448
Service: &web3API{n},
@@ -327,3 +331,16 @@ func (s *web3API) ClientVersion() string {
327331
func (s *web3API) Sha3(input hexutil.Bytes) hexutil.Bytes {
328332
return crypto.Keccak256(input)
329333
}
334+
335+
// p2pDebugAPI provides access to p2p internals for debugging.
336+
type p2pDebugAPI struct {
337+
stack *Node
338+
}
339+
340+
func (s *p2pDebugAPI) DiscoveryV4Table() [][]discover.BucketNode {
341+
disc := s.stack.server.DiscoveryV4()
342+
if disc != nil {
343+
return disc.TableBuckets()
344+
}
345+
return nil
346+
}

p2p/discover/common.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ package discover
1818

1919
import (
2020
"crypto/ecdsa"
21+
crand "crypto/rand"
22+
"encoding/binary"
23+
"math/rand"
2124
"net"
25+
"sync"
2226
"time"
2327

2428
"github.com/ethereum/go-ethereum/common/mclock"
@@ -63,7 +67,7 @@ func (cfg Config) withDefaults() Config {
6367
// Node table configuration:
6468
if cfg.PingInterval == 0 {
6569
// MODIFIED by Jakub Pajek (reduce disc traffic)
66-
//cfg.PingInterval = 10 * time.Second
70+
//cfg.PingInterval = 3 * time.Second
6771
cfg.PingInterval = 30 * time.Second
6872
}
6973
if cfg.RefreshInterval == 0 {
@@ -95,9 +99,57 @@ type ReadPacket struct {
9599
Addr *net.UDPAddr
96100
}
97101

102+
type randomSource interface {
103+
Intn(int) int
104+
Int63n(int64) int64
105+
Shuffle(int, func(int, int))
106+
}
107+
108+
// reseedingRandom is a random number generator that tracks when it was last re-seeded.
109+
type reseedingRandom struct {
110+
mu sync.Mutex
111+
cur *rand.Rand
112+
}
113+
114+
func (r *reseedingRandom) seed() {
115+
var b [8]byte
116+
crand.Read(b[:])
117+
seed := binary.BigEndian.Uint64(b[:])
118+
new := rand.New(rand.NewSource(int64(seed)))
119+
120+
r.mu.Lock()
121+
r.cur = new
122+
r.mu.Unlock()
123+
}
124+
125+
func (r *reseedingRandom) Intn(n int) int {
126+
r.mu.Lock()
127+
defer r.mu.Unlock()
128+
return r.cur.Intn(n)
129+
}
130+
131+
func (r *reseedingRandom) Int63n(n int64) int64 {
132+
r.mu.Lock()
133+
defer r.mu.Unlock()
134+
return r.cur.Int63n(n)
135+
}
136+
137+
func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) {
138+
r.mu.Lock()
139+
defer r.mu.Unlock()
140+
r.cur.Shuffle(n, swap)
141+
}
142+
98143
func min(x, y int) int {
99144
if x > y {
100145
return y
101146
}
102147
return x
103148
}
149+
150+
func min64(x, y int64) int64 {
151+
if x > y {
152+
return y
153+
}
154+
return x
155+
}

p2p/discover/lookup.go

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (it *lookup) startQueries() bool {
133133
}
134134

135135
// MODIFIED by Jakub Pajek (reduce disc traffic)
136-
//func (it *lookup) slowdown() {
136+
// func (it *lookup) slowdown() {
137137
func (it *lookup) slowdown(d time.Duration) {
138138
// MODIFIED by Jakub Pajek (reduce disc traffic)
139139
//sleep := time.NewTimer(1 * time.Second)
@@ -148,32 +148,13 @@ func (it *lookup) slowdown(d time.Duration) {
148148
func (it *lookup) query(n *node, reply chan<- []*node) {
149149
// ADDED by Jakub Pajek (reduce disc traffic)
150150
it.slowdown(1 * time.Second)
151-
fails := it.tab.db.FindFails(n.ID(), n.IP())
152151
r, err := it.queryfunc(n)
153-
if errors.Is(err, errClosed) {
154-
// Avoid recording failures on shutdown.
155-
reply <- nil
156-
return
157-
} else if len(r) == 0 {
158-
fails++
159-
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
160-
// Remove the node from the local table if it fails to return anything useful too
161-
// many times, but only if there are enough other nodes in the bucket.
162-
dropped := false
163-
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
164-
dropped = true
165-
it.tab.delete(n)
152+
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.
153+
success := len(r) > 0
154+
it.tab.trackRequest(n, success, r)
155+
if err != nil {
156+
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err)
166157
}
167-
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
168-
} else if fails > 0 {
169-
// Reset failure counter because it counts _consecutive_ failures.
170-
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
171-
}
172-
173-
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
174-
// just remove those again during revalidation.
175-
for _, n := range r {
176-
it.tab.addSeenNode(n)
177158
}
178159
reply <- r
179160
}

p2p/discover/node.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,22 @@ import (
2929
"github.com/ethereum/go-ethereum/p2p/enode"
3030
)
3131

32+
type BucketNode struct {
33+
Node *enode.Node `json:"node"`
34+
AddedToTable time.Time `json:"addedToTable"`
35+
AddedToBucket time.Time `json:"addedToBucket"`
36+
Checks int `json:"checks"`
37+
Live bool `json:"live"`
38+
}
39+
3240
// node represents a host on the network.
3341
// The fields of Node may not be modified.
3442
type node struct {
35-
enode.Node
36-
addedAt time.Time // time when the node was added to the table
37-
livenessChecks uint // how often liveness was checked
43+
*enode.Node
44+
addedToTable time.Time // first time node was added to bucket or replacement list
45+
addedToBucket time.Time // time it was added in the actual bucket
46+
livenessChecks uint // how often liveness was checked
47+
isValidatedLive bool // true if existence of node is considered validated right now
3848
}
3949

4050
type encPubkey [64]byte
@@ -65,7 +75,7 @@ func (e encPubkey) id() enode.ID {
6575
}
6676

6777
func wrapNode(n *enode.Node) *node {
68-
return &node{Node: *n}
78+
return &node{Node: n}
6979
}
7080

7181
func wrapNodes(ns []*enode.Node) []*node {
@@ -77,7 +87,7 @@ func wrapNodes(ns []*enode.Node) []*node {
7787
}
7888

7989
func unwrapNode(n *node) *enode.Node {
80-
return &n.Node
90+
return n.Node
8191
}
8292

8393
func unwrapNodes(ns []*node) []*enode.Node {

0 commit comments

Comments
 (0)