From 96daaaf22f197da7922a3b46a1882cb0e85b401e Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Mon, 29 Oct 2018 10:47:59 -0500 Subject: [PATCH] Update new allocator to allocate node attachments as needed Forward ports the changes in #2725 to the new allocator. Signed-off-by: Drew Erny --- manager/allocator/allocator.go | 338 +++++++------------- manager/allocator/allocator_test.go | 168 +++++++--- manager/allocator/network/allocator.go | 198 ++++++++++-- manager/allocator/network/allocator_test.go | 104 ++++-- manager/allocator/network/ipam/ipam.go | 2 +- 5 files changed, 511 insertions(+), 299 deletions(-) diff --git a/manager/allocator/allocator.go b/manager/allocator/allocator.go index 34b01e22f3..60cc03d908 100644 --- a/manager/allocator/allocator.go +++ b/manager/allocator/allocator.go @@ -29,7 +29,6 @@ const ( var ( batchProcessingDuration metrics.Timer - storeLockHeld metrics.LabeledTimer allocatorActions metrics.LabeledTimer ) @@ -42,12 +41,6 @@ func init() { "object", "action", ) - storeLockHeld = ns.NewLabeledTimer( - "store_lock_held", - "The number of seconds the allocator holds open store transactions", - "object", "action", "lock_type", - ) - batchProcessingDuration = ns.NewTimer( "batch_duration", "The number of seconds it takes to process a full batch of allocations", @@ -383,52 +376,7 @@ func (a *Allocator) handleEvent(ctx context.Context, event events.Event) { // deallocated return } - // when a network is deallocated, if it's an overlay network, before we - // can deallocate it we have to deallocate all of the attachments for - // its nodes. - if ev.Network.DriverState != nil && ev.Network.DriverState.Name == "overlay" { - // TODO(dperny): doing a store update in this event loop is - // probably pretty much the worst thing I can imagine doing for - // performance. it blocks store access in the whole cluster. - // however, we don't really have a choice; this is our one chance - // to deallocate nodes. Luckily, network removes are fairly - // infrequent - log.G(ctx).WithField("network.id", ev.Network.ID).Info( - "an overlay network was removed, deallocating it from nodes", - ) - if err := a.store.Update(func(tx store.Tx) error { - storeDone := metrics.StartTimer(storeLockHeld.WithValues("network", "deallocate", "write")) - defer storeDone() - - // see NOTE(dperny): FindByAll - nodes, _ := store.FindNodes(tx, store.All) - for _, node := range nodes { - nwids := map[string]struct{}{} - for _, attachment := range node.Attachments { - if attachment.Network.ID != ev.Network.ID { - nwids[attachment.Network.ID] = struct{}{} - } - } - allocDone := metrics.StartTimer(allocatorActions.WithValues("network", "allocate")) - err := a.network.AllocateNode(node, nwids) - allocDone() - if err != nil { - if !errors.IsErrAlreadyAllocated(err) { - // it shouldn't be possible for this reallocation - // to fail. if it does fail, our only option is to - // return an error and log it. 
- return err - } - } - if err := store.UpdateNode(tx, node); err != nil { - return err - } - } - return nil - }); err != nil { - log.G(ctx).WithError(err).Error("deallocating nodes for an overlay network failed") - } - } + allocDone := metrics.StartTimer(allocatorActions.WithValues("network", "deallocate")) err := a.network.DeallocateNetwork(ev.Network) allocDone() @@ -494,6 +442,11 @@ func (a *Allocator) handleEvent(ctx context.Context, event events.Event) { } // if the task was pending before now, it certainly isn't anymore delete(a.pendingTasks, t.ID) + // we'll need to reallocate this node next time around, so that we + // drop this network attachment + if t.NodeID != "" { + a.pendingNodes[t.NodeID] = struct{}{} + } } else { a.pendingTasks[t.ID] = struct{}{} } @@ -521,6 +474,11 @@ func (a *Allocator) handleEvent(ctx context.Context, event events.Event) { if err != nil { log.G(ctx).WithField("task.id", ev.Task.ID).WithError(err).Error("error deallocating task") } + // add the node that this task was assigned to to the pendingNodes, so + // we can deallocate the network attachment on that node if necessary + if ev.Task.NodeID != "" { + a.pendingNodes[ev.Task.NodeID] = struct{}{} + } delete(a.pendingTasks, ev.Task.ID) case api.EventCreateNode: if ev.Node != nil { @@ -567,15 +525,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { done := metrics.StartTimer(batchProcessingDuration) defer done() - // overlayAllocated is a variable that tracks whether a new overlay network - // has been allocated. If it is true, then regardless of which nodes are - // currently pending, all nodes will need to be reallocated with the new - // network. we will do this after network allocation, but before task - // allocation, to prefer the case where nodes exhaust all available IP - // addresses leaving some tasks without. Tasks can just be failed, but - // nodes need IP addresses - overlayAllocated := false - // capture the pending maps and reinitialize them, so they'll be clean // after this runs pendingNetworks := a.pendingNetworks @@ -588,8 +537,7 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { // networks. networks can't be updated, so we don't need to worry about a // race condition. - // if no networks are pending, skip this block. this saves us the time it - // takes to acquire the store lock + // if no networks are pending, skip this block if len(pendingNetworks) > 0 { // we'll need two slices. the first is to hold all of the networks we get // from the store. 
the second holds all of the successfully allocated @@ -597,7 +545,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { networks := make([]*api.Network, 0, len(pendingNetworks)) allocatedNetworks := make([]*api.Network, 0, len(pendingNetworks)) - batchDone := metrics.StartTimer(storeLockHeld.WithValues("networks", "allocate", "read")) a.store.View(func(tx store.ReadTx) { for id := range pendingNetworks { nw := store.GetNetwork(tx, id) @@ -608,7 +555,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { networks = append(networks, nw) } }) - batchDone() // now, allocate each network for _, network := range networks { @@ -644,7 +590,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { // again, if we haven't actually successfully allocated any networks, // we should skip this part to avoid holding the lock if len(allocatedNetworks) > 0 { - batchDone = metrics.StartTimer(storeLockHeld.WithValues("network", "allocate", "write")) // now, batch update the networks we just allocated if err := a.store.Batch(func(batch *store.Batch) error { for _, network := range allocatedNetworks { @@ -662,13 +607,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { if err := store.UpdateNetwork(tx, network); err != nil { return err } - // if the network is successfully updated, check if - // it's an overlay network. if so, set the - // overlayAllocated flag so we reallocate nodes - if network.DriverState != nil && network.DriverState.Name == "overlay" { - log.G(ctx).Info("an overlay network was allocated, reallocating nodes") - overlayAllocated = true - } return nil }); err != nil { log.G(ctx).WithError(err).Error("error writing network to store") @@ -678,126 +616,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { }); err != nil { log.G(ctx).WithError(err).Error("error committing allocated networks") } - batchDone() - } - } - - // next, allocate pending nodes, if there are any, or if an overlay network - // has been allocated - // - // NOTE(dperny): there is a case where IP address overlap can occur because - // of node attachments. basically, after we've allocated all of the - // networks, we need to re-allocate all of the nodes to have the correct - // attachments. if we are deallocating an attachment, because the network - // backing it is gone, the node may still have that attachment until the - // point where it actually receives the update and deallocates locally the - // attachment. in the meantime, the IP addresses the attachment uses are - // freed and can be used by other objects. 
- if overlayAllocated || len(pendingNodes) > 0 { - // nodes is the set of node objects we'll allocate - var nodes []*api.Node - - // networks is a set of all overlay networks currently in use - overlayNetworks := map[string]struct{}{} - - readTxDone := metrics.StartTimer(storeLockHeld.WithValues("node", "allocate", "read")) - a.store.View(func(tx store.ReadTx) { - if overlayAllocated { - // see NOTE(dperny): FindByAll - nodes, _ = store.FindNodes(tx, store.All) - } else { - nodes = make([]*api.Node, 0, len(pendingNodes)) - for id := range pendingNodes { - node := store.GetNode(tx, id) - if node == nil { - log.G(ctx).WithField("node.id", id).Debug("pending node not in store, probably deleted") - continue - } - nodes = append(nodes, node) - } - } - - // now, get all overlay network IDs that are in use, so we can - // allocate them to the nodes - // see NOTE(dperny): FindByAll - networks, _ := store.FindNetworks(tx, store.All) - for _, network := range networks { - // because we're looking at the DriverState, not the spec, and - // because only allocated networks have the DriverState - // allocated, we will only attach allocated networks. if for - // some reason an unallocated network slips through, we'll get - // ErrDepedencyNotAllocated when we try to allocate the nodes. - if network.DriverState != nil && network.DriverState.Name == "overlay" { - overlayNetworks[network.ID] = struct{}{} - } - } - }) - readTxDone() - - // make space for the successfully allocated nodes - allocatedNodes := make([]*api.Node, 0, len(nodes)) - - for _, node := range nodes { - ctx := log.WithField(ctx, "node.id", node.ID) - allocDone := metrics.StartTimer(allocatorActions.WithValues("node", "allocate")) - err := a.network.AllocateNode(node, overlayNetworks) - allocDone() - - switch { - case err == nil: - // first, if no error occurred, then we can add this to the list of - // networks we will commit - allocatedNodes = append(allocatedNodes, node) - case errors.IsErrAlreadyAllocated(err): - // if the network is already allocated, there is nothing to do. log - // that and do not add this network to the batch - log.G(ctx).Debug("node already fully allocated") - case errors.IsErrRetryable(err): - // if the error is retryable, then we should log that error and - // re-add this network to our pending networks - log.G(ctx).WithError(err).Error("node could not be allocated, but will be retried") - a.pendingNodes[node.ID] = struct{}{} - default: - // default covers any other error case. specifically, error that - // are not retryable. for these, we should fail and not re-add them - // to the pending map. they will never succeed - log.G(ctx).WithError(err).Error("node cannot be allocated") - } - } - - // if any nodes successfully allocated, commit them - if len(allocatedNodes) > 0 { - if err := a.store.Batch(func(batch *store.Batch) error { - for _, node := range allocatedNodes { - writeTxDone := metrics.StartTimer(storeLockHeld.WithValues("node", "allocate", "write")) - if err := batch.Update(func(tx store.Tx) error { - currentNode := store.GetNode(tx, node.ID) - if currentNode == nil { - // if there is no node, then it must have been - // deleted. deallocate our changes. - - log.G(ctx).WithField("node.id", node.ID).Debug("node was deleted after allocation but before committing") - a.network.DeallocateNode(node) - a.deletedObjects[node.ID] = struct{}{} - return nil - } - - // the node may have changed in the meantime since we - // read it before allocation. 
but since we're the only - // one who changes the attachments, we can just plop - // our new attachments into that slice with no danger - currentNode.Attachments = node.Attachments - - return store.UpdateNode(tx, currentNode) - }); err != nil { - log.G(ctx).WithError(err).WithField("node.id", node.ID).Error("error in committing allocated node") - } - writeTxDone() - } - return nil - }); err != nil { - log.G(ctx).WithError(err).Error("error in batch update of allocated nodes") - } } } @@ -817,28 +635,33 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { // tasks is a map from service id to a slice of task objects belonging // to that service tasks := map[string][]*api.Task{} - storeDone := metrics.StartTimer(storeLockHeld.WithValues("service", "allocate", "read")) a.store.View(func(tx store.ReadTx) { for taskid := range pendingTasks { t := store.GetTask(tx, taskid) - // if the task is not nil, is in state NEW, and its desired - // state is past NEW but not terminal, then the task still - // needs allocation. + // if the task is nil, then that means it has been deleted if t != nil { - if t.Status.State == api.TaskStateNew && api.TaskStateNew < t.DesiredState && t.DesiredState <= api.TaskStateRunning { - // if the task has an empty service, then we don't need to - // allocate the service. however, we'll keep those - // serviceless tasks (network attachment tasks) under the - // emptystring key in the map and specialcase that key - // later. - service := store.GetService(tx, t.ServiceID) - if service != nil { - services = append(services, service) + // only new tasks need to be allocated. However, if a task + // has a node assignment, we should update that now. + if api.TaskStateNew < t.DesiredState && t.DesiredState <= api.TaskStateRunning { + if t.Status.State == api.TaskStateNew { + // if the task has an empty service, then we don't need to + // allocate the service. however, we'll keep those + // serviceless tasks (network attachment tasks) under the + // emptystring key in the map and specialcase that key + // later. 
+ service := store.GetService(tx, t.ServiceID) + if service != nil { + services = append(services, service) + } + // thus, if t.ServiceID == "", this will still work + // this is safe without initializing each slice, because + // appending to a nil slice is valid + tasks[t.ServiceID] = append(tasks[t.ServiceID], t) + } else if t.NodeID != "" { + a.network.AddTaskNode(t) + // we'll need to reallocate the node + pendingNodes[t.NodeID] = struct{}{} } - // thus, if t.ServiceID == "", this will still work - // this is safe without initializing each slice, because - // appending to a nil slice is valid - tasks[t.ServiceID] = append(tasks[t.ServiceID], t) } else { log.G(ctx).WithField("task.id", taskid).Debug("task is no longer in a state requiring allocation") } @@ -847,7 +670,6 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { } } }) - storeDone() // allocatedServices will store the services we have actually // allocated, not including the services that didn't need allocation @@ -921,6 +743,13 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { // remove these tasks from the map log.G(ctx).WithError(err).Error("task cannot be allocated") } + + // if the task has a node assignment, we need to add that node + // to pendingNodes, because we may need to update that node's + // allocations + if task.NodeID != "" { + pendingNodes[task.NodeID] = struct{}{} + } } } @@ -951,12 +780,17 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { // remove these tasks from the map log.G(ctx).WithError(err).Error("task cannot be allocated") } + // if the task has a node assignment, we need to add that node + // to pendingNodes, because we may need to update that node's + // allocations + if task.NodeID != "" { + pendingNodes[task.NodeID] = struct{}{} + } } // finally, if we have any services or tasks to commit, open a // batch and commit them if len(allocatedServices)+len(allocatedTasks) > 0 { - taskWriteTxDone := metrics.StartTimer(storeLockHeld.WithValues("allocate", "task", "write")) if err := a.store.Batch(func(batch *store.Batch) error { for _, service := range allocatedServices { if err := batch.Update(func(tx store.Tx) error { @@ -1009,7 +843,83 @@ func (a *Allocator) processPendingAllocations(ctx context.Context) { }); err != nil { // TODO(dperny): see batch update above. } - taskWriteTxDone() + } + } + + if len(pendingNodes) > 0 { + nodes := make([]*api.Node, 0, len(pendingNodes)) + allocatedNodes := make([]*api.Node, 0, len(pendingNodes)) + + // get the freshest copy of the node. + a.store.View(func(tx store.ReadTx) { + for nodeID := range pendingNodes { + node := store.GetNode(tx, nodeID) + if node == nil { + log.G(ctx).WithField("node.id", nodeID).Debug("node not found, probably deleted") + continue + } + nodes = append(nodes, node) + } + }) + + // now go and try to allocate each node + for _, node := range nodes { + ctx := log.WithField(ctx, "node.id", node.ID) + allocDone := metrics.StartTimer(allocatorActions.WithValues("node", "allocate")) + err := a.network.AllocateNode(node) + allocDone() + + switch { + case err == nil: + allocatedNodes = append(allocatedNodes, node) + case errors.IsErrAlreadyAllocated(err): + // if the network is already allocated, there is nothing to do. 
log + // that and do not add this network to the batch + log.G(ctx).Debug("node already fully allocated") + case errors.IsErrRetryable(err): + // if the error is retryable, then we should log that error and + // re-add this network to our pending networks + log.G(ctx).WithError(err).Error("node could not be allocated, but will be retried") + a.pendingNodes[node.ID] = struct{}{} + default: + // default covers any other error case. specifically, error that + // are not retryable. for these, we should fail and not re-add them + // to the pending map. they will never succeed + log.G(ctx).WithError(err).Error("node cannot be allocated") + } + } + + // if any nodes successfully allocated, commit them + if len(allocatedNodes) > 0 { + if err := a.store.Batch(func(batch *store.Batch) error { + for _, node := range allocatedNodes { + if err := batch.Update(func(tx store.Tx) error { + currentNode := store.GetNode(tx, node.ID) + if currentNode == nil { + // if there is no node, then it must have been + // deleted. deallocate our changes. + + log.G(ctx).WithField("node.id", node.ID).Debug("node was deleted after allocation but before committing") + a.network.DeallocateNode(node) + a.deletedObjects[node.ID] = struct{}{} + return nil + } + + // the node may have changed in the meantime since we + // read it before allocation. but since we're the only + // one who changes the attachments, we can just plop + // our new attachments into that slice with no danger + currentNode.Attachments = node.Attachments + + return store.UpdateNode(tx, currentNode) + }); err != nil { + log.G(ctx).WithError(err).WithField("node.id", node.ID).Error("error in committing allocated node") + } + } + return nil + }); err != nil { + log.G(ctx).WithError(err).Error("error in batch update of allocated nodes") + } } } } diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 6544cc248c..c3e4b6bc63 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -1331,6 +1331,18 @@ func TestNodeAllocator(t *testing.T) { } assert.NoError(t, store.CreateNetwork(tx, n1)) + // this network will never be used -- a negative test, to ensure we + // don't allocate for networks that aren't needed + nUnused := &api.Network{ + ID: "overlayIDUnused", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "overlayIDUnused", + }, + }, + } + assert.NoError(t, store.CreateNetwork(tx, nUnused)) + assert.NoError(t, store.CreateNode(tx, node1)) return nil })) @@ -1339,6 +1351,8 @@ func TestNodeAllocator(t *testing.T) { defer cancel() netWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateNetwork{}, api.EventDeleteNetwork{}) defer cancel() + taskWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{}) + defer cancel() waitStop := make(chan struct{}) ctx, ctxCancel := context.WithCancel(context.Background()) @@ -1353,12 +1367,36 @@ func TestNodeAllocator(t *testing.T) { <-waitStop }() + assert.NoError(t, s.Update(func(tx store.Tx) error { + // create a task assigned to this node that has a network attachment to + // n1 + t1 := &api.Task{ + ID: "task1", + NodeID: node1.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID1", + }, + }, + }, + } + + return store.CreateTask(tx, t1) + })) + + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + // Validate node has 2 LB IP address (1 for each network). 
watchNetwork(t, netWatch, false, isValidNetwork) // ingress watchNetwork(t, netWatch, false, isValidNetwork) // overlayID1 + watchNetwork(t, netWatch, false, isValidNetwork) //overlayIDUnused watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 - // Add a node and validate it gets a LB ip on each network. + // Add a node and validate it gets a LB ip only on ingress, as it has no + // tasks assigned. node2 := &api.Node{ ID: "nodeID2", } @@ -1366,9 +1404,9 @@ func TestNodeAllocator(t *testing.T) { assert.NoError(t, store.CreateNode(tx, node2)) return nil })) - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2 + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"}) // node2 - // Add a network and validate each node has 3 LB IP addresses + // Add a network and validate that nothing has changed on the nodes n2 := &api.Network{ ID: "overlayID2", Spec: api.NetworkSpec{ @@ -1381,18 +1419,82 @@ func TestNodeAllocator(t *testing.T) { assert.NoError(t, store.CreateNetwork(tx, n2)) return nil })) - watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 - watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1", "overlayID2"}) // node1 - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1", "overlayID2"}) // node2 + watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 + // nothing should change, no updates + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress"}) // node2 + + // add a task and validate that the node gets the network for the task + assert.NoError(t, s.Update(func(tx store.Tx) error { + // create a task assigned to node2 that has a network attachment on n2 + t2 := &api.Task{ + ID: "task2", + NodeID: node2.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID2", + }, + }, + }, + } + + return store.CreateTask(tx, t2) + })) + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + + // validate that node2 gets a new attachment and node1 stays the same + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + + // add another task with the same network to a node and validate that it + // still only has 1 attachment for that network + assert.NoError(t, s.Update(func(tx store.Tx) error { + // create a task assigned to this node that has a network attachment on + // n1 + t3 := &api.Task{ + ID: "task3", + NodeID: node1.ID, + DesiredState: api.TaskStateRunning, + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: "overlayID1", + }, + }, + }, + } + return store.CreateTask(tx, t3) + })) + + // validate that the task is created + watchTask(t, s, taskWatch, false, isValidTask) + + // validate that nothing changes + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 + + // now remove that task we just created, and validate that the node still + // has an attachment for the other task + assert.NoError(t, s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteTask(tx, "task1")) + return nil + })) + 
+ // validate that nothing changes + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 + watchNode(t, nodeWatch, true, isValidNode, node2, []string{"ingress", "overlayID2"}) // node2 - // Remove a network and validate each node has 2 LB IP addresses + // now remove another task. this time the attachment on the node should be + // removed as well assert.NoError(t, s.Update(func(tx store.Tx) error { - assert.NoError(t, store.DeleteNetwork(tx, n2.ID)) + assert.NoError(t, store.DeleteTask(tx, "task2")) return nil })) - watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2 - watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 - watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2 + watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress"}) // node2 + watchNode(t, nodeWatch, true, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1 // Remove a node and validate remaining node has 2 LB IP addresses assert.NoError(t, s.Update(func(tx store.Tx) error { @@ -1434,6 +1536,11 @@ func TestNodeAllocator(t *testing.T) { */ } +// TestNodeAllocatorDeletingNetworkRace is a test to see that deleting a +// network immediately before a leadership change will result in the node's +// network attachment being correctly deallocated. However, the way nodes are +// allocated was changed such that network attachments are allocated as needed +// by tasks. This means the only network vulnerable to this problem is Ingress. func TestNodeAllocatorDeletingNetworkRace(t *testing.T) { s := store.NewMemoryStore(nil) assert.NotNil(t, s) @@ -1456,31 +1563,10 @@ func TestNodeAllocatorDeletingNetworkRace(t *testing.T) { } assert.NoError(t, store.CreateNetwork(tx, in)) - n1 := &api.Network{ - ID: "overlayID1", - Spec: api.NetworkSpec{ - Annotations: api.Annotations{ - Name: "overlayID1", - }, - }, - } - assert.NoError(t, store.CreateNetwork(tx, n1)) - - n2 := &api.Network{ - ID: "overlayID2", - Spec: api.NetworkSpec{ - Annotations: api.Annotations{ - Name: "overlayID2", - }, - }, - } - assert.NoError(t, store.CreateNetwork(tx, n2)) - node1 = &api.Node{ ID: "node1", } assert.NoError(t, store.CreateNode(tx, node1)) - return nil })) @@ -1505,20 +1591,10 @@ func TestNodeAllocatorDeletingNetworkRace(t *testing.T) { // now wait for the networks and nodes to be created // Validate node has 2 LB IP address (1 for each network). watchNetwork(t, netWatch, false, isValidNetwork) // ingress - watchNetwork(t, netWatch, false, isValidNetwork) // overlayID1 watchNode(t, nodeWatch, false, isValidNode, node1, - []string{"ingress", "overlayID1", "overlayID2"}, + []string{"ingress"}, ) // node1 - // now, delete network 1. We expect it to be removed from the node - assert.NoError(t, s.Update(func(tx store.Tx) error { - return store.DeleteNetwork(tx, "overlayID2") - })) - - watchNode(t, nodeWatch, false, isValidNode, node1, - []string{"ingress", "overlayID1"}, - ) - // now stop the allocator. 
// cancel the context cancel() @@ -1531,7 +1607,7 @@ func TestNodeAllocatorDeletingNetworkRace(t *testing.T) { // situation as in which the network is deleted and an immediate leadership // change prevents reallocation of nodes in the meantime assert.NoError(t, s.Update(func(tx store.Tx) error { - return store.DeleteNetwork(tx, "overlayID1") + return store.DeleteNetwork(tx, "ingress") })) // then, start up a fresh new allocator @@ -1554,7 +1630,7 @@ func TestNodeAllocatorDeletingNetworkRace(t *testing.T) { }() // wait for the node to be updated to remove the network - watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress"}) + watchNode(t, nodeWatch, false, isValidNode, node1, []string{}) } func isValidNode(t assert.TestingT, originalNode, updatedNode *api.Node, networks []string) bool { @@ -1709,7 +1785,7 @@ func watchNetwork(t *testing.T, watch chan events.Event, expectTimeout bool, fn fn(t, network) } - t.Fatal("timed out before watchNetwork found expected network state") + t.Fatal("timed out before watchNetwork found expected network state ", string(debug.Stack())) } return diff --git a/manager/allocator/network/allocator.go b/manager/allocator/network/allocator.go index 02922dc61d..e66269cea6 100644 --- a/manager/allocator/network/allocator.go +++ b/manager/allocator/network/allocator.go @@ -30,12 +30,18 @@ type Allocator interface { DeallocateService(*api.Service) error AllocateTask(*api.Task) error + AddTaskNode(*api.Task) DeallocateTask(*api.Task) error - AllocateNode(*api.Node, map[string]struct{}) error + AllocateNode(*api.Node) error DeallocateNode(*api.Node) error } +// these new types aren't used for any compile-time type-checking. they're just +// to make the map key and value representations more apparent. +type taskSet map[string]struct{} +type networkSet map[string]taskSet + type allocator struct { // in order to figure out if the dependencies of a task are fulfilled, we // need to keep track of what we have allocated already. this also allows @@ -52,6 +58,19 @@ type allocator struct { // with a different set of networks nodeLocalNetworks map[string]*api.Network + // we need to be able to determine what networks a node should have + // attachments for. a node should have an attachment for every network in + // use by at least 1 task. + + // nodeNetworks maps a nodeID to a set of networkIDs, which is used for + // determining if a node needs attachments allocated or deallocated + // + // this is, at its core, a nested map. it is important to note the code + // assumes that if the key exists, then the value is non-nil. if this isn't + // the case, and one of the sub-maps is nil, the program will attempt to + // add an entry to a nil map and crash. + nodeNetworks map[string]networkSet + // also attachments don't need to be kept track of, because nothing depends // on them. 
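The nested-map bookkeeping described in the comments above can be pictured with a small standalone sketch. This is an illustration only, not part of the patch: the type names mirror the patch, but the node, network, and task identifiers are made up.

package main

import "fmt"

// simplified stand-ins for the allocator's internal types
type taskSet map[string]struct{}
type networkSet map[string]taskSet

func main() {
	nodeNetworks := map[string]networkSet{}

	// record that "task1" on "node1" uses network "overlay1".
	// sub-maps are created on first use so they are never nil.
	nets, ok := nodeNetworks["node1"]
	if !ok {
		nets = networkSet{}
		nodeNetworks["node1"] = nets
	}
	tasks, ok := nets["overlay1"]
	if !ok {
		tasks = taskSet{}
		nets["overlay1"] = tasks
	}
	tasks["task1"] = struct{}{}

	// the node now needs one attachment per key in its networkSet
	for netID := range nodeNetworks["node1"] {
		fmt.Println("node1 needs an attachment on", netID)
	}
}

The ingress network is the one exception to the "driven by tasks" rule: per the Restore and AllocateNode changes below, every node keeps an entry for ingress with a possibly empty taskSet, so it is always attached.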
@@ -75,6 +94,7 @@ func newAllocatorWithComponents(ipamAlloc ipam.Allocator, driverAlloc driver.All
 		ipam:   ipamAlloc,
 		driver: driverAlloc,
 		port:   portAlloc,
+		nodeNetworks: map[string]networkSet{},
 	}
 }
 
@@ -109,6 +129,7 @@ func NewAllocator(pg plugingetter.PluginGetter) Allocator {
 		port:   port.NewAllocator(),
 		ipam:   ipam.NewAllocator(reg),
 		driver: driver.NewAllocator(reg),
+		nodeNetworks: map[string]networkSet{},
 	}
 }
 
@@ -215,6 +236,7 @@ func (a *allocator) Restore(networks []*api.Network, services []*api.Service, ta
 				attachments = append(attachments, attachment)
 			}
 		}
+		a.addToNodeNetworkSet(task)
 	}
 
 	for _, node := range nodes {
@@ -230,6 +252,19 @@ func (a *allocator) Restore(networks []*api.Network, services []*api.Service, ta
 		if node.Attachment != nil {
 			attachments = append(attachments, node.Attachment)
 		}
+
+		// make sure that every node has an entry in the nodeNetworks set. if
+		// not, add one
+		if _, ok := a.nodeNetworks[node.ID]; !ok {
+			a.nodeNetworks[node.ID] = networkSet{}
+		}
+		// additionally, every node should have an empty taskSet for the
+		// ingress network, if an ingress network exists, because every node
+		// needs to be allocated on the ingress network. having an empty
+		// taskSet simplifies allocation code later on
+		if a.ingressID != "" {
+			a.nodeNetworks[node.ID][a.ingressID] = taskSet{}
+		}
 	}
 
 	// now restore the various components
@@ -597,42 +632,79 @@ func (a *allocator) AllocateTask(task *api.Task) (rerr error) {
 	}
 	task.Networks = append(finalAttachments, localAttachments...)
 
+	// before we return, if this task has a node assigned already (for example,
+	// if it is a global task) then we should add it to the nodeNetworks set so
+	// that next time we reallocate its node, that node has network attachments
+	// for the task
+	if task.NodeID != "" {
+		a.addToNodeNetworkSet(task)
+	}
+
 	return nil
 }
 
+// AddTaskNode informs the allocator that a task has been scheduled on a node.
+// It should be called whenever a task that has already been allocated receives
+// a node assignment. AddTaskNode is idempotent, and can be called multiple
+// times on the same task, unless the task has already been removed.
+//
+// There is no corresponding RemoveTaskNode method, because that operation is
+// performed as part of DeallocateTask.
+func (a *allocator) AddTaskNode(task *api.Task) {
+	if task.NodeID != "" {
+		a.addToNodeNetworkSet(task)
+	}
+}
+
 // DeallocateTask takes a task and frees its network resources.
 func (a *allocator) DeallocateTask(task *api.Task) error {
+	// in addition to deallocating the network attachments, we should remove
+	// the task's entries in the nodeNetworks.
+	a.removeFromNodeNetworkSet(task)
 	return a.deallocateAttachments(task.Networks)
 }
 
-// AllocateNode allocates the network attachments for a node. The second
-// argument, a set of networks, is used to indicate which networks the node
-// needs to be attached to. This is necessary because the node's attachments
-// are informed by its task allocations, which is a list not available in this
-// context.
+// AllocateNode allocates a node with the correct set of networks required,
+// based on which tasks have been allocated. Despite its name, AllocateNode may
+// also deallocate attachments no longer needed.
 //
-// If this method returns nil, then the node has been fully allocated, and
-// should be committed. Otherwise, the node will not be altered.
-func (a *allocator) AllocateNode(node *api.Node, requestedNetworks map[string]struct{}) (rerr error) { +// If the node has been successfully allocated and should be committed, then +// this method returns nil. If the node is correctly allocated already and +// nothing new has been added, it returns errors.ErrAlreadyAllocated. +// Otherwise, it returns an error indicating what went wrong with allocation. +func (a *allocator) AllocateNode(node *api.Node) (rerr error) { // TODO(dperny): After the initial version, we should remove the code // supporting the singular "attachment" field, and require all upgrades // past this version to pass through this version in order to reallocate // out of that field. - networks := map[string]struct{}{} - // copy the networks map so we can safely mutate it - for nw := range requestedNetworks { - networks[nw] = struct{}{} + // get the networkSet for the node. + nodeNets := a.nodeNetworks[node.ID] + + // if there is no networkSet for the node, we need to create one + if nodeNets == nil { + nodeNets = networkSet{} + a.nodeNetworks[node.ID] = nodeNets } // the node always needs a network attachment to the ingress network. if it - // exists, add the ingress network to the list of requested networks now. - // it may already be in the requested networks, but we if it is, then - // nothing has been altered. + // exists, add the ingress network to the set of required networks now. it + // may already be in that list, but it may have been removed when removing + // tasks if a.ingressID != "" { - // if for some reason, the caller has already added the ingress network - // to the networks list, this will do nothing, which isn't a problem. - networks[a.ingressID] = struct{}{} + // we only need to add an empty taskSet if no taskSet for the ingress + // network exists. otherwise, we might cover up tasks that are attached + // to the ingress network + if _, ok := nodeNets[a.ingressID]; !ok { + nodeNets[a.ingressID] = taskSet{} + } + } + + networks := map[string]struct{}{} + // now, copy all the networkIDs from the node's networkSet, so we can + // figure out what needs to be newly allocated + for netID := range nodeNets { + networks[netID] = struct{}{} } // first, figure out which networks we keep and which we throw away from @@ -735,6 +807,10 @@ func (a *allocator) DeallocateNode(node *api.Node) error { if err := a.deallocateAttachments(node.Attachments); err != nil { finalErr = err } + + // before returning, remove the node's entry in the nodeNetworks set + delete(a.nodeNetworks, node.ID) + return finalErr } @@ -854,3 +930,87 @@ func (a *allocator) isServiceFullyAllocated(service *api.Service) bool { } return true } + +// addToNodeNetworkSet adds a task's networks to a node's network set. it is a +// helper method just to avoid making the AllocateTask method even bigger +func (a *allocator) addToNodeNetworkSet(task *api.Task) { + // double or triple check that this task has an actual nodeID, not just + // emptyString. emptyString is a valid map key, and adding entries to it + // will break everything. + if task.NodeID == "" { + return + } + + // if the task is assigned to a node, we need to add the task's + // networks to the node's set + // get the node's networkSet, if it yet exists + nodeNets, ok := a.nodeNetworks[task.NodeID] + if !ok { + // if there isn't yet a networkSet for this node, create + // one now. 
+ nodeNets = networkSet{} + a.nodeNetworks[task.NodeID] = nodeNets + } + + for _, attachment := range task.Networks { + if attachment.Network == nil { + continue + } + // if the network is node-local, it does not need to have an attachment + // allocated on the node, so we don't track it. + if _, ok := a.nodeLocalNetworks[attachment.Network.ID]; ok { + continue + } + + // now, check if this node has a taskSet for this network. + netTasks, ok := nodeNets[attachment.Network.ID] + // if not, create one now + if !ok { + netTasks = taskSet{} + nodeNets[attachment.Network.ID] = netTasks + } + // and, finally, add this task to the network's taskSet + netTasks[task.ID] = struct{}{} + } +} + +// removeFromNodeNetworkSet removes the task's networks from the node's network +// set, and cleans up any now-empty network sets. this should work even if it +// has already been called on this task; that is, it should be idempotent. +func (a *allocator) removeFromNodeNetworkSet(task *api.Task) { + // like in addTo, we cannot ever do anything if a NodeID is emptystring + if task.NodeID == "" { + return + } + + nodeNets, ok := a.nodeNetworks[task.NodeID] + // if for some reason, there is not networkSet for this node, we can just + // return, as there is nothing to do + if !ok { + return + } + + for _, attachment := range task.Networks { + // handle the rare error case where an attachment's Network is nil. + // this should never occur, but bugs were hit in the old allocator + // where it did, and dereferencing a nil pointer thus causing a crash + // is a _very_ undesirable outcome + if attachment.Network == nil { + continue + } + netTasks, ok := nodeNets[attachment.Network.ID] + // if there is no taskSet for this network, then there is nothing to do + if !ok { + return + } + delete(netTasks, task.ID) + + // if there are no more tasks in this taskSet, delete the network from + // the networkSet. + if len(netTasks) == 0 { + delete(nodeNets, attachment.Network.ID) + } + // we do not need to delete the node and its networkSet from the + // top-level a.nodeNetworks object. + } +} diff --git a/manager/allocator/network/allocator_test.go b/manager/allocator/network/allocator_test.go index 3969bc4bdb..b506015c1d 100644 --- a/manager/allocator/network/allocator_test.go +++ b/manager/allocator/network/allocator_test.go @@ -441,7 +441,7 @@ var _ = Describe("network.Allocator", func() { // Before we start, do a restore of all of these pre-populated // items. - // NOTE(dperny): this doens't actually do a whole bunch for us, + // NOTE(dperny): this doesn't actually do a whole bunch for us, // because the only state persisted in this version of the // Allocator is that of allocated services, but this future-proofs // the test at basically no cost. 
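To make the reference-counting behavior of addToNodeNetworkSet and removeFromNodeNetworkSet concrete, here is a standalone sketch. The removeTask helper below is an illustration under simplified types, not the patch's own method: a network stays on the node while any task still uses it, and is dropped only when the last task goes away.

package main

import "fmt"

type taskSet map[string]struct{}
type networkSet map[string]taskSet

// removeTask mirrors the idea of removeFromNodeNetworkSet for a single node:
// delete the task from the network's taskSet, and drop the network entry
// entirely once no tasks reference it.
func removeTask(nets networkSet, netID, taskID string) {
	tasks, ok := nets[netID]
	if !ok {
		return
	}
	delete(tasks, taskID)
	if len(tasks) == 0 {
		delete(nets, netID)
	}
}

func main() {
	// two tasks on the same node share overlayID1, as in TestNodeAllocator
	nets := networkSet{"overlayID1": {"task1": {}, "task3": {}}}

	removeTask(nets, "overlayID1", "task1")
	fmt.Println(len(nets)) // 1: task3 still needs overlayID1, attachment kept

	removeTask(nets, "overlayID1", "task3")
	fmt.Println(len(nets)) // 0: no users left, the node's attachment can go
}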
@@ -1250,7 +1250,7 @@ var _ = Describe("network.Allocator", func() { }) }) - Describe("tasks", func() { + FDescribe("tasks", func() { var ( ingress, localnet, nw1, nw2 *api.Network ) @@ -1429,6 +1429,29 @@ var _ = Describe("network.Allocator", func() { It("should populate the task's endpoint with the service's endpoint", func() { Expect(task.Endpoint).To(Equal(service.Endpoint)) }) + + Context("when the task has a node assignment", func() { + BeforeEach(func() { + task.NodeID = "somenode" + }) + + It("should add that task's networks to the nodeNetworks set", func() { + Expect(a.nodeNetworks).To(HaveLen(1)) + Expect(a.nodeNetworks).To(HaveKeyWithValue( + "somenode", networkSet{ + nw1.ID: taskSet{ + task.ID: struct{}{}, + }, + nw2.ID: taskSet{ + task.ID: struct{}{}, + }, + ingress.ID: taskSet{ + task.ID: struct{}{}, + }, + }, + )) + }) + }) }) }) @@ -1512,9 +1535,9 @@ var _ = Describe("network.Allocator", func() { }) }) - Describe("nodes", func() { + FDescribe("nodes", func() { var ( - ingress, nw1, nw2 *api.Network + ingress, nw1, nw2, nw3 *api.Network ) BeforeEach(func() { @@ -1531,29 +1554,67 @@ var _ = Describe("network.Allocator", func() { nw2 = &api.Network{ ID: "allocNodesNw2", } + nw3 = &api.Network{ + ID: "allocNodesNw3", + } - initNetworks = append(initNetworks, ingress, nw1, nw2) + initNetworks = append(initNetworks, ingress, nw1, nw2, nw3) }) Describe("allocating", func() { var ( - node *api.Node - networks map[string]struct{} - err error + node *api.Node + err error + task1, task2 *api.Task ) BeforeEach(func() { - networks = map[string]struct{}{ - "allocNodesIngress": {}, - "allocNodesNw1": {}, - "allocNodesNw2": {}, + node = &api.Node{ + ID: "node1", } - node = &api.Node{} + task1 = &api.Task{ + ID: "task1", + NodeID: "node1", + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: nw1.ID, + }, + }, + }, + Networks: []*api.NetworkAttachment{ + { + Network: nw1, + }, + }, + } + + task2 = &api.Task{ + ID: "task2", + NodeID: "node1", + Spec: api.TaskSpec{ + Networks: []*api.NetworkAttachmentConfig{ + { + Target: nw2.ID, + }, + }, + }, + Networks: []*api.NetworkAttachment{ + { + Network: nw2, + }, + }, + } + initTasks = append(initTasks, task1, task2) + + // Note that nw3 is not in use by any task, and so should + // not be added to a node. 
}) JustBeforeEach(func() { - err = a.AllocateNode(node, networks) + err = a.AllocateNode(node) }) + Context("when a new node is successfully allocated", func() { BeforeEach(func() { mockIpam.EXPECT().AllocateAttachment( @@ -1617,6 +1678,7 @@ var _ = Describe("network.Allocator", func() { Expect(err).To(WithTransform(errors.IsErrAlreadyAllocated, BeTrue())) }) }) + Context("when the node has a deprecated singular network attachment", func() { BeforeEach(func() { node.Attachment = &api.NetworkAttachment{ @@ -1654,13 +1716,14 @@ var _ = Describe("network.Allocator", func() { }) + // TODO(dperny): convert to work with task-based allocation Context("when the node only has a deprecated singular network attachment", func() { // this test covers the specific case of just reallocating - // the ingres network and not adding more networks + // the ingress network and not adding more networks BeforeEach(func() { + task1.NodeID = "nonode" + task2.NodeID = "nonode" // leave just the ingress network - delete(networks, nw1.ID) - delete(networks, nw2.ID) node.Attachment = &api.NetworkAttachment{ Network: ingress, } @@ -1718,6 +1781,8 @@ var _ = Describe("network.Allocator", func() { Context("when a network is removed from an existing node", func() { BeforeEach(func() { + task2.NodeID = "notanode" + node.Attachments = []*api.NetworkAttachment{ { Network: nw1, @@ -1731,7 +1796,6 @@ var _ = Describe("network.Allocator", func() { Network: nw2, }, } - delete(networks, nw2.ID) mockIpam.EXPECT().DeallocateAttachment( &api.NetworkAttachment{Network: nw2}, @@ -1753,6 +1817,7 @@ var _ = Describe("network.Allocator", func() { )) }) }) + Context("when a node includes networks that aren't allocated", func() { BeforeEach(func() { node.Attachments = []*api.NetworkAttachment{ @@ -1770,8 +1835,9 @@ var _ = Describe("network.Allocator", func() { }, }, } + // don't include nw2 in our allocation - delete(networks, nw2.ID) + task2.NodeID = "notanode" mockIpam.EXPECT().DeallocateAttachment( node.Attachments[2], diff --git a/manager/allocator/network/ipam/ipam.go b/manager/allocator/network/ipam/ipam.go index dfbac6c8cc..6622c71d31 100644 --- a/manager/allocator/network/ipam/ipam.go +++ b/manager/allocator/network/ipam/ipam.go @@ -167,7 +167,7 @@ func (a *allocator) Restore(networks []*api.Network, endpoints []*api.Endpoint, if err != nil { return errors.ErrBadState( "error requesting already assigned gateway address %v: %v", - err, + config.Gateway, err, ) } // NOTE(dperny): this check was originally here:
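As a closing illustration of how these pieces fit together, the sketch below shows the call order the higher-level allocator now follows: task allocation (or a later AddTaskNode call once the task is scheduled) records which networks a node needs, and AllocateNode then adds or removes that node's attachments. This is a hypothetical helper, not code from this patch, and it assumes the package layout implied by the file paths in the diff headers.

package allocator

import (
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/allocator/network"
)

// allocateTaskAndNode sketches the new flow: node attachments are driven by
// task state rather than by an explicit list of requested networks.
func allocateTaskAndNode(a network.Allocator, task *api.Task, node *api.Node) error {
	if err := a.AllocateTask(task); err != nil {
		return err
	}
	if task.NodeID != "" {
		// idempotent; safe even if AllocateTask already recorded the node
		a.AddTaskNode(task)
	}
	// in the real event loop the caller treats errors.IsErrAlreadyAllocated
	// as success, since it means the node already has exactly the
	// attachments it needs
	return a.AllocateNode(node)
}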