Skip to content

Commit bd5d54e

Browse files
Merge pull request #1267 from maticnetwork/psp-p2p-upstream
p2p: cherry-pick commits from geth for peering issues
2 parents d67620c + 642f552 commit bd5d54e

28 files changed

+1877
-1064
lines changed

cmd/devp2p/discv4cmd.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"net"
23+
"net/http"
2324
"strconv"
2425
"strings"
2526
"time"
@@ -30,9 +31,11 @@ import (
3031
"github.com/ethereum/go-ethereum/common"
3132
"github.com/ethereum/go-ethereum/crypto"
3233
"github.com/ethereum/go-ethereum/internal/flags"
34+
"github.com/ethereum/go-ethereum/log"
3335
"github.com/ethereum/go-ethereum/p2p/discover"
3436
"github.com/ethereum/go-ethereum/p2p/enode"
3537
"github.com/ethereum/go-ethereum/params"
38+
"github.com/ethereum/go-ethereum/rpc"
3639
)
3740

3841
var (
@@ -46,6 +49,7 @@ var (
4649
discv4ResolveJSONCommand,
4750
discv4CrawlCommand,
4851
discv4TestCommand,
52+
discv4ListenCommand,
4953
},
5054
}
5155
discv4PingCommand = &cli.Command{
@@ -76,6 +80,14 @@ var (
7680
Flags: discoveryNodeFlags,
7781
ArgsUsage: "<nodes.json file>",
7882
}
83+
discv4ListenCommand = &cli.Command{
84+
Name: "listen",
85+
Usage: "Runs a discovery node",
86+
Action: discv4Listen,
87+
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{
88+
httpAddrFlag,
89+
}),
90+
}
7991
discv4CrawlCommand = &cli.Command{
8092
Name: "crawl",
8193
Usage: "Updates a nodes.json file with random nodes found in the DHT",
@@ -132,6 +144,10 @@ var (
132144
Usage: "Enode of the remote node under test",
133145
EnvVars: []string{"REMOTE_ENODE"},
134146
}
147+
httpAddrFlag = &cli.StringFlag{
148+
Name: "rpc",
149+
Usage: "HTTP server listening address",
150+
}
135151
)
136152

137153
var discoveryNodeFlags = []cli.Flag{
@@ -158,6 +174,27 @@ func discv4Ping(ctx *cli.Context) error {
158174
return nil
159175
}
160176

177+
func discv4Listen(ctx *cli.Context) error {
178+
disc, _ := startV4(ctx)
179+
defer disc.Close()
180+
181+
fmt.Println(disc.Self())
182+
183+
httpAddr := ctx.String(httpAddrFlag.Name)
184+
if httpAddr == "" {
185+
// Non-HTTP mode.
186+
select {}
187+
}
188+
189+
api := &discv4API{disc}
190+
log.Info("Starting RPC API server", "addr", httpAddr)
191+
srv := rpc.NewServer("", 0, 0)
192+
srv.RegisterName("discv4", api)
193+
http.DefaultServeMux.Handle("/", srv)
194+
httpsrv := http.Server{Addr: httpAddr, Handler: http.DefaultServeMux}
195+
return httpsrv.ListenAndServe()
196+
}
197+
161198
func discv4RequestRecord(ctx *cli.Context) error {
162199
n := getNodeArg(ctx)
163200
disc, _ := startV4(ctx)
@@ -401,3 +438,23 @@ func parseBootnodes(ctx *cli.Context) ([]*enode.Node, error) {
401438

402439
return nodes, nil
403440
}
441+
442+
type discv4API struct {
443+
host *discover.UDPv4
444+
}
445+
446+
func (api *discv4API) LookupRandom(n int) (ns []*enode.Node) {
447+
it := api.host.RandomNodes()
448+
for len(ns) < n && it.Next() {
449+
ns = append(ns, it.Node())
450+
}
451+
return ns
452+
}
453+
454+
func (api *discv4API) Buckets() [][]discover.BucketNode {
455+
return api.host.TableBuckets()
456+
}
457+
458+
func (api *discv4API) Self() *enode.Node {
459+
return api.host.Self()
460+
}

cmd/devp2p/internal/v4test/framework.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (te *testenv) localEndpoint(c net.PacketConn) v4wire.Endpoint {
128128
}
129129

130130
func (te *testenv) remoteEndpoint() v4wire.Endpoint {
131-
return v4wire.NewEndpoint(te.remoteAddr, 0)
131+
return v4wire.NewEndpoint(te.remoteAddr.AddrPort(), 0)
132132
}
133133

134134
func contains(ns []v4wire.Node, key v4wire.Pubkey) bool {

internal/testlog/testlog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (h *bufHandler) Handle(_ context.Context, r slog.Record) error {
5555
}
5656

5757
func (h *bufHandler) Enabled(_ context.Context, lvl slog.Level) bool {
58-
return lvl <= h.level
58+
return lvl >= h.level
5959
}
6060

6161
func (h *bufHandler) WithAttrs(attrs []slog.Attr) slog.Handler {

node/api.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/ethereum/go-ethereum/internal/debug"
2828
"github.com/ethereum/go-ethereum/log"
2929
"github.com/ethereum/go-ethereum/p2p"
30+
"github.com/ethereum/go-ethereum/p2p/discover"
3031
"github.com/ethereum/go-ethereum/p2p/enode"
3132
"github.com/ethereum/go-ethereum/rpc"
3233
)
@@ -40,6 +41,9 @@ func (n *Node) apis() []rpc.API {
4041
}, {
4142
Namespace: "debug",
4243
Service: debug.Handler,
44+
}, {
45+
Namespace: "debug",
46+
Service: &p2pDebugAPI{n},
4347
}, {
4448
Namespace: "web3",
4549
Service: &web3API{n},
@@ -477,3 +481,16 @@ func (api *adminAPI) SetHttpExecutionPoolSize(n int) *ExecutionPoolSize {
477481

478482
return api.GetExecutionPoolSize()
479483
}
484+
485+
// p2pDebugAPI provides access to p2p internals for debugging.
486+
type p2pDebugAPI struct {
487+
stack *Node
488+
}
489+
490+
func (s *p2pDebugAPI) DiscoveryV4Table() [][]discover.BucketNode {
491+
disc := s.stack.server.DiscoveryV4()
492+
if disc != nil {
493+
return disc.TableBuckets()
494+
}
495+
return nil
496+
}

p2p/dial.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
mrand "math/rand"
2626
"net"
2727
"sync"
28+
"sync/atomic"
2829
"time"
2930

3031
"github.com/ethereum/go-ethereum/common/mclock"
@@ -254,7 +255,7 @@ loop:
254255
}
255256

256257
case task := <-d.doneCh:
257-
id := task.dest.ID()
258+
id := task.dest().ID()
258259
delete(d.dialing, id)
259260
d.updateStaticPool(id)
260261
d.doneSinceLastLog++
@@ -431,7 +432,7 @@ func (d *dialScheduler) startStaticDials(n int) (started int) {
431432
// updateStaticPool attempts to move the given static dial back into staticPool.
432433
func (d *dialScheduler) updateStaticPool(id enode.ID) {
433434
task, ok := d.static[id]
434-
if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest) == nil {
435+
if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest()) == nil {
435436
d.addToStaticPool(task)
436437
}
437438
}
@@ -459,11 +460,11 @@ func (d *dialScheduler) removeFromStaticPool(idx int) {
459460

460461
// startDial runs the given dial task in a separate goroutine.
461462
func (d *dialScheduler) startDial(task *dialTask) {
462-
d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
463-
hkey := string(task.dest.ID().Bytes())
463+
node := task.dest()
464+
d.log.Trace("Starting p2p dial", "id", node.ID(), "ip", node.IP(), "flag", task.flags)
465+
hkey := string(node.ID().Bytes())
464466
d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
465-
d.dialing[task.dest.ID()] = task
466-
467+
d.dialing[node.ID()] = task
467468
go func() {
468469
task.run(d)
469470
d.doneCh <- task
@@ -474,39 +475,46 @@ func (d *dialScheduler) startDial(task *dialTask) {
474475
type dialTask struct {
475476
staticPoolIndex int
476477
flags connFlag
478+
477479
// These fields are private to the task and should not be
478480
// accessed by dialScheduler while the task is running.
479-
dest *enode.Node
481+
destPtr atomic.Pointer[enode.Node]
480482
lastResolved mclock.AbsTime
481483
resolveDelay time.Duration
482484
}
483485

484486
func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
485-
return &dialTask{dest: dest, flags: flags, staticPoolIndex: -1}
487+
t := &dialTask{flags: flags, staticPoolIndex: -1}
488+
t.destPtr.Store(dest)
489+
return t
486490
}
487491

488492
type dialError struct {
489493
error
490494
}
491495

496+
func (t *dialTask) dest() *enode.Node {
497+
return t.destPtr.Load()
498+
}
499+
492500
func (t *dialTask) run(d *dialScheduler) {
493501
if t.needResolve() && !t.resolve(d) {
494502
return
495503
}
496504

497-
err := t.dial(d, t.dest)
505+
err := t.dial(d, t.dest())
498506
if err != nil {
499507
// For static nodes, resolve one more time if dialing fails.
500508
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
501509
if t.resolve(d) {
502-
t.dial(d, t.dest)
510+
t.dial(d, t.dest())
503511
}
504512
}
505513
}
506514
}
507515

508516
func (t *dialTask) needResolve() bool {
509-
return t.flags&staticDialedConn != 0 && t.dest.IP() == nil
517+
return t.flags&staticDialedConn != 0 && t.dest().IP() == nil
510518
}
511519

512520
// resolve attempts to find the current endpoint for the destination
@@ -528,42 +536,41 @@ func (t *dialTask) resolve(d *dialScheduler) bool {
528536
return false
529537
}
530538

531-
resolved := d.resolver.Resolve(t.dest)
539+
node := t.dest()
540+
resolved := d.resolver.Resolve(node)
532541
t.lastResolved = d.clock.Now()
533542

534543
if resolved == nil {
535544
t.resolveDelay *= 2
536545
if t.resolveDelay > maxResolveDelay {
537546
t.resolveDelay = maxResolveDelay
538547
}
539-
540-
d.log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
541-
548+
d.log.Debug("Resolving node failed", "id", node.ID(), "newdelay", t.resolveDelay)
542549
return false
543550
}
544551
// The node was found.
545552
t.resolveDelay = initialResolveDelay
546-
t.dest = resolved
547-
d.log.Debug("Resolved node", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()})
548-
553+
t.destPtr.Store(resolved)
554+
d.log.Debug("Resolved node", "id", resolved.ID(), "addr", &net.TCPAddr{IP: resolved.IP(), Port: resolved.TCP()})
549555
return true
550556
}
551557

552558
// dial performs the actual connection attempt.
553559
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
554560
dialMeter.Mark(1)
555-
fd, err := d.dialer.Dial(d.ctx, t.dest)
561+
fd, err := d.dialer.Dial(d.ctx, dest)
556562
if err != nil {
557-
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
563+
d.log.Trace("Dial error", "id", dest.ID(), "addr", nodeAddr(dest), "conn", t.flags, "err", cleanupDialErr(err))
558564
dialConnectionError.Mark(1)
559565
return &dialError{err}
560566
}
561567
return d.setupFunc(newMeteredConn(fd), t.flags, dest)
562568
}
563569

564570
func (t *dialTask) String() string {
565-
id := t.dest.ID()
566-
return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
571+
node := t.dest()
572+
id := node.ID()
573+
return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], node.IP(), node.TCP())
567574
}
568575

569576
func cleanupDialErr(err error) error {

p2p/discover/common.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ package discover
1818

1919
import (
2020
"crypto/ecdsa"
21+
crand "crypto/rand"
22+
"encoding/binary"
23+
"math/rand"
2124
"net"
25+
"net/netip"
26+
"sync"
2227
"time"
2328

2429
"github.com/ethereum/go-ethereum/common/mclock"
@@ -30,8 +35,8 @@ import (
3035

3136
// UDPConn is a network connection on which discovery can operate.
3237
type UDPConn interface {
33-
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
34-
WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
38+
ReadFromUDPAddrPort(b []byte) (n int, addr netip.AddrPort, err error)
39+
WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (n int, err error)
3540
Close() error
3641
LocalAddr() net.Addr
3742
}
@@ -62,7 +67,7 @@ type Config struct {
6267
func (cfg Config) withDefaults() Config {
6368
// Node table configuration:
6469
if cfg.PingInterval == 0 {
65-
cfg.PingInterval = 10 * time.Second
70+
cfg.PingInterval = 3 * time.Second
6671
}
6772
if cfg.RefreshInterval == 0 {
6873
cfg.RefreshInterval = 30 * time.Minute
@@ -93,13 +98,46 @@ func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
9398
// channel if configured.
9499
type ReadPacket struct {
95100
Data []byte
96-
Addr *net.UDPAddr
101+
Addr netip.AddrPort
97102
}
98103

99-
func min(x, y int) int {
100-
if x > y {
101-
return y
102-
}
104+
type randomSource interface {
105+
Intn(int) int
106+
Int63n(int64) int64
107+
Shuffle(int, func(int, int))
108+
}
109+
110+
// reseedingRandom is a random number generator that tracks when it was last re-seeded.
111+
type reseedingRandom struct {
112+
mu sync.Mutex
113+
cur *rand.Rand
114+
}
115+
116+
func (r *reseedingRandom) seed() {
117+
var b [8]byte
118+
crand.Read(b[:])
119+
seed := binary.BigEndian.Uint64(b[:])
120+
new := rand.New(rand.NewSource(int64(seed)))
121+
122+
r.mu.Lock()
123+
r.cur = new
124+
r.mu.Unlock()
125+
}
126+
127+
func (r *reseedingRandom) Intn(n int) int {
128+
r.mu.Lock()
129+
defer r.mu.Unlock()
130+
return r.cur.Intn(n)
131+
}
132+
133+
func (r *reseedingRandom) Int63n(n int64) int64 {
134+
r.mu.Lock()
135+
defer r.mu.Unlock()
136+
return r.cur.Int63n(n)
137+
}
103138

104-
return x
139+
func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) {
140+
r.mu.Lock()
141+
defer r.mu.Unlock()
142+
r.cur.Shuffle(n, swap)
105143
}

0 commit comments

Comments
 (0)