Skip to content

Commit 5cf0cfd

Browse files
committed
les: handle disconnect logic more safely
1 parent 3f8b3a5 commit 5cf0cfd

File tree

1 file changed

+68
-62
lines changed

1 file changed

+68
-62
lines changed

les/serverpool.go

Lines changed: 68 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,23 @@ const (
8989

9090
// connReq represents a request for peer connection.
9191
type connReq struct {
92-
p *peer
93-
ip net.IP
94-
port uint16
95-
cont chan *poolEntry
92+
p *peer
93+
ip net.IP
94+
port uint16
95+
result chan *poolEntry
9696
}
9797

98-
// discReq represents a request for peer disconnection.
99-
type discReq struct {
100-
entry *poolEntry
101-
cont chan struct{}
98+
// disconnReq represents a request for peer disconnection.
99+
type disconnReq struct {
100+
entry *poolEntry
101+
stopped bool
102+
done chan struct{}
102103
}
103104

104105
// registerReq represents a request for peer registration.
105106
type registerReq struct {
106107
entry *poolEntry
107-
cont chan struct{}
108+
done chan struct{}
108109
}
109110

110111
// serverPool implements a pool for storing and selecting newly discovered and already
@@ -129,7 +130,7 @@ type serverPool struct {
129130
adjustStats chan poolStatAdjust
130131

131132
connCh chan *connReq
132-
discCh chan *discReq
133+
disconnCh chan *disconnReq
133134
registerCh chan *registerReq
134135

135136
knownQueue, newQueue poolEntryQueue
@@ -149,7 +150,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
149150
adjustStats: make(chan poolStatAdjust, 100),
150151
enableRetry: make(chan *poolEntry, 1),
151152
connCh: make(chan *connReq),
152-
discCh: make(chan *discReq),
153+
disconnCh: make(chan *disconnReq),
153154
registerCh: make(chan *registerReq),
154155
knownSelect: newWeightedRandomSelect(),
155156
newSelect: newWeightedRandomSelect(),
@@ -184,62 +185,43 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
184185
// disconnect should also always be called.
185186
func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
186187
log.Debug("Connect new entry", "enode", p.id)
187-
req := &connReq{p: p, ip: ip, port: port, cont: make(chan *poolEntry, 1)}
188+
req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)}
188189
select {
189190
case pool.connCh <- req:
190191
case <-pool.quit:
191192
return nil
192193
}
193-
return <-req.cont
194+
return <-req.result
194195
}
195196

196197
// registered should be called after a successful handshake
197198
func (pool *serverPool) registered(entry *poolEntry) {
198199
log.Debug("Registered new entry", "enode", entry.id)
199-
req := &registerReq{entry: entry, cont: make(chan struct{})}
200+
req := &registerReq{entry: entry, done: make(chan struct{})}
200201
select {
201202
case pool.registerCh <- req:
202203
case <-pool.quit:
203204
return
204205
}
205-
<-req.cont
206-
return
206+
<-req.done
207207
}
208208

209209
// disconnect should be called when ending a connection. Service quality statistics
210210
// can be updated optionally (not updated if no registration happened, in this case
211211
// only connection statistics are updated, just like in case of timeout)
212212
func (pool *serverPool) disconnect(entry *poolEntry) {
213-
log.Debug("Disconnected old entry", "enode", entry.id)
214213
stopped := false
215214
select {
216215
case <-pool.quit:
217216
stopped = true
218217
default:
219218
}
219+
log.Debug("Disconnected old entry", "enode", entry.id)
220+
req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
220221

221-
if stopped {
222-
// Request is emitted by ourselves, handle the logic here since eventloop doesn't
223-
// serve requests anymore.
224-
if entry.state == psRegistered {
225-
connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
226-
if connAdjust > 1 {
227-
connAdjust = 1
228-
}
229-
entry.connectStats.add(1, connAdjust)
230-
}
231-
entry.state = psNotConnected
232-
pool.connWg.Done()
233-
} else {
234-
// Request is emitted by the server side
235-
req := &discReq{entry: entry, cont: make(chan struct{})}
236-
select {
237-
case pool.discCh <- req:
238-
case <-pool.quit:
239-
return
240-
}
241-
<-req.cont
242-
}
222+
// Block until disconnection request be served.
223+
pool.disconnCh <- req
224+
<-req.done
243225
}
244226

245227
const (
@@ -282,6 +264,37 @@ func (pool *serverPool) eventLoop() {
282264
if pool.discSetPeriod != nil {
283265
pool.discSetPeriod <- time.Millisecond * 100
284266
}
267+
268+
// disconnect updates service quality statistics depending on the connection time
269+
// and disconnection initiator.
270+
disconnect := func(req *disconnReq, stopped bool) {
271+
// Handle peer disconnection requests.
272+
entry := req.entry
273+
if entry.state == psRegistered {
274+
connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
275+
if connAdjust > 1 {
276+
connAdjust = 1
277+
}
278+
if stopped {
279+
// disconnect requested by ourselves.
280+
entry.connectStats.add(1, connAdjust)
281+
} else {
282+
// disconnect requested by server side.
283+
entry.connectStats.add(connAdjust, 1)
284+
}
285+
}
286+
entry.state = psNotConnected
287+
288+
if entry.knownSelected {
289+
pool.knownSelected--
290+
} else {
291+
pool.newSelected--
292+
}
293+
pool.setRetryDial(entry)
294+
pool.connWg.Done()
295+
close(req.done)
296+
}
297+
285298
for {
286299
select {
287300
case entry := <-pool.timeout:
@@ -331,7 +344,7 @@ func (pool *serverPool) eventLoop() {
331344
entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
332345
}
333346
if entry.state == psConnected || entry.state == psRegistered {
334-
req.cont <- nil
347+
req.result <- nil
335348
continue
336349
}
337350
pool.connWg.Add(1)
@@ -347,7 +360,7 @@ func (pool *serverPool) eventLoop() {
347360
entry.addr[addr.strKey()] = addr
348361
entry.addrSelect = *newWeightedRandomSelect()
349362
entry.addrSelect.update(addr)
350-
req.cont <- entry
363+
req.result <- entry
351364

352365
case req := <-pool.registerCh:
353366
// Handle peer registration requests.
@@ -360,34 +373,27 @@ func (pool *serverPool) eventLoop() {
360373
}
361374
pool.knownQueue.setLatest(entry)
362375
entry.shortRetry = shortRetryCnt
363-
close(req.cont)
376+
close(req.done)
364377

365-
case req := <-pool.discCh:
378+
case req := <-pool.disconnCh:
366379
// Handle peer disconnection requests.
367-
entry := req.entry
368-
if entry.state == psRegistered {
369-
connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
370-
if connAdjust > 1 {
371-
connAdjust = 1
372-
}
373-
entry.connectStats.add(connAdjust, 1)
374-
}
375-
entry.state = psNotConnected
376-
377-
if entry.knownSelected {
378-
pool.knownSelected--
379-
} else {
380-
pool.newSelected--
381-
}
382-
pool.setRetryDial(entry)
383-
pool.connWg.Done()
384-
close(req.cont)
380+
disconnect(req, req.stopped)
385381

386382
case <-pool.quit:
387383
if pool.discSetPeriod != nil {
388384
close(pool.discSetPeriod)
389385
}
390-
pool.connWg.Wait()
386+
387+
// Spawn a goroutine to close the disconnCh until all connections are disconnected.
388+
go func() {
389+
pool.connWg.Wait()
390+
close(pool.disconnCh)
391+
}()
392+
393+
// Handle all remain disconnection requests before exit.
394+
for req := range pool.disconnCh {
395+
disconnect(req, true)
396+
}
391397
pool.saveNodes()
392398
pool.wg.Done()
393399
return

0 commit comments

Comments
 (0)