Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"sync"
"time"
)

// Controller for sleep with backoff when the client attempts reconnection
// It has statuses for each situations cause reconnection.
type backoffController struct {
sync.RWMutex
statusMap map[string]*backoffStatus
}

type backoffStatus struct {
lastSleepPeriod time.Duration
lastErrorTime time.Time
}

func newBackoffController() *backoffController {
return &backoffController{
statusMap: map[string]*backoffStatus{},
}
}

// Calculate next sleep period from the specified parameters.
// Returned values are next sleep period and whether the error situation is continual.
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
// Also if there is a lot of time between last and this error, sleep period is initialized.
func (b *backoffController) getBackoffSleepTime(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
// Decide first sleep time if the situation is not continual.
var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
if skip {
status.lastSleepPeriod = 0
return 0, false
}
status.lastSleepPeriod = init
return init, false
}

// Prioritize maxSleep.
if initSleepPeriod > maxSleepPeriod {
initSleepPeriod = maxSleepPeriod
}
b.Lock()
defer b.Unlock()

status, exist := b.statusMap[situation]
if !exist {
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
}

oldTime := status.lastErrorTime
status.lastErrorTime = time.Now()

// When there is a lot of time between last and this error, sleep period is initialized.
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
return firstProcess(status, initSleepPeriod, skipFirst)
}

if status.lastSleepPeriod == 0 {
status.lastSleepPeriod = initSleepPeriod
return initSleepPeriod, true
}

if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
status.lastSleepPeriod = nextSleepPeriod
} else {
status.lastSleepPeriod = maxSleepPeriod
}

return status.lastSleepPeriod, true
}

// Execute sleep the time returned from getBackoffSleepTime.
func (b *backoffController) sleepWithBackoff(
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
) (time.Duration, bool) {
sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
if sleep != 0 {
time.Sleep(sleep)
}
return sleep, isFirst
}
68 changes: 68 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"testing"
"time"
)

func TestGetBackoffSleepTime(t *testing.T) {
// Test for adding new situation
controller := newBackoffController()
if s, c := controller.getBackoffSleepTime("not-exist", 1 * time.Second, 5 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c)
}

// Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod
controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 20 * time.Second) && c) {
t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 30 * time.Second) && c) {
t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c)
}

// Test for initialization by elapsed time.
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false)
time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second)
if s, c := controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) {
t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c)
}

// Test when initial and max period is same.
controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be always 2. s:%d c%t", s, c)
}

// Test when initial period > max period.
controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false)
if s, c := controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) {
t.Errorf("Sleep time should be 2. s:%d c%t", s, c)
}

// Test when first sleep is skipped.
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 0) && !c) {
t.Errorf("Sleep time should be 0 because of skip. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 3 * time.Second) && c) {
t.Errorf("Sleep time should be 3. s:%d c%t", s, c)
}
}
22 changes: 12 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type client struct {
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)

backoff *backoffController
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.backoff = newBackoffController()
return c
}

Expand Down Expand Up @@ -302,10 +305,16 @@ func (c *client) Connect() Token {
func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
initSleep = 1 * time.Second
conn net.Conn
)

// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
// Sleep time is exponentially increased as the same situation continues
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
}

for {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
Expand All @@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
if err == nil {
break
}
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
time.Sleep(sleep)
if sleep < c.options.MaxReconnectInterval {
sleep *= 2
}

if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)

if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
if err := connectionUp(false); err != nil { // Should always return an error
Expand Down