Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 109 additions & 64 deletions internal/controllers/machinedeployment/machinedeployment_rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package machinedeployment

import (
"context"
"fmt"
"sort"

"github.com/pkg/errors"
Expand All @@ -33,6 +34,7 @@ import (

// rolloutRolling implements the logic for rolling a new MachineSet.
func (r *Reconciler) rolloutRolling(ctx context.Context, md *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet, templateExists bool) error {
// TODO(in-place): move the creation of newMS into rolloutPlanner
newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, md, msList, true, templateExists)
if err != nil {
return err
Expand All @@ -47,18 +49,23 @@ func (r *Reconciler) rolloutRolling(ctx context.Context, md *clusterv1.MachineDe

allMSs := append(oldMSs, newMS)

// Scale up, if we can.
if err := r.reconcileNewMachineSet(ctx, allMSs, newMS, md); err != nil {
// TODO(in-place): applying/removing labels on the MS should also go into rolloutPlanner
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
planner := newRolloutPlanner()
if err := planner.Plan(ctx, md, newMS, oldMSs); err != nil {
return err
}

// Scale down, if we can.
if err := r.reconcileOldMachineSets(ctx, allMSs, oldMSs, newMS, md); err != nil {
return err
// TODO(in-place): this should be changed as soon as rolloutPlanner supports MS creation and adding/removing labels from MS
for _, ms := range allMSs {
if scaleIntent, ok := planner.scaleIntents[ms.Name]; ok {
if err := r.scaleMachineSet(ctx, ms, scaleIntent, md); err != nil {
return err
}
}
}

if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
Expand All @@ -74,48 +81,78 @@ func (r *Reconciler) rolloutRolling(ctx context.Context, md *clusterv1.MachineDe
return nil
}

func (r *Reconciler) reconcileNewMachineSet(ctx context.Context, allMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) error {
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
return err
// rolloutPlanner collects the scale operations computed for a rollout so that
// the caller can apply them to the MachineSets after planning has finished.
type rolloutPlanner struct {
// scaleIntents maps a MachineSet name to the replica count that MachineSet
// is intended to be scaled to; MachineSets without an entry are not scaled.
scaleIntents map[string]int32
}

// newRolloutPlanner returns a rolloutPlanner whose scaleIntents map is
// allocated and empty, so intents can be written to it right away.
func newRolloutPlanner() *rolloutPlanner {
	planner := new(rolloutPlanner)
	planner.scaleIntents = map[string]int32{}
	return planner
}

if deployment.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineDeployment %v is nil, this is unexpected", client.ObjectKeyFromObject(deployment))
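// Plan validates that spec.replicas is set on the MachineDeployment and on all MachineSets,
// then computes the scale up/down intents for the new and the old MachineSets and records them in p.scaleIntents.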
func (p *rolloutPlanner) Plan(ctx context.Context, md *clusterv1.MachineDeployment, newMS *clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet) error {
if md.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineDeployment %v is nil, this is unexpected", client.ObjectKeyFromObject(md))
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(newMS))
}

if *(newMS.Spec.Replicas) == *(deployment.Spec.Replicas) {
for _, oldMS := range oldMSs {
if oldMS.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(oldMS))
}
}

// Scale up, if we can.
if err := p.reconcileNewMachineSet(ctx, md, newMS, oldMSs); err != nil {
return err
}

// Scale down, if we can.
return p.reconcileOldMachineSets(ctx, md, newMS, oldMSs)
}

func (p *rolloutPlanner) reconcileNewMachineSet(ctx context.Context, md *clusterv1.MachineDeployment, newMS *clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet) error {
log := ctrl.LoggerFrom(ctx)
allMSs := append(oldMSs, newMS)

if *(newMS.Spec.Replicas) == *(md.Spec.Replicas) {
// Scaling not required.
return nil
}

if *(newMS.Spec.Replicas) > *(deployment.Spec.Replicas) {
if *(newMS.Spec.Replicas) > *(md.Spec.Replicas) {
// Scale down.
return r.scaleMachineSet(ctx, newMS, *(deployment.Spec.Replicas), deployment)
log.V(5).Info(fmt.Sprintf("Setting scale down intent for %s to %d replicas", newMS.Name, *(md.Spec.Replicas)), "machineset", client.ObjectKeyFromObject(newMS).String())
p.scaleIntents[newMS.Name] = *(md.Spec.Replicas)
return nil
}

newReplicasCount, err := mdutil.NewMSNewReplicas(deployment, allMSs, *newMS.Spec.Replicas)
newReplicasCount, err := mdutil.NewMSNewReplicas(md, allMSs, *newMS.Spec.Replicas)
if err != nil {
return err
}
return r.scaleMachineSet(ctx, newMS, newReplicasCount, deployment)
}

func (r *Reconciler) reconcileOldMachineSets(ctx context.Context, allMSs []*clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) error {
log := ctrl.LoggerFrom(ctx)

if deployment.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineDeployment %v is nil, this is unexpected",
client.ObjectKeyFromObject(deployment))
if newReplicasCount < *(newMS.Spec.Replicas) {
scaleDownCount := *(newMS.Spec.Replicas) - newReplicasCount
log.V(5).Info(fmt.Sprintf("Setting scale down intent for %s to %d replicas (-%d)", newMS.Name, newReplicasCount, scaleDownCount), "machineset", client.ObjectKeyFromObject(newMS).String())
p.scaleIntents[newMS.Name] = newReplicasCount
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected",
client.ObjectKeyFromObject(newMS))
if newReplicasCount > *(newMS.Spec.Replicas) {
scaleUpCount := newReplicasCount - *(newMS.Spec.Replicas)
log.V(5).Info(fmt.Sprintf("Setting scale up intent for %s to %d replicas (+%d)", newMS.Name, newReplicasCount, scaleUpCount), "machineset", client.ObjectKeyFromObject(newMS).String())
p.scaleIntents[newMS.Name] = newReplicasCount
}
return nil
}

func (p *rolloutPlanner) reconcileOldMachineSets(ctx context.Context, md *clusterv1.MachineDeployment, newMS *clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet) error {
log := ctrl.LoggerFrom(ctx)
allMSs := append(oldMSs, newMS)

oldMachinesCount := mdutil.GetReplicaCountForMachineSets(oldMSs)
if oldMachinesCount == 0 {
Expand All @@ -126,7 +163,7 @@ func (r *Reconciler) reconcileOldMachineSets(ctx context.Context, allMSs []*clus
allMachinesCount := mdutil.GetReplicaCountForMachineSets(allMSs)
log.V(4).Info("New MachineSet has available machines",
"machineset", client.ObjectKeyFromObject(newMS).String(), "available-replicas", ptr.Deref(newMS.Status.AvailableReplicas, 0))
maxUnavailable := mdutil.MaxUnavailable(*deployment)
maxUnavailable := mdutil.MaxUnavailable(*md)

// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old MachineSets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
Expand Down Expand Up @@ -160,7 +197,7 @@ func (r *Reconciler) reconcileOldMachineSets(ctx context.Context, allMSs []*clus
// allow the new MachineSet to be scaled up by 5.
availableReplicas := ptr.Deref(newMS.Status.AvailableReplicas, 0)

minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
minAvailable := *(md.Spec.Replicas) - maxUnavailable
newMSUnavailableMachineCount := *(newMS.Spec.Replicas) - availableReplicas
maxScaledDown := allMachinesCount - minAvailable - newMSUnavailableMachineCount
if maxScaledDown <= 0 {
Expand All @@ -169,17 +206,15 @@ func (r *Reconciler) reconcileOldMachineSets(ctx context.Context, allMSs []*clus

// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldMSs, cleanupCount, err := r.cleanupUnhealthyReplicas(ctx, oldMSs, deployment, maxScaledDown)
oldMSs, cleanupCount, err := p.cleanupUnhealthyReplicas(ctx, oldMSs, maxScaledDown)
if err != nil {
return err
}

log.V(4).Info("Cleaned up unhealthy replicas from old MachineSets", "count", cleanupCount)

// Scale down old MachineSets; need to check maxUnavailable to ensure we can scale down
allMSs = oldMSs
allMSs = append(allMSs, newMS)
scaledDownCount, err := r.scaleDownOldMachineSetsForRollingUpdate(ctx, allMSs, oldMSs, deployment)
scaledDownCount, err := p.scaleDownOldMachineSetsForRollingUpdate(ctx, md, newMS, oldMSs)
if err != nil {
return err
}
Expand All @@ -189,7 +224,7 @@ func (r *Reconciler) reconcileOldMachineSets(ctx context.Context, allMSs []*clus
}

// cleanupUnhealthyReplicas will scale down old MachineSets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (r *Reconciler) cleanupUnhealthyReplicas(ctx context.Context, oldMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment, maxCleanupCount int32) ([]*clusterv1.MachineSet, int32, error) {
func (p *rolloutPlanner) cleanupUnhealthyReplicas(ctx context.Context, oldMSs []*clusterv1.MachineSet, maxCleanupCount int32) ([]*clusterv1.MachineSet, int32, error) {
log := ctrl.LoggerFrom(ctx)

sort.Sort(mdutil.MachineSetsByCreationTimestamp(oldMSs))
Expand All @@ -200,42 +235,47 @@ func (r *Reconciler) cleanupUnhealthyReplicas(ctx context.Context, oldMSs []*clu
// This results in a best effort to remove machines backing unhealthy nodes.
totalScaledDown := int32(0)

for _, targetMS := range oldMSs {
if targetMS.Spec.Replicas == nil {
return nil, 0, errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(targetMS))
for _, oldMS := range oldMSs {
if oldMS.Spec.Replicas == nil {
return nil, 0, errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(oldMS))
}

if totalScaledDown >= maxCleanupCount {
break
}

oldMSReplicas := *(targetMS.Spec.Replicas)
oldMSReplicas := *(oldMS.Spec.Replicas)
if oldMSReplicas == 0 {
// cannot scale down this MachineSet.
continue
}

oldMSAvailableReplicas := ptr.Deref(targetMS.Status.AvailableReplicas, 0)
oldMSAvailableReplicas := ptr.Deref(oldMS.Status.AvailableReplicas, 0)
log.V(4).Info("Found available Machines in old MachineSet",
"count", oldMSAvailableReplicas, "target-machineset", client.ObjectKeyFromObject(targetMS).String())
"count", oldMSAvailableReplicas, "target-machineset", client.ObjectKeyFromObject(oldMS).String())
if oldMSReplicas == oldMSAvailableReplicas {
// no unhealthy replicas found, no scaling required.
continue
}

// TODO(in-place): fix this logic
// It looks like the current logic fails when the MD controller is called twice in a row without the MS controller being triggered in between, e.g.
// - first reconcile scales down ms1, 6-->5 (-1)
// - second reconcile does not take into account the scale down already in progress; the unhealthy count is wrongly computed as -1 instead of 0, which leads to increasing the replica count instead of keeping it as it is (or scaling down), and then the safeguard below errors out.

remainingCleanupCount := maxCleanupCount - totalScaledDown
unhealthyCount := oldMSReplicas - oldMSAvailableReplicas
scaledDownCount := min(remainingCleanupCount, unhealthyCount)
newReplicasCount := oldMSReplicas - scaledDownCount

if newReplicasCount > oldMSReplicas {
return nil, 0, errors.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %v: %d -> %d",
client.ObjectKeyFromObject(targetMS), oldMSReplicas, newReplicasCount)
client.ObjectKeyFromObject(oldMS), oldMSReplicas, newReplicasCount)
}

if err := r.scaleMachineSet(ctx, targetMS, newReplicasCount, deployment); err != nil {
return nil, totalScaledDown, err
}
scaleDownCount := *(oldMS.Spec.Replicas) - newReplicasCount
log.V(5).Info(fmt.Sprintf("Setting scale down intent for %s to %d replicas (-%d)", oldMS.Name, newReplicasCount, scaleDownCount), "machineset", client.ObjectKeyFromObject(oldMS).String())
p.scaleIntents[oldMS.Name] = newReplicasCount

totalScaledDown += scaledDownCount
}
Expand All @@ -245,15 +285,12 @@ func (r *Reconciler) cleanupUnhealthyReplicas(ctx context.Context, oldMSs []*clu

// scaleDownOldMachineSetsForRollingUpdate scales down old MachineSets when deployment strategy is "RollingUpdate".
// Need to check maxUnavailable to ensure availability.
func (r *Reconciler) scaleDownOldMachineSetsForRollingUpdate(ctx context.Context, allMSs []*clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) (int32, error) {
func (p *rolloutPlanner) scaleDownOldMachineSetsForRollingUpdate(ctx context.Context, md *clusterv1.MachineDeployment, newMS *clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet) (int32, error) {
log := ctrl.LoggerFrom(ctx)
allMSs := append(oldMSs, newMS)

if deployment.Spec.Replicas == nil {
return 0, errors.Errorf("spec.replicas for MachineDeployment %v is nil, this is unexpected", client.ObjectKeyFromObject(deployment))
}

maxUnavailable := mdutil.MaxUnavailable(*deployment)
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
maxUnavailable := mdutil.MaxUnavailable(*md)
minAvailable := *(md.Spec.Replicas) - maxUnavailable

// Find the number of available machines.
availableMachineCount := ptr.Deref(mdutil.GetAvailableReplicaCountForMachineSets(allMSs), 0)
Expand All @@ -268,34 +305,42 @@ func (r *Reconciler) scaleDownOldMachineSetsForRollingUpdate(ctx context.Context

sort.Sort(mdutil.MachineSetsByCreationTimestamp(oldMSs))

// TODO(in-place): fix this logic
// It looks like the current logic fails when the MD controller is called twice in a row, e.g. reconcile of an MD with 6 replicas, MaxSurge=3, MaxUnavailable=1 and
// ms1, 6/5 replicas << one is scaling down, but scale down not yet processed by the MS controller.
// ms2, 3/3 replicas
// Leads to:
// ms1, 6/1 replicas << it is further scaled down by 4, which leads to the total number of available machines being less than minAvailable, which should not happen
// ms2, 3/3 replicas

totalScaledDown := int32(0)
totalScaleDownCount := availableMachineCount - minAvailable
for _, targetMS := range oldMSs {
if targetMS.Spec.Replicas == nil {
return 0, errors.Errorf("spec.replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(targetMS))
}

for _, oldMS := range oldMSs {
if totalScaledDown >= totalScaleDownCount {
// No further scaling required.
break
}

if *(targetMS.Spec.Replicas) == 0 {
scaleIntent := ptr.Deref(oldMS.Spec.Replicas, 0)
if v, ok := p.scaleIntents[oldMS.Name]; ok {
scaleIntent = v
}

if scaleIntent == 0 {
// cannot scale down this MachineSet.
continue
}

// Scale down.
scaleDownCount := min(*(targetMS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
newReplicasCount := *(targetMS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetMS.Spec.Replicas) {
scaleDownCount := min(scaleIntent, totalScaleDownCount-totalScaledDown)
newReplicasCount := scaleIntent - scaleDownCount
if newReplicasCount > scaleIntent {
return totalScaledDown, errors.Errorf("when scaling down old MachineSet, got invalid request to scale down %v: %d -> %d",
client.ObjectKeyFromObject(targetMS), *(targetMS.Spec.Replicas), newReplicasCount)
client.ObjectKeyFromObject(oldMS), scaleIntent, newReplicasCount)
}

if err := r.scaleMachineSet(ctx, targetMS, newReplicasCount, deployment); err != nil {
return totalScaledDown, err
}
log.V(5).Info(fmt.Sprintf("Setting scale down intent for %s to %d replicas (-%d)", oldMS.Name, newReplicasCount, scaleDownCount), "machineset", client.ObjectKeyFromObject(oldMS).String())
p.scaleIntents[oldMS.Name] = newReplicasCount

totalScaledDown += scaleDownCount
}
Expand Down
Loading