Skip to content

Commit 44fdb7e

Browse files
Revert "Revert: Remove rollback e2e"
This reverts commit baf2d6c, reversing changes made to 73e6637.
1 parent baf2d6c commit 44fdb7e

File tree

7 files changed

+392
-111
lines changed

7 files changed

+392
-111
lines changed

ray-operator/apis/ray/v1/rayservice_types.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -206,6 +206,8 @@ const (
206206
RayServiceReady RayServiceConditionType = "Ready"
207207
// UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade.
208208
UpgradeInProgress RayServiceConditionType = "UpgradeInProgress"
209+
// RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state.
210+
RollbackInProgress RayServiceConditionType = "RollbackInProgress"
209211
)
210212

211213
const (
@@ -217,6 +219,7 @@ const (
217219
NoPendingCluster RayServiceConditionReason = "NoPendingCluster"
218220
NoActiveCluster RayServiceConditionReason = "NoActiveCluster"
219221
RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed"
222+
TargetClusterChanged RayServiceConditionReason = "TargetClusterChanged"
220223
)
221224

222225
// +kubebuilder:object:root=true

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 141 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -165,6 +165,17 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
165165
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, client.IgnoreNotFound(err)
166166
}
167167

168+
// Determine the rollback state immediately before making serving decisions.
169+
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) {
170+
// If an upgrade is in progress, check if rollback is necessary.
171+
isUpgradeInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress))
172+
if isUpgradeInProgress && activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
173+
if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil {
174+
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
175+
}
176+
}
177+
}
178+
168179
// Check both active and pending Ray clusters to see if the head Pod is ready to serve requests.
169180
// This is important to ensure the reliability of the serve service because the head Pod cannot
170181
// rely on readiness probes to determine serve readiness.
@@ -412,7 +423,7 @@ func (r *RayServiceReconciler) calculateStatus(
412423
logger.Info("Updated LastTrafficMigratedTime of Active Service.")
413424
}
414425
}
415-
if pendingWeight >= 0 {
426+
if pendingWeight >= 0 && pendingCluster != nil {
416427
rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent = ptr.To(pendingWeight)
417428
logger.Info("Updated pending TrafficRoutedPercent from HTTPRoute", "pendingClusterWeight", pendingWeight)
418429
if pendingWeight != oldPendingPercent {
@@ -425,6 +436,30 @@ func (r *RayServiceReconciler) calculateStatus(
425436
isPendingClusterServing = reconcilePromotionAndServingStatus(ctx, headSvc, serveSvc, rayServiceInstance, pendingCluster)
426437
}
427438

439+
if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
440+
activeStatus := &rayServiceInstance.Status.ActiveServiceStatus
441+
pendingStatus := &rayServiceInstance.Status.PendingServiceStatus
442+
443+
// A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent,
444+
// and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent.
445+
if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 &&
446+
ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 &&
447+
ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 &&
448+
ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 {
449+
450+
logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.")
451+
452+
// Clear the RayService pending service status to clean up the pending cluster.
453+
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
454+
pendingCluster = nil
455+
456+
meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
457+
458+
// Ensure the upgrade state machine resets after a successful rollback.
459+
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Rollback complete, active Ray cluster exists and no pending Ray cluster")
460+
}
461+
}
462+
428463
if shouldPrepareNewCluster(ctx, rayServiceInstance, activeCluster, pendingCluster, isPendingClusterServing) {
429464
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{
430465
RayClusterName: utils.GenerateRayClusterName(rayServiceInstance.Name),
@@ -700,31 +735,51 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context
700735
activeClusterWeight = ptr.Deref(activeServiceStatus.TrafficRoutedPercent, 100)
701736
pendingClusterWeight = ptr.Deref(pendingServiceStatus.TrafficRoutedPercent, 0)
702737

703-
if isPendingClusterReady {
704-
// Zero-downtime upgrade in progress.
705-
options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec)
706-
if options == nil {
707-
return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.")
708-
}
738+
// Zero-downtime upgrade in progress.
739+
options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec)
740+
if options == nil {
741+
return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.")
742+
}
709743

710-
// Check that target_capacity has been updated before migrating traffic.
711-
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
744+
// Check that target_capacity has been updated before migrating traffic.
745+
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
746+
activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100)
747+
isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
712748

713-
if pendingClusterWeight == pendingClusterTargetCapacity {
714-
// Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit.
715-
return activeClusterWeight, pendingClusterWeight, nil
716-
}
749+
if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) {
750+
// Stop traffic migration because the cluster being migrated to has reached its target capacity limit.
751+
return activeClusterWeight, pendingClusterWeight, nil
752+
}
753+
754+
// If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic
755+
// from the active RayCluster to the pending RayCluster.
756+
interval := time.Duration(*options.IntervalSeconds) * time.Second
717757

718-
// If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic
719-
// from the active RayCluster to the pending RayCluster.
720-
interval := time.Duration(*options.IntervalSeconds) * time.Second
721-
lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime
722-
if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
723-
// Gradually shift traffic from the active to the pending cluster.
758+
// Determine which timestamp to use based on the direction of traffic flow.
759+
// We use the LastTrafficMigratedTime of the cluster to which traffic is increasing.
760+
var lastTrafficMigratedTime *metav1.Time
761+
if isRollbackInProgress {
762+
lastTrafficMigratedTime = activeServiceStatus.LastTrafficMigratedTime
763+
} else {
764+
lastTrafficMigratedTime = pendingServiceStatus.LastTrafficMigratedTime
765+
}
766+
767+
if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
768+
if isRollbackInProgress {
769+
// Gradually shift traffic from the pending to the active cluster.
770+
// Rollback traffic migration occurs regardless of pending cluster readiness.
771+
logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent)
772+
proposedActiveWeight := activeClusterWeight + *options.StepSizePercent
773+
activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity)
774+
pendingClusterWeight = 100 - activeClusterWeight
775+
} else if isPendingClusterReady {
776+
// Gradually shift traffic from the active to the pending cluster if it's ready.
724777
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
725778
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
726779
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
727780
activeClusterWeight = 100 - pendingClusterWeight
781+
} else {
782+
logger.Info("Upgrade in progress, but pending cluster is not ready. Pausing traffic migration.")
728783
}
729784
}
730785

@@ -1094,6 +1149,12 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS
10941149
if isPendingClusterServing {
10951150
return false
10961151
}
1152+
1153+
// Do not prepare a new cluster while a rollback is actively in progress.
1154+
if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
1155+
return false
1156+
}
1157+
10971158
if activeRayCluster == nil && pendingRayCluster == nil {
10981159
// Both active and pending clusters are nil, which means the RayService has just been created.
10991160
// Create a new pending cluster.
@@ -1423,6 +1484,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context,
14231484
}
14241485
maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100)
14251486

1487+
if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
1488+
// Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity,
1489+
// while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path.
1490+
activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0)
1491+
if activeTargetCapacity != activeTrafficRoutedPercent {
1492+
logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent)
1493+
return nil
1494+
}
1495+
1496+
if activeTargetCapacity+pendingTargetCapacity > 100 {
1497+
if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName {
1498+
goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent)
1499+
logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity)
1500+
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
1501+
}
1502+
} else {
1503+
if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName {
1504+
goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent)
1505+
logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity)
1506+
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
1507+
}
1508+
}
1509+
return nil
1510+
}
1511+
14261512
// Defer updating the target_capacity until traffic weights are updated
14271513
if pendingTargetCapacity != pendingTrafficRoutedPercent {
14281514
logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent)
@@ -1921,3 +2007,39 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte
19212007

19222008
return err
19232009
}
2010+
2011+
// reconcileRollbackState determines whether to initiate a rollback by setting the RollbackInProgress condition.
2012+
func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error {
2013+
logger := ctrl.LoggerFrom(ctx)
2014+
2015+
targetHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
2016+
if err != nil {
2017+
return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err)
2018+
}
2019+
2020+
originalHash := activeCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
2021+
pendingHash := pendingCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
2022+
2023+
isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
2024+
2025+
// Case 1: The goal spec matches the pending cluster's spec.
2026+
// The upgrade is on track. We should revert any accidental rollback attempt and continue.
2027+
if targetHash == pendingHash {
2028+
if isRollbackInProgress {
2029+
logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.")
2030+
meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
2031+
}
2032+
return nil
2033+
}
2034+
2035+
// Case 2: The goal spec diverges from the pending cluster.
2036+
// This happens if the user reverted to the original spec, or if they submitted a 3rd entirely new spec mid-upgrade.
2037+
// In all divergence cases, we must first safely route all traffic back to the original cluster before allowing
2038+
// a new cluster to be spun up.
2039+
if !isRollbackInProgress {
2040+
logger.Info("Goal state has changed during upgrade. Initiating safe rollback to the original cluster.", "targetHash", targetHash, "originalHash", originalHash, "pendingHash", pendingHash)
2041+
setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.TargetClusterChanged, "Goal state changed mid-upgrade, rolling back to original cluster.")
2042+
}
2043+
2044+
return nil
2045+
}

ray-operator/controllers/ray/rayservice_controller_unit_test.go

Lines changed: 88 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2672,3 +2672,91 @@ func Test_RayServiceReconcileManagedBy(t *testing.T) {
26722672
})
26732673
}
26742674
}
2675+
2676+
func TestReconcileRollbackState(t *testing.T) {
2677+
ctx := context.TODO()
2678+
namespace := "test-ns"
2679+
2680+
baseSpec := rayv1.RayClusterSpec{
2681+
RayVersion: "2.54.0",
2682+
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
2683+
{GroupName: "worker-group", Replicas: ptr.To(int32(1))},
2684+
},
2685+
}
2686+
2687+
updatedSpec := baseSpec.DeepCopy()
2688+
updatedSpec.RayVersion = "2.50.0"
2689+
2690+
baseHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(baseSpec)
2691+
require.NoError(t, err)
2692+
2693+
updatedHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*updatedSpec)
2694+
require.NoError(t, err)
2695+
2696+
activeCluster := &rayv1.RayCluster{
2697+
ObjectMeta: metav1.ObjectMeta{Name: "active-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: baseHash}},
2698+
}
2699+
pendingCluster := &rayv1.RayCluster{
2700+
ObjectMeta: metav1.ObjectMeta{Name: "pending-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: updatedHash}},
2701+
}
2702+
2703+
tests := []struct {
2704+
name string
2705+
rayServiceSpec rayv1.RayClusterSpec
2706+
isRollbackInProgress bool
2707+
expectRollbackStatus bool
2708+
}{
2709+
{
2710+
name: "Normal RayService upgrade, goal matches pending",
2711+
rayServiceSpec: *updatedSpec,
2712+
isRollbackInProgress: false,
2713+
expectRollbackStatus: false,
2714+
},
2715+
{
2716+
name: "RayService Spec changed, initiate rollback",
2717+
rayServiceSpec: baseSpec,
2718+
isRollbackInProgress: false,
2719+
expectRollbackStatus: true,
2720+
},
2721+
{
2722+
name: "Rollback in progress, continues rolling back",
2723+
rayServiceSpec: baseSpec,
2724+
isRollbackInProgress: true,
2725+
expectRollbackStatus: true,
2726+
},
2727+
{
2728+
name: "Rollback canceled, user updated spec back to pending",
2729+
rayServiceSpec: *updatedSpec,
2730+
isRollbackInProgress: true,
2731+
expectRollbackStatus: false,
2732+
},
2733+
}
2734+
2735+
for _, tt := range tests {
2736+
t.Run(tt.name, func(t *testing.T) {
2737+
rayService := &rayv1.RayService{
2738+
ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace},
2739+
Spec: rayv1.RayServiceSpec{
2740+
RayClusterSpec: tt.rayServiceSpec,
2741+
},
2742+
Status: rayv1.RayServiceStatuses{
2743+
Conditions: []metav1.Condition{},
2744+
},
2745+
}
2746+
2747+
if tt.isRollbackInProgress {
2748+
setCondition(rayService, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.TargetClusterChanged, "rolling back")
2749+
}
2750+
2751+
reconciler := RayServiceReconciler{
2752+
Recorder: record.NewFakeRecorder(1),
2753+
}
2754+
2755+
err := reconciler.reconcileRollbackState(ctx, rayService, activeCluster, pendingCluster)
2756+
require.NoError(t, err)
2757+
2758+
isCurrentlyRollingBack := meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RollbackInProgress))
2759+
assert.Equal(t, tt.expectRollbackStatus, isCurrentlyRollingBack)
2760+
})
2761+
}
2762+
}

ray-operator/controllers/ray/utils/util.go

Lines changed: 26 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -849,34 +849,6 @@ func IsIncrementalUpgradeComplete(rayServiceInstance *rayv1.RayService, pendingC
849849
ptr.Deref(rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent, -1) == 100
850850
}
851851

852-
// IsHTTPRouteEqual checks if the existing HTTPRoute matches the desired HTTPRoute.
853-
// It only compares fields that the controller manages (Name, Weight, Port) to avoid
854-
// false diffs caused by server-side defaulting of fields like Group and Kind.
855-
func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool {
856-
if len(existing.Spec.Rules) != len(desired.Spec.Rules) {
857-
return false
858-
}
859-
860-
for i := range desired.Spec.Rules {
861-
if len(existing.Spec.Rules[i].BackendRefs) != len(desired.Spec.Rules[i].BackendRefs) {
862-
return false
863-
}
864-
865-
for j := range desired.Spec.Rules[i].BackendRefs {
866-
existingRef := existing.Spec.Rules[i].BackendRefs[j]
867-
desiredRef := desired.Spec.Rules[i].BackendRefs[j]
868-
869-
// Only compare the fields the controller updates.
870-
if existingRef.Name != desiredRef.Name ||
871-
ptr.Deref(existingRef.Weight, 1) != ptr.Deref(desiredRef.Weight, 1) ||
872-
ptr.Deref(existingRef.Port, 0) != ptr.Deref(desiredRef.Port, 0) {
873-
return false
874-
}
875-
}
876-
}
877-
return true
878-
}
879-
880852
// GetWeightsFromHTTPRoute parses a given HTTPRoute object and extracts the traffic weights
881853
// for the active and pending clusters (if present) of a RayService.
882854
func GetWeightsFromHTTPRoute(httpRoute *gwv1.HTTPRoute, rayServiceInstance *rayv1.RayService) (activeWeight int32, pendingWeight int32) {
@@ -1076,3 +1048,29 @@ func GetRayHttpProxyClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
10761048
func HasSubmitter(rayJobInstance *rayv1.RayJob) bool {
10771049
return rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode
10781050
}
1051+
1052+
// IsHTTPRouteEqual checks if the existing HTTPRoute matches the desired HTTPRoute.
1053+
func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool {
1054+
if len(existing.Spec.Rules) != len(desired.Spec.Rules) {
1055+
return false
1056+
}
1057+
1058+
for i := range desired.Spec.Rules {
1059+
if len(existing.Spec.Rules[i].BackendRefs) != len(desired.Spec.Rules[i].BackendRefs) {
1060+
return false
1061+
}
1062+
1063+
for j := range desired.Spec.Rules[i].BackendRefs {
1064+
existingRef := existing.Spec.Rules[i].BackendRefs[j]
1065+
desiredRef := desired.Spec.Rules[i].BackendRefs[j]
1066+
1067+
// Only compare the fields the controller updates.
1068+
if string(existingRef.Name) != string(desiredRef.Name) ||
1069+
ptr.Deref(existingRef.Weight, 1) != ptr.Deref(desiredRef.Weight, 1) ||
1070+
ptr.Deref(existingRef.Port, 0) != ptr.Deref(desiredRef.Port, 0) {
1071+
return false
1072+
}
1073+
}
1074+
}
1075+
return true
1076+
}

0 commit comments

Comments
 (0)