diff --git a/pkg/network/common/egressip.go b/pkg/network/common/egressip.go
index 258204c2851b..554307627b29 100644
--- a/pkg/network/common/egressip.go
+++ b/pkg/network/common/egressip.go
@@ -450,22 +450,32 @@ func (eit *EgressIPTracker) SetNodeOffline(nodeIP string, offline bool) {
 			eit.egressIPChanged(eg)
 		}
 	}
+
+	if node.requestedCIDRs.Len() != 0 {
+		eit.updateEgressCIDRs = true
+	}
+
 	eit.syncEgressIPs()
 }
 
+func (eit *EgressIPTracker) lookupNodeIP(ip string) string {
+	eit.Lock()
+	defer eit.Unlock()
+
+	if node := eit.nodesByNodeIP[ip]; node != nil {
+		return node.sdnIP
+	}
+	return ip
+}
+
 // Ping a node and return whether or not it is online. We do this by trying to open a TCP
 // connection to the "discard" service (port 9); if the node is offline, the attempt will
 // time out with no response (and we will return false). If the node is online then we
 // presumably will get a "connection refused" error; the code below assumes that anything
 // other than timing out indicates that the node is online.
 func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool {
-	eit.Lock()
-	defer eit.Unlock()
-
 	// If the caller used a public node IP, replace it with the SDN IP
-	if node := eit.nodesByNodeIP[ip]; node != nil {
-		ip = node.sdnIP
-	}
+	ip = eit.lookupNodeIP(ip)
 
 	conn, err := net.DialTimeout("tcp", ip+":9", timeout)
 	if conn != nil {
@@ -485,10 +495,10 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
 	otherNodes := false
 
 	for _, node := range eit.nodes {
-		egressIPs, exists := allocation[node.nodeName]
-		if !exists {
+		if node.offline {
 			continue
 		}
+		egressIPs := allocation[node.nodeName]
 		for _, parsed := range node.parsedCIDRs {
 			if parsed.Contains(ip) {
 				if bestNode != "" {
@@ -506,24 +516,22 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
 	return bestNode, otherNodes
 }
 
-// ReallocateEgressIPs returns a map from Node name to array-of-Egress-IP. Unchanged nodes are not included.
-func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
-	eit.Lock()
-	defer eit.Unlock()
+func (eit *EgressIPTracker) makeEmptyAllocation() (map[string][]string, map[string]bool) {
+	return make(map[string][]string), make(map[string]bool)
+}
+
+func (eit *EgressIPTracker) allocateExistingEgressIPs(allocation map[string][]string, alreadyAllocated map[string]bool) bool {
+	removedEgressIPs := false
 
-	allocation := make(map[string][]string)
-	changed := make(map[string]bool)
-	alreadyAllocated := make(map[string]bool)
 	for _, node := range eit.nodes {
 		if len(node.parsedCIDRs) > 0 {
 			allocation[node.nodeName] = make([]string, 0, node.requestedIPs.Len())
 		}
 	}
 	// For each active egress IP, if it still fits within some egress CIDR on its node,
-	// add it to that node's allocation. (Otherwise add the node to the "changed" map,
-	// since we'll be removing this egress IP from it.)
+	// add it to that node's allocation.
 	for egressIP, eip := range eit.egressIPs {
-		if eip.assignedNodeIP == "" {
+		if eip.assignedNodeIP == "" || alreadyAllocated[egressIP] {
 			continue
 		}
 		node := eip.nodes[0]
@@ -534,10 +542,10 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
 				break
 			}
 		}
-		if found {
+		if found && !node.offline {
 			allocation[node.nodeName] = append(allocation[node.nodeName], egressIP)
 		} else {
-			changed[node.nodeName] = true
+			removedEgressIPs = true
 		}
 		// (We set alreadyAllocated even if the egressIP will be removed from
 		// its current node; we can't assign it to a new node until the next
@@ -545,6 +553,10 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
 		alreadyAllocated[egressIP] = true
 	}
 
+	return removedEgressIPs
+}
+
+func (eit *EgressIPTracker) allocateNewEgressIPs(allocation map[string][]string, alreadyAllocated map[string]bool) {
 	// Allocate pending egress IPs that can only go to a single node
 	for egressIP, eip := range eit.egressIPs {
 		if alreadyAllocated[egressIP] || len(eip.namespaces) == 0 {
@@ -553,7 +565,6 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
 		nodeName, otherNodes := eit.findEgressIPAllocation(eip.parsed, allocation)
 		if nodeName != "" && !otherNodes {
 			allocation[nodeName] = append(allocation[nodeName], egressIP)
-			changed[nodeName] = true
 			alreadyAllocated[egressIP] = true
 		}
 	}
@@ -565,15 +576,54 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
 		nodeName, _ := eit.findEgressIPAllocation(eip.parsed, allocation)
 		if nodeName != "" {
 			allocation[nodeName] = append(allocation[nodeName], egressIP)
-			changed[nodeName] = true
 		}
 	}
+}
 
-	// Remove unchanged nodes from the return value
-	for _, node := range eit.nodes {
-		if !changed[node.nodeName] {
-			delete(allocation, node.nodeName)
+// ReallocateEgressIPs returns a map from Node name to array-of-Egress-IP for all auto-allocated egress IPs
+func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
+	eit.Lock()
+	defer eit.Unlock()
+
+	allocation, alreadyAllocated := eit.makeEmptyAllocation()
+	removedEgressIPs := eit.allocateExistingEgressIPs(allocation, alreadyAllocated)
+	eit.allocateNewEgressIPs(allocation, alreadyAllocated)
+	if removedEgressIPs {
+		// Process the removals now; we'll get called again afterward and can
+		// check for balance then.
+		return allocation
+	}
+
+	// Compare the allocation to what we would have gotten if we started from scratch,
+	// to see if things have gotten too unbalanced. (In particular, if a node goes
+	// offline, gets emptied, and then comes back online, we want to move a bunch of
+	// egress IPs back onto that node.)
+	fullReallocation, alreadyAllocated := eit.makeEmptyAllocation()
+	eit.allocateNewEgressIPs(fullReallocation, alreadyAllocated)
+
+	emptyNodes := []string{}
+	for nodeName, fullEgressIPs := range fullReallocation {
+		incrementalEgressIPs := allocation[nodeName]
+		if len(incrementalEgressIPs) < len(fullEgressIPs)/2 {
+			emptyNodes = append(emptyNodes, nodeName)
+		}
+	}
+
+	if len(emptyNodes) > 0 {
+		// Make a new incremental allocation, but skipping all of the egress IPs
+		// that got assigned to the "empty" nodes in the full reallocation; this
+		// will cause them to be dropped from their current nodes and then later
+		// reassigned (to one of the "empty" nodes, for balance).
+		allocation, alreadyAllocated = eit.makeEmptyAllocation()
+		for _, nodeName := range emptyNodes {
+			for _, egressIP := range fullReallocation[nodeName] {
+				alreadyAllocated[egressIP] = true
+			}
 		}
+		eit.allocateExistingEgressIPs(allocation, alreadyAllocated)
+		eit.allocateNewEgressIPs(allocation, alreadyAllocated)
+		eit.updateEgressCIDRs = true
 	}
+
 	return allocation
 }
diff --git a/pkg/network/common/egressip_test.go b/pkg/network/common/egressip_test.go
index 91143892ab88..393f9a92a9f7 100644
--- a/pkg/network/common/egressip_test.go
+++ b/pkg/network/common/egressip_test.go
@@ -73,6 +73,20 @@ func (w *testEIPWatcher) assertNoChanges() error {
 	return w.assertChanges()
 }
 
+func (w *testEIPWatcher) flushChanges() {
+	w.changes = []string{}
+}
+
+func (w *testEIPWatcher) assertUpdateEgressCIDRsNotification() error {
+	for _, change := range w.changes {
+		if change == "update egress CIDRs" {
+			w.flushChanges()
+			return nil
+		}
+	}
+	return fmt.Errorf("expected change \"update egress CIDRs\", got %#v", w.changes)
+}
+
 func setupEgressIPTracker(t *testing.T) (*EgressIPTracker, *testEIPWatcher) {
 	watcher := &testEIPWatcher{}
 	return NewEgressIPTracker(watcher), watcher
@@ -864,9 +878,6 @@ func TestEgressCIDRAllocation(t *testing.T) {
 		t.Fatalf("%v", err)
 	}
 	allocation = eit.ReallocateEgressIPs()
-	if len(allocation) != 0 {
-		t.Fatalf("Unexpected allocation: %#v", allocation)
-	}
 	updateAllocations(eit, allocation)
 	err = w.assertNoChanges()
 	if err != nil {
@@ -947,24 +958,33 @@ func TestEgressCIDRAllocation(t *testing.T) {
 		t.Fatalf("%v", err)
 	}
 
-	// Changing the EgressIPs of a namespace should drop the old allocation and create a new one
+	// Changing/Removing the EgressIPs of a namespace should drop the old allocation and create a new one
 	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
 		NetID:     46,
 		EgressIPs: []string{"172.17.0.202"}, // was 172.17.0.200
 	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     44,
+		EgressIPs: []string{}, // was 172.17.1.1
+	})
 	err = w.assertChanges(
 		"release 172.17.0.200 on 172.17.0.4",
 		"namespace 46 dropped",
 		"update egress CIDRs",
+		"release 172.17.1.1 on 172.17.0.3",
+		"namespace 44 normal",
+		"update egress CIDRs",
 	)
 	if err != nil {
 		t.Fatalf("%v", err)
 	}
 
 	allocation = eit.ReallocateEgressIPs()
-	for _, ip := range allocation["node-4"] {
-		if ip == "172.17.0.200" {
-			t.Fatalf("reallocation failed to drop unused egress IP 172.17.0.200: %#v", allocation)
+	for _, nodeAllocation := range allocation {
+		for _, ip := range nodeAllocation {
+			if ip == "172.17.1.1" || ip == "172.17.0.200" {
+				t.Fatalf("reallocation failed to drop unused egress IP %s: %#v", ip, allocation)
+			}
 		}
 	}
 	updateAllocations(eit, allocation)
@@ -972,6 +992,7 @@ func TestEgressCIDRAllocation(t *testing.T) {
 		"claim 172.17.0.202 on 172.17.0.4 for namespace 46",
 		"namespace 46 via 172.17.0.202 on 172.17.0.4",
 		"update egress CIDRs",
+		"update egress CIDRs",
 	)
 	if err != nil {
 		t.Fatalf("%v", err)
@@ -1031,3 +1052,132 @@ func TestEgressNodeRenumbering(t *testing.T) {
 		t.Fatalf("%v", err)
 	}
 }
+
+func TestEgressCIDRAllocationOffline(t *testing.T) {
+	eit, w := setupEgressIPTracker(t)
+
+	// Create nodes...
+	updateHostSubnetEgress(eit, &networkapi.HostSubnet{
+		HostIP:      "172.17.0.3",
+		EgressIPs:   []string{},
+		EgressCIDRs: []string{"172.17.0.0/24", "172.17.1.0/24"},
+	})
+	updateHostSubnetEgress(eit, &networkapi.HostSubnet{
+		HostIP:      "172.17.0.4",
+		EgressIPs:   []string{},
+		EgressCIDRs: []string{"172.17.0.0/24"},
+	})
+	updateHostSubnetEgress(eit, &networkapi.HostSubnet{
+		HostIP:      "172.17.0.5",
+		EgressIPs:   []string{},
+		EgressCIDRs: []string{"172.17.1.0/24"},
+	})
+
+	// Create namespaces
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     100,
+		EgressIPs: []string{"172.17.0.100"},
+	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     101,
+		EgressIPs: []string{"172.17.0.101"},
+	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     102,
+		EgressIPs: []string{"172.17.0.102"},
+	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     200,
+		EgressIPs: []string{"172.17.1.200"},
+	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     201,
+		EgressIPs: []string{"172.17.1.201"},
+	})
+	updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
+		NetID:     202,
+		EgressIPs: []string{"172.17.1.202"},
+	})
+
+	// In a perfect world, we'd get 2 IPs on each node, but depending on processing
+	// order, this isn't guaranteed. E.g., if the three 172.17.0.x IPs get processed
+	// first, we could get two of them on node-3 and one on node-4. Then the first two
+	// 172.17.1.x IPs get assigned to node-5, and the last one could go to either
+	// node-3 or node-5. Regardless of order, node-3 is guaranteed to get at least
+	// two IPs, since there's no way either node-4 or node-5 could be assigned a
+	// third IP if node-3 still only had one.
+	allocation := eit.ReallocateEgressIPs()
+	node3ips := allocation["node-3"]
+	node4ips := allocation["node-4"]
+	node5ips := allocation["node-5"]
+	if len(node3ips) < 2 || len(node4ips) == 0 || len(node5ips) == 0 ||
+		len(node3ips)+len(node4ips)+len(node5ips) != 6 {
+		t.Fatalf("Bad IP allocation: %#v", allocation)
+	}
+	updateAllocations(eit, allocation)
+
+	w.flushChanges()
+
+	// Now take node-3 offline
+	eit.SetNodeOffline("172.17.0.3", true)
+	err := w.assertUpdateEgressCIDRsNotification()
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+
+	// First reallocation should empty out node-3
+	allocation = eit.ReallocateEgressIPs()
+	if node3ips, ok := allocation["node-3"]; !ok || len(node3ips) != 0 {
+		t.Fatalf("Bad IP allocation: %#v", allocation)
+	}
+	updateAllocations(eit, allocation)
+
+	err = w.assertUpdateEgressCIDRsNotification()
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+
+	// Next reallocation should reassign egress IPs to node-4 and node-5
+	allocation = eit.ReallocateEgressIPs()
+	node3ips = allocation["node-3"]
+	node4ips = allocation["node-4"]
+	node5ips = allocation["node-5"]
+	if len(node3ips) != 0 || len(node4ips) != 3 || len(node5ips) != 3 {
+		t.Fatalf("Bad IP allocation: %#v", allocation)
+	}
+	updateAllocations(eit, allocation)
+
+	// Bring node-3 back
+	eit.SetNodeOffline("172.17.0.3", false)
+	err = w.assertUpdateEgressCIDRsNotification()
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+
+	// First reallocation should remove some IPs from node-4 and node-5 but not add
+	// them to node-3. As above, the "balanced" allocation we're aiming for may not
+	// be perfect, but it has to be planning to assign at least 2 IPs to node-3.
+	allocation = eit.ReallocateEgressIPs()
+	node3ips = allocation["node-3"]
+	node4ips = allocation["node-4"]
+	node5ips = allocation["node-5"]
+	if len(node3ips) != 0 || len(node4ips)+len(node5ips) > 4 {
+		t.Fatalf("Bad IP allocation: %#v", allocation)
+	}
+	updateAllocations(eit, allocation)
+
+	err = w.assertUpdateEgressCIDRsNotification()
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+
+	// Next reallocation should reassign egress IPs to node-3
+	allocation = eit.ReallocateEgressIPs()
+	node3ips = allocation["node-3"]
+	node4ips = allocation["node-4"]
+	node5ips = allocation["node-5"]
+	if len(node3ips) < 2 || len(node4ips) == 0 || len(node5ips) == 0 {
+		t.Fatalf("Bad IP allocation: %#v", allocation)
+	}
+	updateAllocations(eit, allocation)
+}
diff --git a/pkg/network/master/egressip.go b/pkg/network/master/egressip.go
index 38b149e48c8d..528d69d2a2c6 100644
--- a/pkg/network/master/egressip.go
+++ b/pkg/network/master/egressip.go
@@ -5,7 +5,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/golang/glog"
+
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilwait "k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/util/retry"
 
@@ -23,6 +26,15 @@ type egressIPManager struct {
 
 	updatePending bool
 	updatedAgain  bool
+
+	monitorNodes map[string]*egressNode
+	stop         chan struct{}
+}
+
+type egressNode struct {
+	ip      string
+	offline bool
+	retries int
 }
 
 func newEgressIPManager() *egressIPManager {
@@ -75,6 +87,7 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
 	// we won't process that until this reallocation is complete.
 
 	allocation := eim.tracker.ReallocateEgressIPs()
+	monitorNodes := make(map[string]*egressNode, len(allocation))
 	for nodeName, egressIPs := range allocation {
 		resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 			hs, err := eim.hostSubnetInformer.Lister().Get(nodeName)
@@ -82,8 +95,18 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
 				return err
 			}
 
-			hs.EgressIPs = egressIPs
-			_, err = eim.networkClient.Network().HostSubnets().Update(hs)
+			if node := eim.monitorNodes[hs.HostIP]; node != nil {
+				monitorNodes[hs.HostIP] = node
+			} else {
+				monitorNodes[hs.HostIP] = &egressNode{ip: hs.HostIP}
+			}
+
+			oldIPs := sets.NewString(hs.EgressIPs...)
+			newIPs := sets.NewString(egressIPs...)
+			if !oldIPs.Equal(newIPs) {
+				hs.EgressIPs = egressIPs
+				_, err = eim.networkClient.Network().HostSubnets().Update(hs)
+			}
 			return err
 		})
 		if resultErr != nil {
@@ -91,9 +114,82 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
 		}
 	}
 
+	eim.monitorNodes = monitorNodes
+	if len(monitorNodes) > 0 {
+		if eim.stop == nil {
+			eim.stop = make(chan struct{})
+			go eim.poll(eim.stop)
+		}
+	} else {
+		if eim.stop != nil {
+			close(eim.stop)
+			eim.stop = nil
+		}
+	}
+
 	return true, nil
 }
 
+const (
+	pollInterval   = 5 * time.Second
+	repollInterval = time.Second
+	maxRetries     = 2
+)
+
+func (eim *egressIPManager) poll(stop chan struct{}) {
+	retry := false
+	for {
+		select {
+		case <-stop:
+			return
+		default:
+		}
+
+		start := time.Now()
+		retry = eim.check(retry)
+		if !retry {
+			// If less than pollInterval has passed since start, then sleep until it has
+			time.Sleep(start.Add(pollInterval).Sub(time.Now()))
+		}
+	}
+}
+
+func (eim *egressIPManager) check(retrying bool) bool {
+	var timeout time.Duration
+	if retrying {
+		timeout = repollInterval
+	} else {
+		timeout = pollInterval
+	}
+
+	needRetry := false
+	for _, node := range eim.monitorNodes {
+		if retrying && node.retries == 0 {
+			continue
+		}
+
+		online := eim.tracker.Ping(node.ip, timeout)
+		if node.offline && online {
+			glog.Infof("Node %s is back online", node.ip)
+			node.offline = false
+			eim.tracker.SetNodeOffline(node.ip, false)
+		} else if !node.offline && !online {
+			node.retries++
+			if node.retries > maxRetries {
+				glog.Warningf("Node %s is offline", node.ip)
+				node.retries = 0
+				node.offline = true
+				eim.tracker.SetNodeOffline(node.ip, true)
+			} else {
+				glog.V(2).Infof("Node %s may be offline... retrying", node.ip)
+				needRetry = true
+			}
+		}
+	}
+
+	return needRetry
+}
+
 func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) {
 }
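
Note on the balance check in ReallocateEgressIPs: a node counts as under-allocated when the incremental allocation leaves it with fewer than half the egress IPs that a from-scratch reallocation would give it; the from-scratch IPs of such nodes are then marked alreadyAllocated for the next incremental pass, so their current hosts drop them now and the under-allocated node picks them up one reallocation later. Below is a minimal standalone sketch of just that heuristic (not code from the patch; the function name and data are illustrative):

// Sketch of the under-allocation test used by the balance check.
package main

import "fmt"

// underAllocatedNodes returns the nodes whose incremental allocation holds
// fewer than half of the IPs their from-scratch ("full") allocation would.
func underAllocatedNodes(incremental, full map[string][]string) []string {
	var empty []string
	for nodeName, fullIPs := range full {
		if len(incremental[nodeName]) < len(fullIPs)/2 {
			empty = append(empty, nodeName)
		}
	}
	return empty
}

func main() {
	// node-3 just came back online: the incremental pass left it empty,
	// while a from-scratch pass would give it two of the six IPs.
	incremental := map[string][]string{
		"node-3": {},
		"node-4": {"172.17.0.100", "172.17.0.101", "172.17.0.102"},
		"node-5": {"172.17.1.200", "172.17.1.201", "172.17.1.202"},
	}
	full := map[string][]string{
		"node-3": {"172.17.0.100", "172.17.1.200"},
		"node-4": {"172.17.0.101", "172.17.0.102"},
		"node-5": {"172.17.1.201", "172.17.1.202"},
	}

	// Prints [node-3]: its two "full" IPs get excluded from the next
	// incremental pass, so node-4 and node-5 release them first.
	fmt.Println(underAllocatedNodes(incremental, full))
}

This two-step convergence is why TestEgressCIDRAllocationOffline expects two reallocation rounds after node-3 returns: the first drains IPs from node-4 and node-5, and the second assigns them to node-3.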