2 changes: 1 addition & 1 deletion agent/agent_test.go
@@ -354,7 +354,7 @@ func TestSessionReconnectsIfDispatcherErrors(t *testing.T) {
 			return fmt.Errorf("expecting 2 closed sessions, got %d", len(closedSessions))
 		}
 		return nil
-	}, 5*time.Second))
+	}, 10*time.Second))
 }
 
 type testSessionTracker struct {
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
@@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
 	ctx := context.Background()
 	// Start the global orchestrator.
 	global := NewGlobalOrchestrator(store)
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, global.Run(ctx))
-	}()
+	})
 
 	addService(t, store, service1)
 	testutils.Expect(t, watch, api.EventCreateService{})
@@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
@@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Nothing should happen because both tasks are up to date.
 	select {
@@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskDelete(t, watch)
 	assert.Equal(t, observedTask1.ID, "task2")
@@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	observedTask1 := testutils.WatchTaskUpdate(t, watch)
 	assert.Equal(t, observedTask1.ID, "task1")
@@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
 	orchestrator := NewGlobalOrchestrator(s)
 	defer orchestrator.Stop()
 
-	go func() {
+	testutils.EnsureRuns(func() {
 		assert.NoError(t, orchestrator.Run(ctx))
-	}()
+	})
 
 	// Fail the running task
 	s.Update(func(tx store.Tx) error {
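Note: every hunk above makes the same substitution: a bare go func() { ... }() becomes testutils.EnsureRuns(func() { ... }). The helper's implementation is not part of this diff; judging only from its call sites (it takes a func, and in update_test.go below its return value is received from as a channel after the func finishes), a minimal sketch of what it might look like is:

// Sketch reconstructed from call sites only; the real helper lives in
// manager/orchestrator/testutils and may differ.
func EnsureRuns(fn func()) <-chan struct{} {
	started := make(chan struct{})
	done := make(chan struct{})
	go func() {
		close(started) // the goroutine has been scheduled and is running
		defer close(done)
		fn()
	}()
	<-started // don't return until the goroutine is actually running
	return done
}

The point of such a helper in these tests is that a bare go statement gives no guarantee the goroutine ever gets scheduled before the test finishes; blocking on started removes that source of flakiness, and the done channel lets callers wait for completion.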
118 changes: 94 additions & 24 deletions manager/orchestrator/replicated/update_test.go
@@ -1,10 +1,11 @@
 package replicated
 
 import (
-	"sync/atomic"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/manager/orchestrator/testutils"
 	"github.com/docker/swarmkit/manager/state"
@@ -26,19 +27,25 @@ func TestUpdaterRollback(t *testing.T) {
 }
 
 func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
-	ctx := context.Background()
+	// this test should complete within 30 seconds. if not, bail out
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
 
 	s := store.NewMemoryStore(nil)
 	assert.NotNil(t, s)
 	defer s.Close()
 
 	orchestrator := NewReplicatedOrchestrator(s)
 	defer orchestrator.Stop()
 
+	// These variables will be used to signal that The Fail Loop should start
+	// failing these tasks. Once failImage1 is set, The Failing Can Begin.
 	var (
-		failImage1 uint32
-		failImage2 uint32
+		failMu     sync.Mutex
+		failImage1 bool
 	)
 
+	// create a watch for task creates, which we will use to verify that the
+	// updater works correctly.
 	watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
 	defer cancelCreate()
@@ -48,23 +55,44 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 	// Fail new tasks the updater tries to run
 	watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
 	defer cancelUpdate()
-	go func() {
+
+	// We're gonna call this big chunk here "The Fail Loop". Its job is to put
+	// tasks into a Failed state under certain conditions.
+	testutils.EnsureRuns(func() {
 		failedLast := false
+		// typical go pattern: infinite for loop in a goroutine, exits on
+		// ctx.Done
 		for {
-			e := <-watchUpdate
+			var e events.Event
+			select {
+			case e = <-watchUpdate:
+			case <-ctx.Done():
+				return
+			}
 			task := e.(api.EventUpdateTask).Task
 			if task.DesiredState == task.Status.State {
 				continue
 			}
-			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
+			// This used to have a 3rd clause,
+			// "&& task.Status.State != api.TaskStateRunning"
+			// however, this is unneeded. If DesiredState is Running, then
+			// the actual state cannot be Running, because that would get
+			// caught in the condition above (DesiredState == State)
+			if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
 				err := s.Update(func(tx store.Tx) error {
 					task = store.GetTask(tx, task.ID)
-					// Never fail two image2 tasks in a row, so there's a mix of
-					// failed and successful tasks for the rollback.
-					if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
+					// lock the mutex governing access to failImage1
+					failMu.Lock()
+					defer failMu.Unlock()
+					// we should start failing tasks with image1 only after
+					// failImage1 has been set, which happens right before
+					// the second rolling update below
+					if task.Spec.GetContainer().Image == "image1" && failImage1 {
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
-					} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
+					} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
+						// Never fail two image2 tasks in a row, so there's a
+						// mix of failed and successful tasks for the rollback.
 						task.Status.State = api.TaskStateFailed
 						failedLast = true
 					} else {
@@ -83,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 				assert.NoError(t, err)
 			}
 		}
-	}()
+	})
 
 	// Create a service with four replicas specified before the orchestrator
 	// is started. This should result in two tasks when the orchestrator
@@ -141,8 +169,18 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 	assert.NoError(t, err)
 
 	// Start the orchestrator.
-	go func() {
-		assert.NoError(t, orchestrator.Run(ctx))
+	var orchestratorError error
+	orchestratorDone := testutils.EnsureRuns(func() {
+		orchestratorError = orchestrator.Run(ctx)
+	})
+
+	defer func() {
+		orchestrator.Stop()
+		select {
+		case <-ctx.Done():
+		case <-orchestratorDone:
+			assert.NoError(t, orchestratorError)
+		}
 	}()
 
 	observedTask := testutils.WatchTaskCreate(t, watchCreate)
Expand All @@ -161,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")

atomic.StoreUint32(&failImage2, 1)

// Start a rolling update
err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
@@ -195,7 +231,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	// Should get to the ROLLBACK_STARTED state
 	for {
-		e := <-watchServiceUpdate
+		var e events.Event
+		select {
+		case e = <-watchServiceUpdate:
+		case <-ctx.Done():
+			t.Error("test timed out before watchServiceUpdate provided an event")
+			return
+		}
 		if e.(api.EventUpdateService).Service.UpdateStatus == nil {
 			continue
 		}
@@ -224,16 +266,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	// Should end up in ROLLBACK_COMPLETED state
 	for {
-		e := <-watchServiceUpdate
+		var e events.Event
+		select {
+		case e = <-watchServiceUpdate:
+			t.Log("service was updated")
+		case <-ctx.Done():
+			t.Error("test timed out before watchServiceUpdate provided an event")
+			return
+		}
+
 		if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED {
 			break
 		}
 	}
 
-	atomic.StoreUint32(&failImage1, 1)
-
 	// Repeat the rolling update but this time fail the tasks that the
 	// rollback creates.
+	failMu.Lock()
+	failImage1 = true
+	failMu.Unlock()
+
 	err = s.Update(func(tx store.Tx) error {
 		s1 := store.GetService(tx, "id1")
 		require.NotNil(t, s1)
@@ -265,7 +317,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 
 	// Should get to the ROLLBACK_STARTED state
 	for {
-		e := <-watchServiceUpdate
+		var e events.Event
+		select {
+		case e = <-watchServiceUpdate:
+		case <-ctx.Done():
+			t.Error("test timed out before watchServiceUpdate provided an event")
+			return
+		}
 		if e.(api.EventUpdateService).Service.UpdateStatus == nil {
 			continue
 		}
@@ -290,15 +348,27 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
 	case api.UpdateConfig_PAUSE:
 		// Should end up in ROLLBACK_PAUSED state
 		for {
-			e := <-watchServiceUpdate
+			var e events.Event
+			select {
+			case e = <-watchServiceUpdate:
+			case <-ctx.Done():
+				t.Error("test timed out before watchServiceUpdate provided an event")
+				return
+			}
 			if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_PAUSED {
 				return
 			}
 		}
 	case api.UpdateConfig_CONTINUE:
 		// Should end up in ROLLBACK_COMPLETED state
 		for {
-			e := <-watchServiceUpdate
+			var e events.Event
+			select {
+			case e = <-watchServiceUpdate:
+			case <-ctx.Done():
+				t.Error("test timed out before watchServiceUpdate provided an event")
+				return
+			}
 			if e.(api.EventUpdateService).Service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_COMPLETED {
 				return
 			}
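Note: the same select-on-ctx.Done() block now appears five times in this test. If it keeps spreading, it could be hoisted into a small helper; a hypothetical sketch (nextEvent is not part of this PR):

// nextEvent reads one event from a watch channel, or fails the test and
// reports false if the context expires first. Hypothetical helper.
func nextEvent(ctx context.Context, t *testing.T, watch chan events.Event) (events.Event, bool) {
	select {
	case e := <-watch:
		return e, true
	case <-ctx.Done():
		t.Error("test timed out waiting for an event")
		return nil, false
	}
}

Each loop body would then shrink to e, ok := nextEvent(ctx, t, watchServiceUpdate); if !ok { return }.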
13 changes: 9 additions & 4 deletions manager/orchestrator/service.go
@@ -46,22 +46,27 @@ func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *a
 	err = s.Batch(func(batch *store.Batch) error {
 		for _, t := range tasks {
 			err := batch.Update(func(tx store.Tx) error {
+				// the task may have changed for some reason in the meantime
+				// since we read it out, so we need to get it from the store
+				// again within the boundaries of a transaction
+				latestTask := store.GetTask(tx, t.ID)
+
 				// time travel is not allowed. if the current desired state is
 				// above the one we're trying to go to we can't go backwards.
 				// we have nothing to do and we should skip to the next task
-				if t.DesiredState > api.TaskStateRemove {
+				if latestTask.DesiredState > api.TaskStateRemove {
 					// log a warning, though. we shouldn't be trying to rewrite
 					// a state to an earlier state
 					log.G(ctx).Warnf(
 						"cannot update task %v in desired state %v to an earlier desired state %v",
-						t.ID, t.DesiredState, api.TaskStateRemove,
+						latestTask.ID, latestTask.DesiredState, api.TaskStateRemove,
 					)
 					return nil
 				}
 				// update desired state to REMOVE
-				t.DesiredState = api.TaskStateRemove
+				latestTask.DesiredState = api.TaskStateRemove
 
-				if err := store.UpdateTask(tx, t); err != nil {
+				if err := store.UpdateTask(tx, latestTask); err != nil {
 					log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE")
 				}
 				return nil
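Note: one edge the new code does not cover: store.GetTask returns nil when the task no longer exists, and since this callback runs inside a batch some time after the tasks were first read, a concurrent delete would leave latestTask nil and the DesiredState access would panic. A guard along these lines (a suggestion, not part of this PR) would close that hole:

 				latestTask := store.GetTask(tx, t.ID)
+				if latestTask == nil {
+					// the task was deleted in the meantime; nothing to do
+					return nil
+				}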