Skip to content

Commit 65d7070

Browse files
only start hole punching service after the host has a public address
1 parent 7388d1f commit 65d7070

File tree

2 files changed

+126
-32
lines changed

2 files changed

+126
-32
lines changed

p2p/protocol/holepunch/coordination.go

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type Service struct {
5656
closed bool
5757
refCount sync.WaitGroup
5858

59+
hasPublicAddrsChan chan struct{} // this chan is closed as soon as we have a public address
60+
5961
// active hole punches for deduplicating
6062
activeMx sync.Mutex
6163
active map[peer.ID]struct{}
@@ -71,11 +73,12 @@ func NewService(h host.Host, ids identify.IDService, opts ...Option) (*Service,
7173

7274
ctx, cancel := context.WithCancel(context.Background())
7375
hs := &Service{
74-
ctx: ctx,
75-
ctxCancel: cancel,
76-
host: h,
77-
ids: ids,
78-
active: make(map[peer.ID]struct{}),
76+
ctx: ctx,
77+
ctxCancel: cancel,
78+
host: h,
79+
ids: ids,
80+
active: make(map[peer.ID]struct{}),
81+
hasPublicAddrsChan: make(chan struct{}),
7982
}
8083

8184
for _, opt := range opts {
@@ -85,11 +88,47 @@ func NewService(h host.Host, ids identify.IDService, opts ...Option) (*Service,
8588
}
8689
}
8790

88-
h.SetStreamHandler(Protocol, hs.handleNewStream)
91+
hs.refCount.Add(1)
92+
go hs.watchForPublicAddr()
93+
8994
h.Network().Notify((*netNotifiee)(hs))
9095
return hs, nil
9196
}
9297

98+
func (hs *Service) watchForPublicAddr() {
99+
defer hs.refCount.Done()
100+
101+
log.Debug("waiting until we have at least one public address", "peer", hs.host.ID())
102+
103+
// TODO: We should have an event here that fires when identify discovers a new
104+
// address (and when autonat confirms that address).
105+
// As we currently don't have an event like this, just check our observed addresses
106+
// regularly (exponential backoff starting at 250 ms, capped at 5s).
107+
duration := 250 * time.Millisecond
108+
const maxDuration = 5 * time.Second
109+
t := time.NewTimer(duration)
110+
defer t.Stop()
111+
for {
112+
if containsPublicAddr(hs.ids.OwnObservedAddrs()) {
113+
log.Debug("Host now has a public address. Starting holepunch protocol.")
114+
hs.host.SetStreamHandler(Protocol, hs.handleNewStream)
115+
close(hs.hasPublicAddrsChan)
116+
return
117+
}
118+
119+
select {
120+
case <-hs.ctx.Done():
121+
return
122+
case <-t.C:
123+
duration *= 2
124+
if duration > maxDuration {
125+
duration = maxDuration
126+
}
127+
t.Reset(duration)
128+
}
129+
}
130+
}
131+
93132
// Close closes the Hole Punch Service.
94133
func (hs *Service) Close() error {
95134
hs.closeMx.Lock()
@@ -176,7 +215,6 @@ func (hs *Service) beginDirectConnect(p peer.ID) error {
176215
// It first attempts a direct dial (if we have a public address of that peer), and then
177216
// coordinates a hole punch over the given relay connection.
178217
func (hs *Service) DirectConnect(p peer.ID) error {
179-
log.Debugw("got inbound proxy conn", "peer", p)
180218
if err := hs.beginDirectConnect(p); err != nil {
181219
return err
182220
}
@@ -221,8 +259,16 @@ func (hs *Service) directConnect(rp peer.ID) error {
221259
}
222260
}
223261

224-
if len(hs.ids.OwnObservedAddrs()) == 0 {
262+
log.Debugw("got inbound proxy conn", "peer", rp)
263+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
264+
defer cancel()
265+
select {
266+
case <-hs.ctx.Done():
267+
return hs.ctx.Err()
268+
case <-ctx.Done():
269+
log.Debug("didn't find any public host address")
225270
return errors.New("can't initiate hole punch, as we don't have any public addresses")
271+
case <-hs.hasPublicAddrsChan:
226272
}
227273

228274
// hole punch
@@ -341,11 +387,6 @@ func (hs *Service) handleNewStream(s network.Stream) {
341387
err = hs.holePunchConnect(pi, false)
342388
dt := time.Since(start)
343389
hs.tracer.EndHolePunch(rp, dt, err)
344-
if err != nil {
345-
log.Debugw("hole punching failed", "peer", rp, "time", dt, "error", err)
346-
} else {
347-
log.Debugw("hole punching succeeded", "peer", rp, "time", dt)
348-
}
349390
}
350391

351392
func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
@@ -363,6 +404,16 @@ func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
363404
return nil
364405
}
365406

407+
func containsPublicAddr(addrs []ma.Multiaddr) bool {
408+
for _, addr := range addrs {
409+
if isRelayAddress(addr) || !manet.IsPublicAddr(addr) {
410+
continue
411+
}
412+
return true
413+
}
414+
return false
415+
}
416+
366417
func removeRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
367418
result := make([]ma.Multiaddr, 0, len(addrs))
368419
for _, addr := range addrs {
@@ -414,6 +465,7 @@ func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
414465
// that we can dial to for a hole punch.
415466
case <-hs.ids.IdentifyWait(conn):
416467
case <-hs.ctx.Done():
468+
return
417469
}
418470

419471
_ = hs.DirectConnect(conn.RemotePeer())

p2p/protocol/holepunch/coordination_test.go

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,22 @@ func (m *mockEventTracer) getEvents() []*holepunch.Event {
4747

4848
var _ holepunch.EventTracer = &mockEventTracer{}
4949

50+
type mockIDService struct {
51+
identify.IDService
52+
}
53+
54+
var _ identify.IDService = &mockIDService{}
55+
56+
func newMockIDService(t *testing.T, h host.Host) identify.IDService {
57+
ids, err := identify.NewIDService(h)
58+
require.NoError(t, err)
59+
return &mockIDService{IDService: ids}
60+
}
61+
62+
func (s *mockIDService) OwnObservedAddrs() []ma.Multiaddr {
63+
return append(s.IDService.OwnObservedAddrs(), ma.StringCast("/ip4/1.1.1.1/tcp/1234"))
64+
}
65+
5066
func TestNoHolePunchIfDirectConnExists(t *testing.T) {
5167
tr := &mockEventTracer{}
5268
h1, hps := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
@@ -95,7 +111,7 @@ func TestDirectDialWorks(t *testing.T) {
95111

96112
func TestEndToEndSimConnect(t *testing.T) {
97113
tr := &mockEventTracer{}
98-
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), true)
114+
h1, h2, relay, _ := makeRelayedHosts(t, nil, holepunch.WithTracer(tr), true)
99115
defer h1.Close()
100116
defer h2.Close()
101117
defer relay.Close()
@@ -158,11 +174,11 @@ func TestFailuresOnInitiator(t *testing.T) {
158174
}
159175

160176
tr := &mockEventTracer{}
161-
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
177+
h1, h2, relay, _ := makeRelayedHosts(t, nil, nil, false)
162178
defer h1.Close()
163179
defer h2.Close()
164180
defer relay.Close()
165-
hps := addHolePunchService(t, h2)
181+
hps := addHolePunchService(t, h2, holepunch.WithTracer(tr))
166182

167183
if tc.rhandler != nil {
168184
h1.SetStreamHandler(holepunch.Protocol, tc.rhandler)
@@ -180,6 +196,14 @@ func TestFailuresOnInitiator(t *testing.T) {
180196
}
181197
}
182198

199+
func addrsToBytes(as []ma.Multiaddr) [][]byte {
200+
bzs := make([][]byte, 0, len(as))
201+
for _, a := range as {
202+
bzs = append(bzs, a.Bytes())
203+
}
204+
return bzs
205+
}
206+
183207
func TestFailuresOnResponder(t *testing.T) {
184208
tcs := map[string]struct {
185209
initiator func(s network.Stream)
@@ -192,22 +216,36 @@ func TestFailuresOnResponder(t *testing.T) {
192216
},
193217
errMsg: "expected CONNECT message",
194218
},
195-
"initiator does NOT send a SYNC message after a Connect message": {
219+
"initiator does NOT send a SYNC message after a CONNECT message": {
196220
initiator: func(s network.Stream) {
197221
w := protoio.NewDelimitedWriter(s)
198-
w.WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
222+
w.WriteMsg(&holepunch_pb.HolePunch{
223+
Type: holepunch_pb.HolePunch_CONNECT.Enum(),
224+
ObsAddrs: addrsToBytes([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234")}),
225+
})
199226
w.WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
200227
},
201228
errMsg: "expected SYNC message",
202229
},
203230
"initiator does NOT reply within hole punch deadline": {
204231
holePunchTimeout: 10 * time.Millisecond,
205232
initiator: func(s network.Stream) {
206-
protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
233+
protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{
234+
Type: holepunch_pb.HolePunch_CONNECT.Enum(),
235+
ObsAddrs: addrsToBytes([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234")}),
236+
})
207237
time.Sleep(10 * time.Second)
208238
},
209239
errMsg: "i/o deadline reached",
210240
},
241+
"initiator does NOT send any addresses in CONNECT": {
242+
holePunchTimeout: 10 * time.Millisecond,
243+
initiator: func(s network.Stream) {
244+
protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
245+
time.Sleep(10 * time.Second)
246+
},
247+
errMsg: "expected CONNECT message to contain at least one message",
248+
},
211249
}
212250

213251
for name, tc := range tcs {
@@ -219,7 +257,7 @@ func TestFailuresOnResponder(t *testing.T) {
219257
}
220258

221259
tr := &mockEventTracer{}
222-
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
260+
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), nil, false)
223261
defer h1.Close()
224262
defer h2.Close()
225263
defer relay.Close()
@@ -293,7 +331,7 @@ func ensureDirectConn(t *testing.T, h1, h2 host.Host) {
293331
}, 5*time.Second, 50*time.Millisecond)
294332
}
295333

296-
func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) host.Host {
334+
func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
297335
if race.WithRace() {
298336
t.Skip("modifying manet.Private4 is racy")
299337
}
@@ -327,19 +365,23 @@ func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Hos
327365
return h
328366
}
329367

330-
func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
368+
func makeRelayedHosts(t *testing.T, h1opt, h2opt holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
331369
t.Helper()
332-
h1, _ = mkHostWithHolePunchSvc(t, h1Opt)
370+
var h1opts []holepunch.Option
371+
if h1opt != nil {
372+
h1opts = append(h1opts, h1opt)
373+
}
374+
h1, _ = mkHostWithHolePunchSvc(t, h1opts...)
333375
var err error
334376
relay, err = libp2p.New(libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")), libp2p.DisableRelay())
335377
require.NoError(t, err)
336378

337379
_, err = relayv1.NewRelay(relay)
338380
require.NoError(t, err)
339381

340-
h2 = mkHostWithStaticAutoRelay(t, context.Background(), relay)
382+
h2 = mkHostWithStaticAutoRelay(t, relay)
341383
if addHolePuncher {
342-
hps = addHolePunchService(t, h2)
384+
hps = addHolePunchService(t, h2, h2opt)
343385
}
344386

345387
// h1 has a relay addr
@@ -359,11 +401,13 @@ func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option, addHolePuncher bool)
359401
return
360402
}
361403

362-
func addHolePunchService(t *testing.T, h host.Host) *holepunch.Service {
404+
func addHolePunchService(t *testing.T, h host.Host, opt holepunch.Option) *holepunch.Service {
363405
t.Helper()
364-
ids, err := identify.NewIDService(h)
365-
require.NoError(t, err)
366-
hps, err := holepunch.NewService(h, ids)
406+
var opts []holepunch.Option
407+
if opt != nil {
408+
opts = append(opts, opt)
409+
}
410+
hps, err := holepunch.NewService(h, newMockIDService(t, h), opts...)
367411
require.NoError(t, err)
368412
return hps
369413
}
@@ -372,9 +416,7 @@ func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host,
372416
t.Helper()
373417
h, err := libp2p.New(libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0"), ma.StringCast("/ip6/::1/tcp/0")))
374418
require.NoError(t, err)
375-
ids, err := identify.NewIDService(h)
376-
require.NoError(t, err)
377-
hps, err := holepunch.NewService(h, ids, opts...)
419+
hps, err := holepunch.NewService(h, newMockIDService(t, h), opts...)
378420
require.NoError(t, err)
379421
return h, hps
380422
}

0 commit comments

Comments
 (0)