Skip to content

Commit a651610

Browse files
Move create MS and Sync MS to the rollout planner
1 parent 47c1d22 commit a651610

28 files changed

Lines changed: 1460 additions & 1507 deletions

File tree

internal/controllers/machinedeployment/machinedeployment_controller.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import (
2020
"context"
2121
"fmt"
2222
"strings"
23+
"time"
2324

2425
"github.com/pkg/errors"
2526
corev1 "k8s.io/api/core/v1"
2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/labels"
2930
kerrors "k8s.io/apimachinery/pkg/util/errors"
31+
"k8s.io/apimachinery/pkg/util/wait"
3032
"k8s.io/client-go/tools/record"
3133
"k8s.io/klog/v2"
3234
"k8s.io/utils/ptr"
@@ -303,6 +305,84 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope) error {
303305
return errors.Errorf("unexpected deployment strategy type: %s", md.Spec.Rollout.Strategy.Type)
304306
}
305307

308+
// createOrUpdateMachineSets applies changes identified by the rolloutPlanner to both newMS and oldMSs.
309+
// Note: Both newMS and oldMS include the full intent for the SSA apply call with mandatory labels,
310+
// in place propagated fields, the annotations derived from the MachineDeployment, revision annotations
311+
// and also annotations influencing how to perform scale up/down operations.
312+
// scaleIntents instead are handled separately in the rolloutPlanner and should be applied to MachineSets
313+
// before persisting changes.
314+
// Note: When the newMS has been created by the rollout planner, also wait for the cache to be up to date.
315+
func (r *Reconciler) createOrUpdateMachineSets(ctx context.Context, p *rolloutPlanner) error {
316+
log := ctrl.LoggerFrom(ctx)
317+
allMSs := append(p.oldMSs, p.newMS)
318+
319+
for _, ms := range allMSs {
320+
log = log.WithValues("MachineSet", klog.KObj(ms))
321+
ctx = ctrl.LoggerInto(ctx, log)
322+
323+
originalReplicas := ptr.Deref(ms.Spec.Replicas, 0)
324+
if scaleIntent, ok := p.scaleIntents[ms.Name]; ok {
325+
ms.Spec.Replicas = &scaleIntent
326+
}
327+
328+
if ms.GetUID() == "" {
329+
// Create the MachineSet.
330+
if err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms); err != nil {
331+
r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedCreate", "Failed to create MachineSet %s: %v", klog.KObj(ms), err)
332+
return errors.Wrapf(err, "failed to create new MachineSet %s", klog.KObj(ms))
333+
}
334+
log.Info(fmt.Sprintf("MachineSet created (%s)", p.createReason))
335+
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulCreate", "Created MachineSet %s with %d replicas", klog.KObj(ms), ptr.Deref(ms.Spec.Replicas, 0))
336+
337+
// Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
338+
// the MachineDeployment from working with an outdated list of MachineSets, which could lead to unwanted creation of
339+
// a duplicate MachineSet.
340+
var pollErrors []error
341+
tmpMS := &clusterv1.MachineSet{}
342+
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
343+
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
344+
// Do not return error here. Continue to poll even if we hit an error
345+
// so that we avoid exiting because of transient errors like network flakes.
346+
// Capture all the errors and return the aggregate error if the poll fails eventually.
347+
pollErrors = append(pollErrors, err)
348+
return false, nil
349+
}
350+
return true, nil
351+
}); err != nil {
352+
return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
353+
}
354+
355+
// Report back creation timestamp, because legacy scale func leverage on this info to sort machines.
356+
// TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
357+
ms.CreationTimestamp = tmpMS.CreationTimestamp
358+
continue
359+
}
360+
361+
// Update the MachineSet to propagate in-place mutable fields from the MachineDeployment and/or changes applied by the rollout planner.
362+
originalMS, ok := p.originalMS[ms.Name]
363+
if !ok {
364+
return errors.Errorf("failed to update MachineSet %s, original MS is missing", klog.KObj(ms))
365+
}
366+
367+
err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms, ssa.WithCachingProxy{Cache: r.ssaCache, Original: originalMS})
368+
if err != nil {
369+
r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedUpdate", "Failed to update MachineSet %s: %v", klog.KObj(ms), err)
370+
return errors.Wrapf(err, "failed to update MachineSet %s", klog.KObj(ms))
371+
}
372+
373+
newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
374+
if newReplicas < originalReplicas {
375+
log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas))
376+
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled Down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
377+
}
378+
if newReplicas > originalReplicas {
379+
log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas))
380+
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled Up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
381+
}
382+
}
383+
return nil
384+
}
385+
306386
func (r *Reconciler) reconcileDelete(ctx context.Context, s *scope) error {
307387
log := ctrl.LoggerFrom(ctx)
308388
if err := r.getAndAdoptMachineSetsForDeployment(ctx, s); err != nil {

internal/controllers/machinedeployment/machinedeployment_controller_test.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,9 @@ func TestMachineDeploymentReconciler(t *testing.T) {
348348
g.Expect(env.List(ctx, machineSets, msListOpts...)).To(Succeed())
349349
// Verify we still only have 2 MachineSets.
350350
g.Expect(machineSets.Items).To(HaveLen(2))
351-
// Verify that the new MachineSet gets the updated labels.
351+
// Verify that both the new and the old MachineSet get the updated labels.
352352
g.Expect(machineSets.Items[0].Spec.Template.Labels).To(HaveKeyWithValue("updated", "true"))
353-
// Verify that the old MachineSet does not get the updated labels.
354-
g.Expect(machineSets.Items[1].Spec.Template.Labels).ShouldNot(HaveKeyWithValue("updated", "true"))
353+
g.Expect(machineSets.Items[1].Spec.Template.Labels).To(HaveKeyWithValue("updated", "true"))
355354
}, timeout).Should(Succeed())
356355

357356
// Update the NodeDrainTimeoutSeconds, NodeDeletionTimeoutSeconds, NodeVolumeDetachTimeoutSeconds of the MachineDeployment,
@@ -384,10 +383,19 @@ func TestMachineDeploymentReconciler(t *testing.T) {
384383
HaveValue(Equal(duration10s)),
385384
), "NodeVolumeDetachTimeoutSeconds value does not match expected")
386385

387-
// Verify that the old machine set keeps the old values.
388-
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDrainTimeoutSeconds).Should(BeNil())
389-
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDeletionTimeoutSeconds).Should(BeNil())
390-
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeVolumeDetachTimeoutSeconds).Should(BeNil())
386+
// Verify that the old machine set has the new values.
387+
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDrainTimeoutSeconds).Should(And(
388+
Not(BeNil()),
389+
HaveValue(Equal(duration10s)),
390+
), "NodeDrainTimout value does not match expected")
391+
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDeletionTimeoutSeconds).Should(And(
392+
Not(BeNil()),
393+
HaveValue(Equal(duration10s)),
394+
), "NodeDeletionTimeoutSeconds value does not match expected")
395+
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeVolumeDetachTimeoutSeconds).Should(And(
396+
Not(BeNil()),
397+
HaveValue(Equal(duration10s)),
398+
), "NodeVolumeDetachTimeoutSeconds value does not match expected")
391399
}).Should(Succeed())
392400

393401
// Update the deletion.order of the MachineDeployment,
@@ -404,8 +412,8 @@ func TestMachineDeploymentReconciler(t *testing.T) {
404412
// Verify the deletion.order value is updated
405413
g.Expect(machineSets.Items[0].Spec.Deletion.Order).Should(Equal(clusterv1.NewestMachineSetDeletionOrder))
406414

407-
// Verify that the old machine set retains its delete policy
408-
g.Expect(machineSets.Items[1].Spec.Deletion.Order).To(Equal(clusterv1.OldestMachineSetDeletionOrder))
415+
// Verify that the old machine set has the new values.
416+
g.Expect(machineSets.Items[1].Spec.Deletion.Order).Should(Equal(clusterv1.NewestMachineSetDeletionOrder))
409417
}).Should(Succeed())
410418

411419
// Verify that all the MachineSets have the expected OwnerRef.

internal/controllers/machinedeployment/machinedeployment_rollout_ondelete.go

Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,46 +27,26 @@ import (
2727

2828
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
2929
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
30-
"sigs.k8s.io/cluster-api/util/patch"
3130
)
3231

3332
// rolloutOnDelete reconcile machine sets controlled by a MachineDeployment that is using the OnDelete strategy.
3433
func (r *Reconciler) rolloutOnDelete(ctx context.Context, md *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet, templateExists bool) error {
35-
// TODO(in-place): move create newMS into rolloutPlanner
36-
newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, md, msList, true, templateExists)
37-
if err != nil {
34+
planner := newRolloutPlanner()
35+
if err := planner.init(ctx, md, msList, nil, true, templateExists); err != nil {
3836
return err
3937
}
4038

41-
planner := newRolloutPlanner()
42-
planner.md = md
43-
planner.newMS = newMS
44-
planner.oldMSs = oldMSs
45-
4639
if err := planner.planOnDelete(ctx); err != nil {
4740
return err
4841
}
4942

50-
allMSs := append(oldMSs, newMS)
51-
52-
// TODO(in-place): also apply/remove labels to MS should go into rolloutPlanner
53-
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
54-
return err
55-
}
56-
if err := r.addDisableMachineCreateAnnotation(ctx, oldMSs); err != nil {
43+
if err := r.createOrUpdateMachineSets(ctx, planner); err != nil {
5744
return err
5845
}
5946

60-
// TODO(in-place): this should be changed as soon as rolloutPlanner support MS creation and adding/removing labels from MS
61-
for _, ms := range allMSs {
62-
scaleIntent := ptr.Deref(ms.Spec.Replicas, 0)
63-
if v, ok := planner.scaleIntents[ms.Name]; ok {
64-
scaleIntent = v
65-
}
66-
if err := r.scaleMachineSet(ctx, ms, scaleIntent, md); err != nil {
67-
return err
68-
}
69-
}
47+
newMS := planner.newMS
48+
oldMSs := planner.oldMSs
49+
allMSs := append(oldMSs, newMS)
7050

7151
if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
7252
return err
@@ -102,6 +82,16 @@ func (p *rolloutPlanner) reconcileOldMachineSetsOnDelete(ctx context.Context) {
10282
return
10383
}
10484

85+
// Make sure old MS cannot create replicas to compensate for replicas deleted manually.
86+
// NOTE: cleanup of this annotation will happen automatically as soon as the rollout planner stops setting it/stops running the following loop,
87+
// because this annotation is not part of the output of computeDesiredMS.
88+
for _, oldMS := range p.oldMSs {
89+
if oldMS.Annotations == nil {
90+
oldMS.Annotations = map[string]string{}
91+
}
92+
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
93+
}
94+
10595
// Determine if there are more Machines than MD.spec.replicas, e.g. due to a scale down in MD.
10696
newMSReplicas := ptr.Deref(p.newMS.Spec.Replicas, 0)
10797
if v, ok := p.scaleIntents[p.newMS.Name]; ok {
@@ -164,26 +154,3 @@ func (p *rolloutPlanner) reconcileOldMachineSetsOnDelete(ctx context.Context) {
164154
}
165155
}
166156
}
167-
168-
// addDisableMachineCreateAnnotation will add the disable machine create annotation to old MachineSets.
169-
func (r *Reconciler) addDisableMachineCreateAnnotation(ctx context.Context, oldMSs []*clusterv1.MachineSet) error {
170-
for _, oldMS := range oldMSs {
171-
log := ctrl.LoggerFrom(ctx, "MachineSet", klog.KObj(oldMS))
172-
if _, ok := oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation]; !ok {
173-
log.V(4).Info("adding annotation on old MachineSet to disable machine creation")
174-
patchHelper, err := patch.NewHelper(oldMS, r.Client)
175-
if err != nil {
176-
return err
177-
}
178-
if oldMS.Annotations == nil {
179-
oldMS.Annotations = map[string]string{}
180-
}
181-
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
182-
err = patchHelper.Patch(ctx, oldMS)
183-
if err != nil {
184-
return err
185-
}
186-
}
187-
}
188-
return nil
189-
}

internal/controllers/machinedeployment/machinedeployment_rollout_ondelete_test.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,10 @@ func TestReconcileOldMachineSetsOnDelete(t *testing.T) {
329329

330330
planner.reconcileOldMachineSetsOnDelete(ctx)
331331
g.Expect(planner.scaleIntents).To(Equal(tt.expectScaleIntent), "unexpected scaleIntents")
332+
g.Expect(planner.newMS.Annotations).ToNot(HaveKey(clusterv1.DisableMachineCreateAnnotation))
333+
for _, oldMS := range planner.oldMSs {
334+
g.Expect(oldMS.Annotations).To(HaveKeyWithValue(clusterv1.DisableMachineCreateAnnotation, "true"))
335+
}
332336
})
333337
}
334338
}
@@ -581,22 +585,25 @@ func runOnDeleteTestCase(ctx context.Context, t *testing.T, tt onDeleteSequenceT
581585

582586
// Running a small subset of MD reconcile (the rollout logic and a bit of setReplicas)
583587
p := newRolloutPlanner()
584-
p.md = current.machineDeployment
585-
p.newMS = current.newMS()
586-
p.oldMSs = current.oldMSs()
588+
p.computeDesiredMS = func(_ context.Context, deployment *clusterv1.MachineDeployment, currentNewMS *clusterv1.MachineSet) (*clusterv1.MachineSet, error) {
589+
desiredNewMS := currentNewMS
590+
if currentNewMS == nil {
591+
// Use a predictable MS name when creating newMS, and also add the newMS to current.machineSets.
592+
totMS := len(current.machineSets)
593+
desiredNewMS = createMS(fmt.Sprintf("ms%d", totMS+1), deployment.Spec.Template.Spec.FailureDomain, 0)
594+
current.machineSets = append(current.machineSets, desiredNewMS)
595+
}
596+
return desiredNewMS, nil
597+
}
587598

588-
err := p.planOnDelete(ctx)
599+
// init the rollout planner and plan next step for a rollout.
600+
err := p.init(ctx, current.machineDeployment, current.machineSets, current.machines(), true, true)
589601
g.Expect(err).ToNot(HaveOccurred())
590602

591-
// Apply changes.
592-
delete(p.newMS.Annotations, clusterv1.DisableMachineCreateAnnotation)
593-
for _, oldMS := range current.oldMSs() {
594-
if oldMS.Annotations == nil {
595-
oldMS.Annotations = map[string]string{}
596-
}
597-
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
598-
}
603+
err = p.planOnDelete(ctx)
604+
g.Expect(err).ToNot(HaveOccurred())
599605

606+
// Apply changes.
600607
for _, ms := range current.machineSets {
601608
if scaleIntent, ok := p.scaleIntents[ms.Name]; ok {
602609
ms.Spec.Replicas = ptr.To(scaleIntent)

0 commit comments

Comments
 (0)