
Commit e3fa503

Merge pull request #625 from tomatod/connect_retry_backoff - Prevent reconnect loops
Add a back-off controller for the reconnection sleep time, used when a connection loss is detected immediately after connecting (#589). That issue can be caused by an invalid publish request, which leads the broker to drop the connection immediately.
2 parents 4b066a0 + d174b9a commit e3fa503

File tree

3 files changed: +184, -10 lines


backoff.go

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *    http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Matt Brittan
 *    Daichi Tomaru
 */

package mqtt

import (
	"sync"
	"time"
)

// Controller for the back-off sleep applied when the client attempts reconnection.
// It keeps a status entry for each situation that causes a reconnection.
type backoffController struct {
	sync.RWMutex
	statusMap map[string]*backoffStatus
}

type backoffStatus struct {
	lastSleepPeriod time.Duration
	lastErrorTime   time.Time
}

func newBackoffController() *backoffController {
	return &backoffController{
		statusMap: map[string]*backoffStatus{},
	}
}

// Calculate the next sleep period from the specified parameters.
// The returned values are the next sleep period and whether the error situation is continual.
// If connection errors occur continuously, the sleep period is increased exponentially.
// If enough time has passed since the last error, the sleep period is reset.
func (b *backoffController) getBackoffSleepTime(
	situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
	// Decide the first sleep time when the situation is not continual.
	var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
		if skip {
			status.lastSleepPeriod = 0
			return 0, false
		}
		status.lastSleepPeriod = init
		return init, false
	}

	// Prioritize maxSleepPeriod.
	if initSleepPeriod > maxSleepPeriod {
		initSleepPeriod = maxSleepPeriod
	}
	b.Lock()
	defer b.Unlock()

	status, exist := b.statusMap[situation]
	if !exist {
		b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
		return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
	}

	oldTime := status.lastErrorTime
	status.lastErrorTime = time.Now()

	// When enough time has passed since the last error, the sleep period is reset.
	if status.lastErrorTime.Sub(oldTime) > (processTime*2 + status.lastSleepPeriod) {
		return firstProcess(status, initSleepPeriod, skipFirst)
	}

	if status.lastSleepPeriod == 0 {
		status.lastSleepPeriod = initSleepPeriod
		return initSleepPeriod, true
	}

	if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
		status.lastSleepPeriod = nextSleepPeriod
	} else {
		status.lastSleepPeriod = maxSleepPeriod
	}

	return status.lastSleepPeriod, true
}

// Sleep for the period returned from getBackoffSleepTime.
func (b *backoffController) sleepWithBackoff(
	situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
	sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
	if sleep != 0 {
		time.Sleep(sleep)
	}
	return sleep, isFirst
}
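
For reference, a minimal sketch of the escalation behaviour this controller gives. Because backoffController and getBackoffSleepTime are unexported, the sketch assumes it lives in a *_test.go file inside package mqtt; the "demo" situation key and the durations are illustrative and not part of the commit.

package mqtt

import (
	"testing"
	"time"
)

// With the same situation key and no real time passing between calls, the
// period doubles on each call until it is capped at maxSleepPeriod.
func TestBackoffProgressionSketch(t *testing.T) {
	b := newBackoffController()
	want := []time.Duration{1 * time.Second, 2 * time.Second, 4 * time.Second, 5 * time.Second}
	for i, w := range want {
		s, _ := b.getBackoffSleepTime("demo", 1 * time.Second, 5 * time.Second, 100 * time.Millisecond, false)
		if s != w {
			t.Errorf("call %d: got %v, want %v", i, s, w)
		}
	}
}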

backoff_test.go

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *    http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Matt Brittan
 *    Daichi Tomaru
 */

package mqtt

import (
	"testing"
	"time"
)

func TestGetBackoffSleepTime(t *testing.T) {
	// Test adding a new situation
	controller := newBackoffController()
	if s, c := controller.getBackoffSleepTime("not-exist", 1 * time.Second, 5 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
		t.Errorf("When a new situation is added, the period should be initSleepPeriod and it shouldn't be a continual error. s:%d c:%t", s, c)
	}

	// Test a continual error in the same situation and the capping of the sleep period by maxSleepPeriod
	controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false)
	if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 20 * time.Second) && c) {
		t.Errorf("When the same situation is called again, the period should be increased and it should be regarded as a continual error. s:%d c:%t", s, c)
	}
	if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 30 * time.Second) && c) {
		t.Errorf("The same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30, so the next period should be 30. s:%d c:%t", s, c)
	}

	// Test the reset triggered by elapsed time.
	controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
	controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
	time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second)
	if s, c := controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
		t.Errorf("The reset should be triggered by elapsed time. s:%d c:%t", s, c)
	}

	// Test when the initial and max periods are the same.
	controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false)
	if s, c := controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
		t.Errorf("Sleep time should always be 2. s:%d c:%t", s, c)
	}

	// Test when the initial period > max period.
	controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false)
	if s, c := controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
		t.Errorf("Sleep time should be 2. s:%d c:%t", s, c)
	}

	// Test when the first sleep is skipped.
	if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 0) && !c) {
		t.Errorf("Sleep time should be 0 because of skip. s:%d c:%t", s, c)
	}
	if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 3 * time.Second) && c) {
		t.Errorf("Sleep time should be 3. s:%d c:%t", s, c)
	}
}

client.go

Lines changed: 12 additions & 10 deletions
@@ -141,6 +141,8 @@ type client struct {
 	stop            chan struct{}  // Closed to request that workers stop
 	workers         sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
 	commsStopped    chan struct{}  // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
+
+	backoff         *backoffController
 }
 
 // NewClient will create an MQTT v3.1.1 client with all of the options specified
@@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
 	c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
 	c.obound = make(chan *PacketAndToken)
 	c.oboundP = make(chan *PacketAndToken)
+	c.backoff = newBackoffController()
 	return c
 }
 
@@ -302,10 +305,16 @@ func (c *client) Connect() Token {
 func (c *client) reconnect(connectionUp connCompletedFn) {
 	DEBUG.Println(CLI, "enter reconnect")
 	var (
-		sleep = 1 * time.Second
+		initSleep = 1 * time.Second
 		conn      net.Conn
 	)
 
+	// If the reason for the connection loss is the same as the previous one, sleep before starting the reconnection attempt.
+	// The sleep time increases exponentially while the same situation continues.
+	if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
+		DEBUG.Println(CLI, "Detected continual connection loss after reconnect, slept for", int(slp.Seconds()), "seconds")
+	}
+
 	for {
 		if nil != c.options.OnReconnecting {
 			c.options.OnReconnecting(c, &c.options)
@@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
 		if err == nil {
 			break
 		}
-		DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
-		time.Sleep(sleep)
-		if sleep < c.options.MaxReconnectInterval {
-			sleep *= 2
-		}
-
-		if sleep > c.options.MaxReconnectInterval {
-			sleep = c.options.MaxReconnectInterval
-		}
+		sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
+		DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
 
 		if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
 			if err := connectionUp(false); err != nil { // Should always return an error
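
For context, a minimal usage sketch (not part of the commit) showing the client options the new back-off reads: MaxReconnectInterval caps the exponential sleep, and ConnectTimeout is passed as the processTime for the "attemptReconnection" situation in the diff above. The broker URL and client ID below are placeholders.

package main

import (
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://broker.example.com:1883"). // placeholder broker address
		SetClientID("backoff-demo").                // placeholder client ID
		SetAutoReconnect(true).                     // enables the reconnect() path that now uses the back-off controller
		SetConnectTimeout(10 * time.Second).        // used as processTime when judging continual reconnect failures
		SetMaxReconnectInterval(30 * time.Second)   // upper bound for the exponential reconnect sleep

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
}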
