@@ -293,7 +293,7 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope) error {
}

if md.Spec.Rollout.Strategy.Type == clusterv1.RollingUpdateMachineDeploymentStrategyType {
return r.rolloutRolling(ctx, md, s.machineSets, templateExists)
return r.rolloutRollingUpdate(ctx, md, s.machineSets, templateExists)
}

if md.Spec.Rollout.Strategy.Type == clusterv1.OnDeleteMachineDeploymentStrategyType {
@@ -19,47 +19,53 @@ package machinedeployment
import (
"context"
"fmt"
"sort"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
"sigs.k8s.io/cluster-api/util/patch"
)

// rolloutOnDelete implements the logic for the OnDelete rollout strategy.
// rolloutOnDelete reconciles machine sets controlled by a MachineDeployment that uses the OnDelete strategy.
func (r *Reconciler) rolloutOnDelete(ctx context.Context, md *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet, templateExists bool) error {
// TODO(in-place): move creating newMS into rolloutPlanner
newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, md, msList, true, templateExists)
if err != nil {
return err
}

// newMS can be nil in case there is already a MachineSet associated with this deployment,
// but there are only either changes in annotations or MinReadySeconds. Or in other words,
// this can be nil if there are changes, but no replacement of existing machines is needed.
if newMS == nil {
return nil
planner := newRolloutPlanner()
planner.md = md
planner.newMS = newMS
planner.oldMSs = oldMSs

if err := planner.planOnDelete(ctx); err != nil {
return err
}

allMSs := append(oldMSs, newMS)

// Scale up, if we can.
if err := r.reconcileNewMachineSetOnDelete(ctx, md, oldMSs, newMS); err != nil {
// TODO(in-place): applying/removing labels on the MS should also go into rolloutPlanner
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
if err := r.addDisableMachineCreateAnnotation(ctx, oldMSs); err != nil {
return err
}

// Scale down, if we can.
if err := r.reconcileOldMachineSetsOnDelete(ctx, oldMSs, allMSs, md); err != nil {
return err
// TODO(in-place): this should be changed as soon as rolloutPlanner supports MS creation and adding/removing labels from MS
for _, ms := range allMSs {
scaleIntent := ptr.Deref(ms.Spec.Replicas, 0)
if v, ok := planner.scaleIntents[ms.Name]; ok {
scaleIntent = v
}
if err := r.scaleMachineSet(ctx, ms, scaleIntent, md); err != nil {
return err
}
}

if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
@@ -75,115 +81,109 @@ func (r *Reconciler) rolloutOnDelete(ctx context.Context, md *clusterv1.MachineD
return nil
}
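
For orientation, the new flow computes scale intents first and only applies them afterwards via scaleMachineSet. Below is a minimal sketch of the planner shape this diff relies on, inferred purely from how the planner is used here; the actual definition lives elsewhere in this package and may differ.

```go
// Sketch only: field names and the constructor are inferred from this diff's
// usage of the planner; the real type is defined elsewhere in the package.
type rolloutPlanner struct {
	md     *clusterv1.MachineDeployment
	newMS  *clusterv1.MachineSet
	oldMSs []*clusterv1.MachineSet

	// scaleIntents maps a MachineSet name to the replica count the planner
	// wants it to have; MachineSets without an entry keep their current
	// spec.replicas.
	scaleIntents map[string]int32
}

func newRolloutPlanner() *rolloutPlanner {
	return &rolloutPlanner{scaleIntents: map[string]int32{}}
}
```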

// planOnDelete determines how to proceed with the rollout when using the OnDelete strategy if we are not yet at the desired state.
func (p *rolloutPlanner) planOnDelete(ctx context.Context) error {
// Scale up, if we can.
if err := p.reconcileNewMachineSet(ctx); err != nil {
return err
}

// Scale down, if we can.
p.reconcileOldMachineSetsOnDelete(ctx)
return nil
}

// reconcileOldMachineSetsOnDelete handles reconciliation of old MachineSets associated with the MachineDeployment in the OnDelete rollout strategy.
func (r *Reconciler) reconcileOldMachineSetsOnDelete(ctx context.Context, oldMSs []*clusterv1.MachineSet, allMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) error {
func (p *rolloutPlanner) reconcileOldMachineSetsOnDelete(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)
if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for MachineDeployment %q/%q is nil, this is unexpected",
deployment.Namespace, deployment.Name)
oldMachinesCount := mdutil.GetReplicaCountForMachineSets(p.oldMSs)
if oldMachinesCount == 0 {
// Can't scale down further
return
}
log.V(4).Info("Checking to see if machines have been deleted or are in the process of deleting for old machine sets")
totalReplicas := mdutil.GetReplicaCountForMachineSets(allMSs)
scaleDownAmount := totalReplicas - *deployment.Spec.Replicas
for _, oldMS := range oldMSs {
log := log.WithValues("MachineSet", klog.KObj(oldMS))
if oldMS.Spec.Replicas == nil || *oldMS.Spec.Replicas <= 0 {
log.V(4).Info("fully scaled down")

// Determine if there are replicas to be scaled down due to a scale down in MD.
newMSReplicas := ptr.Deref(p.newMS.Spec.Replicas, 0)
if v, ok := p.scaleIntents[p.newMS.Name]; ok {
newMSReplicas = v
}
totReplicas := oldMachinesCount + newMSReplicas
totalScaleDownCount := max(totReplicas-ptr.Deref(p.md.Spec.Replicas, 0), 0)

// Sort oldMSs so the system will start deleting from the oldest MS first.
sort.Sort(mdutil.MachineSetsByCreationTimestamp(p.oldMSs))

// Start scaling down old machine sets to drop spec.replicas that have no corresponding status.replicas.
// Note: spec.replicas without a corresponding status.replicas exists
// - after a user manually deletes a replica
// - when a newMS that is not yet fully provisioned suddenly becomes an oldMS.
// In both cases spec.replicas without a corresponding status.replicas should be dropped, no matter
// whether there are replicas to be scaled down due to a scale down of the MD or not.
// However, if there are replicas to be scaled down due to a scale down of the MD, deleted replicas should
// be deducted from totalScaleDownCount.
for _, oldMS := range p.oldMSs {
// No op if this MS has already been scaled down to zero.
if ptr.Deref(oldMS.Spec.Replicas, 0) <= 0 {
continue
}
if oldMS.Annotations == nil {
oldMS.Annotations = map[string]string{}

scaleDownCount := max(ptr.Deref(oldMS.Spec.Replicas, 0)-ptr.Deref(oldMS.Status.Replicas, 0), 0)
if scaleDownCount > 0 {
newScaleIntent := max(ptr.Deref(oldMS.Spec.Replicas, 0)-scaleDownCount, 0)
log.V(5).Info(fmt.Sprintf("Setting scale down intent for MachineSet %s to %d replicas (-%d)", oldMS.Name, newScaleIntent, scaleDownCount), "MachineSet", klog.KObj(oldMS))
p.scaleIntents[oldMS.Name] = newScaleIntent

totalScaleDownCount -= scaleDownCount
}
}

// Scale down additional replicas if the replicas removed in the loop above were not enough to align to the MD's desired replicas.
for _, oldMS := range p.oldMSs {
// No op if there is no scaling down left.
if totalScaleDownCount <= 0 {
break
}

// No op if this MS has already been scaled down to zero.
scaleIntent := ptr.Deref(oldMS.Spec.Replicas, 0)
if v, ok := p.scaleIntents[oldMS.Name]; ok {
scaleIntent = min(scaleIntent, v)
}

if scaleIntent <= 0 {
continue
}

scaleDownCount := min(scaleIntent, totalScaleDownCount)
if scaleDownCount > 0 {
newScaleIntent := max(ptr.Deref(oldMS.Spec.Replicas, 0)-scaleDownCount, 0)
log.V(5).Info(fmt.Sprintf("Setting scale down intent for MachineSet %s to %d replicas (-%d)", oldMS.Name, newScaleIntent, scaleDownCount), "MachineSet", klog.KObj(oldMS))
p.scaleIntents[oldMS.Name] = newScaleIntent

totalScaleDownCount -= scaleDownCount
}
}
}
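
A hypothetical walk-through of the two phases above may help; names and numbers are illustrative, not taken from this PR. Assume an MD scaled down from 5 to 3 replicas while one machine of the oldest MachineSet was deleted out of band (requires Go 1.21+ for the min/max builtins, which the PR code also uses):

```go
package main

import "fmt"

func main() {
	mdReplicas := int32(3)     // MD was scaled down from 5 to 3
	newMSReplicas := int32(1)  // newMS, not yet fully scaled up
	oldSpec := []int32{3, 1}   // spec.replicas of oldMS-a, oldMS-b (oldest first)
	oldStatus := []int32{2, 1} // one machine of oldMS-a was deleted by a user

	totalScaleDown := max(newMSReplicas+oldSpec[0]+oldSpec[1]-mdReplicas, 0) // 2
	intents := map[string]int32{}

	// Phase 1: drop spec.replicas that have no corresponding status.replicas.
	for i, name := range []string{"oldMS-a", "oldMS-b"} {
		if d := max(oldSpec[i]-oldStatus[i], 0); d > 0 {
			intents[name] = oldSpec[i] - d // oldMS-a: 3 -> 2
			totalScaleDown -= d            // 2 -> 1
		}
	}

	// Phase 2: take the remaining budget from the oldest MS first.
	for i, name := range []string{"oldMS-a", "oldMS-b"} {
		cur, ok := intents[name]
		if !ok {
			cur = oldSpec[i]
		}
		if d := min(cur, totalScaleDown); d > 0 {
			intents[name] = cur - d // oldMS-a: 2 -> 1
			totalScaleDown -= d     // 1 -> 0
		}
	}

	fmt.Println(intents) // map[oldMS-a:1]
}
```

End state: newMS keeps 1 replica, oldMS-a is scaled to 1, oldMS-b keeps 1, which matches the MD's desired 3 replicas without double-counting the manually deleted machine.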

// addDisableMachineCreateAnnotation will add the disable machine create annotation to old MachineSets.
func (r *Reconciler) addDisableMachineCreateAnnotation(ctx context.Context, oldMSs []*clusterv1.MachineSet) error {
for _, oldMS := range oldMSs {
log := ctrl.LoggerFrom(ctx, "MachineSet", klog.KObj(oldMS))
if _, ok := oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation]; !ok {
log.V(4).Info("setting annotation on old MachineSet to disable machine creation")
log.V(4).Info("adding annotation on old MachineSet to disable machine creation")
patchHelper, err := patch.NewHelper(oldMS, r.Client)
if err != nil {
return err
}
if oldMS.Annotations == nil {
oldMS.Annotations = map[string]string{}
}
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
if err := patchHelper.Patch(ctx, oldMS); err != nil {
err = patchHelper.Patch(ctx, oldMS)
if err != nil {
return err
}
}
selectorMap, err := metav1.LabelSelectorAsMap(&oldMS.Spec.Selector)
if err != nil {
log.V(4).Info("Failed to convert MachineSet label selector to a map", "err", err)
continue
}
log.V(4).Info("Fetching Machines associated with MachineSet")
// Get all Machines linked to this MachineSet.
allMachinesInOldMS := &clusterv1.MachineList{}
if err := r.Client.List(ctx,
allMachinesInOldMS,
client.InNamespace(oldMS.Namespace),
client.MatchingLabels(selectorMap),
); err != nil {
return errors.Wrap(err, "failed to list machines")
}
totalMachineCount := int32(len(allMachinesInOldMS.Items))
log.V(4).Info("Retrieved machines", "totalMachineCount", totalMachineCount)
updatedReplicaCount := totalMachineCount - mdutil.GetDeletingMachineCount(allMachinesInOldMS)
if updatedReplicaCount < 0 {
return errors.Errorf("negative updated replica count %d for MachineSet %q, this is unexpected", updatedReplicaCount, oldMS.Name)
}
machineSetScaleDownAmountDueToMachineDeletion := *oldMS.Spec.Replicas - updatedReplicaCount
if machineSetScaleDownAmountDueToMachineDeletion < 0 {
log.V(4).Info(fmt.Sprintf("Error reconciling MachineSet %s", oldMS.Name), "err", errors.Errorf("Unexpected negative scale down amount: %d", machineSetScaleDownAmountDueToMachineDeletion))
}
scaleDownAmount -= machineSetScaleDownAmountDueToMachineDeletion
log.V(4).Info("Adjusting replica count for deleted machines", "oldReplicas", oldMS.Spec.Replicas, "newReplicas", updatedReplicaCount)
log.V(4).Info("Scaling down", "replicas", updatedReplicaCount)
if err := r.scaleMachineSet(ctx, oldMS, updatedReplicaCount, deployment); err != nil {
return err
}
}
log.V(4).Info("Finished reconcile of Old MachineSets to account for deleted machines. Now analyzing if there's more potential to scale down")
for _, oldMS := range oldMSs {
log := log.WithValues("MachineSet", klog.KObj(oldMS))
if scaleDownAmount <= 0 {
break
}
if oldMS.Spec.Replicas == nil || *oldMS.Spec.Replicas <= 0 {
log.V(4).Info("Fully scaled down")
continue
}
updatedReplicaCount := *oldMS.Spec.Replicas
if updatedReplicaCount >= scaleDownAmount {
updatedReplicaCount -= scaleDownAmount
scaleDownAmount = 0
} else {
scaleDownAmount -= updatedReplicaCount
updatedReplicaCount = 0
}
log.V(4).Info("Scaling down", "replicas", updatedReplicaCount)
if err := r.scaleMachineSet(ctx, oldMS, updatedReplicaCount, deployment); err != nil {
return err
}
}
log.V(4).Info("Finished reconcile of all old MachineSets")
return nil
}
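
For completeness: the annotation set above is what tells the MachineSet controller to stop creating new machines. A minimal sketch of a presence check, mirroring how this diff itself tests for the annotation; the actual enforcement lives in the MachineSet controller, not in this file:

```go
// Sketch only: mirrors the presence check used in this diff; the real gate
// is implemented in the MachineSet controller.
func machineCreationDisabled(ms *clusterv1.MachineSet) bool {
	_, ok := ms.Annotations[clusterv1.DisableMachineCreateAnnotation]
	return ok
}
```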

// reconcileNewMachineSetOnDelete handles reconciliation of the latest MachineSet associated with the MachineDeployment in the OnDelete rollout strategy.
func (r *Reconciler) reconcileNewMachineSetOnDelete(ctx context.Context, md *clusterv1.MachineDeployment, oldMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet) error {
// TODO(in-place): also apply/remove labels should go into rolloutPlanner
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
return err
}

planner := newRolloutPlanner()
planner.md = md
planner.newMS = newMS
planner.oldMSs = oldMSs

if err := planner.reconcileNewMachineSet(ctx); err != nil {
return err
}

// TODO(in-place): this should be changed as soon as rolloutPlanner support MS creation and adding/removing labels from MS
scaleIntent := ptr.Deref(newMS.Spec.Replicas, 0)
if v, ok := planner.scaleIntents[newMS.Name]; ok {
scaleIntent = v
}
return r.scaleMachineSet(ctx, newMS, scaleIntent, md)
}