diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 43c692226c4..5b6ca9e0ff8 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -206,6 +206,8 @@ const ( RayServiceReady RayServiceConditionType = "Ready" // UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade. UpgradeInProgress RayServiceConditionType = "UpgradeInProgress" + // RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state. + RollbackInProgress RayServiceConditionType = "RollbackInProgress" ) const ( @@ -217,6 +219,7 @@ const ( NoPendingCluster RayServiceConditionReason = "NoPendingCluster" NoActiveCluster RayServiceConditionReason = "NoActiveCluster" RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed" + TargetClusterChanged RayServiceConditionReason = "TargetClusterChanged" ) // +kubebuilder:object:root=true diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 5415c334689..8584d41c921 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -165,6 +165,17 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, client.IgnoreNotFound(err) } + // Determine the rollback state immediately before making serving decisions. + if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) { + // If an upgrade is in progress, check if rollback is necessary. 
+ isUpgradeInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress)) + if isUpgradeInProgress && activeRayClusterInstance != nil && pendingRayClusterInstance != nil { + if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil { + return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err + } + } + } + // Check both active and pending Ray clusters to see if the head Pod is ready to serve requests. // This is important to ensure the reliability of the serve service because the head Pod cannot // rely on readiness probes to determine serve readiness. @@ -412,7 +423,7 @@ func (r *RayServiceReconciler) calculateStatus( logger.Info("Updated LastTrafficMigratedTime of Active Service.") } } - if pendingWeight >= 0 { + if pendingWeight >= 0 && pendingCluster != nil { rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent = ptr.To(pendingWeight) logger.Info("Updated pending TrafficRoutedPercent from HTTPRoute", "pendingClusterWeight", pendingWeight) if pendingWeight != oldPendingPercent { @@ -425,6 +436,30 @@ func (r *RayServiceReconciler) calculateStatus( isPendingClusterServing = reconcilePromotionAndServingStatus(ctx, headSvc, serveSvc, rayServiceInstance, pendingCluster) } + if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) { + activeStatus := &rayServiceInstance.Status.ActiveServiceStatus + pendingStatus := &rayServiceInstance.Status.PendingServiceStatus + + // A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent, + // and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent. 
+ if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 && + ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 && + ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 && + ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 { + + logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.") + + // Clear the RayService pending service status to clean up the pending cluster. + rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{} + pendingCluster = nil + + meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) + + // Ensure the upgrade state machine resets after a successful rollback. + setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Rollback complete, active Ray cluster exists and no pending Ray cluster") + } + } + if shouldPrepareNewCluster(ctx, rayServiceInstance, activeCluster, pendingCluster, isPendingClusterServing) { rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{ RayClusterName: utils.GenerateRayClusterName(rayServiceInstance.Name), @@ -700,31 +735,51 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context activeClusterWeight = ptr.Deref(activeServiceStatus.TrafficRoutedPercent, 100) pendingClusterWeight = ptr.Deref(pendingServiceStatus.TrafficRoutedPercent, 0) - if isPendingClusterReady { - // Zero-downtime upgrade in progress. - options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec) - if options == nil { - return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.") - } + // Zero-downtime upgrade in progress. + options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec) + if options == nil { + return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.") + } - // Check that target_capacity has been updated before migrating traffic. 
- pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0) + // Check that target_capacity has been updated before migrating traffic. + pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0) + activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100) + isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) - if pendingClusterWeight == pendingClusterTargetCapacity { - // Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit. - return activeClusterWeight, pendingClusterWeight, nil - } + if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) { + // Stop traffic migration because the cluster being migrated to has reached its target capacity limit. + return activeClusterWeight, pendingClusterWeight, nil + } + + // If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic + // from the active RayCluster to the pending RayCluster. + interval := time.Duration(*options.IntervalSeconds) * time.Second - // If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic - // from the active RayCluster to the pending RayCluster. - interval := time.Duration(*options.IntervalSeconds) * time.Second - lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime - if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval { - // Gradually shift traffic from the active to the pending cluster. + // Determine which timestamp to use based on the direction of traffic flow. + // We use the LastTrafficMigratedTime of the cluster to which traffic is increasing. 
+ var lastTrafficMigratedTime *metav1.Time + if isRollbackInProgress { + lastTrafficMigratedTime = activeServiceStatus.LastTrafficMigratedTime + } else { + lastTrafficMigratedTime = pendingServiceStatus.LastTrafficMigratedTime + } + + if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval { + if isRollbackInProgress { + // Gradually shift traffic from the pending to the active cluster. + // Rollback traffic migration occurs regardless of pending cluster readiness. + logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent) + proposedActiveWeight := activeClusterWeight + *options.StepSizePercent + activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity) + pendingClusterWeight = 100 - activeClusterWeight + } else if isPendingClusterReady { + // Gradually shift traffic from the active to the pending cluster if it's ready. logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent) proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity) activeClusterWeight = 100 - pendingClusterWeight + } else { + logger.Info("Upgrade in progress, but pending cluster is not ready. 
Pausing traffic migration.") } } @@ -871,7 +926,7 @@ func (r *RayServiceReconciler) reconcileHTTPRoute(ctx context.Context, rayServic } // If HTTPRoute already exists, check if update is needed - if !reflect.DeepEqual(existingHTTPRoute.Spec, desiredHTTPRoute.Spec) { + if !utils.IsHTTPRouteEqual(existingHTTPRoute, desiredHTTPRoute) { logger.Info("Updating existing HTTPRoute", "name", desiredHTTPRoute.Name) existingHTTPRoute.Spec = desiredHTTPRoute.Spec if err := r.Update(ctx, existingHTTPRoute); err != nil { @@ -1094,6 +1149,12 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS if isPendingClusterServing { return false } + + // Do not prepare a new cluster while a rollback is actively in progress. + if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) { + return false + } + if activeRayCluster == nil && pendingRayCluster == nil { // Both active and pending clusters are nil, which means the RayService has just been created. // Create a new pending cluster. @@ -1423,6 +1484,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context, } maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100) + if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) { + // Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity, + // while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path. 
+ activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0) + if activeTargetCapacity != activeTrafficRoutedPercent { + logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent) + return nil + } + + if activeTargetCapacity+pendingTargetCapacity > 100 { + if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName { + goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent) + logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity) + return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity) + } + } else { + if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName { + goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent) + logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity) + return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity) + } + } + return nil + } + // Defer updating the target_capacity until traffic weights are updated if pendingTargetCapacity != pendingTrafficRoutedPercent { logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent) @@ -1921,3 +2007,39 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte return err } + +// reconcileRollbackState determines whether to initiate a rollback by setting the RollbackInProgress condition. 
+func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error {
+	goalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
+	if err != nil {
+		return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err)
+	}
+
+	// Spec hashes recorded as annotations on the two clusters. A cluster without the
+	// annotation (or with a nil annotation map) yields "" here.
+	hashKey := utils.HashWithoutReplicasAndWorkersToDeleteKey
+	activeHash := activeCluster.Annotations[hashKey]
+	pendingHash := pendingCluster.Annotations[hashKey]
+
+	logger := ctrl.LoggerFrom(ctx)
+	conditions := &rayServiceInstance.Status.Conditions
+	rollingBack := meta.IsStatusConditionTrue(*conditions, string(rayv1.RollbackInProgress))
+
+	switch {
+	case goalHash == pendingHash:
+		// The goal spec is the one recorded on the pending cluster, so the upgrade is on
+		// track. If a rollback had been flagged, drop the flag and let the upgrade resume.
+		if rollingBack {
+			logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.")
+			meta.RemoveStatusCondition(conditions, string(rayv1.RollbackInProgress))
+		}
+	case !rollingBack:
+		// The goal spec diverged from the pending cluster: the user either reverted to the
+		// original spec or submitted yet another spec mid-upgrade. Flag the rollback so that
+		// traffic is routed back to the original cluster before a new cluster is spun up.
+		logger.Info("Goal state has changed during upgrade. Initiating safe rollback to the original cluster.", "targetHash", goalHash, "originalHash", activeHash, "pendingHash", pendingHash)
+		setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.TargetClusterChanged, "Goal state changed mid-upgrade, rolling back to original cluster.")
+	}
+
+	return nil
+}
diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 6a8a38f8206..53e79cce7c5 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -2647,3 +2647,91 @@ func Test_RayServiceReconcileManagedBy(t *testing.T) { }) } } + +func TestReconcileRollbackState(t *testing.T) { + ctx := context.TODO() + namespace := "test-ns" + + baseSpec := rayv1.RayClusterSpec{ + RayVersion: "2.54.0", + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + {GroupName: "worker-group", Replicas: ptr.To(int32(1))}, + }, + } + + updatedSpec := baseSpec.DeepCopy() + updatedSpec.RayVersion = "2.50.0" + + baseHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(baseSpec) + require.NoError(t, err) + + updatedHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*updatedSpec) + require.NoError(t, err) + + activeCluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "active-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: baseHash}}, + } + pendingCluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pending-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: updatedHash}}, + } + + tests := []struct { + name string + rayServiceSpec rayv1.RayClusterSpec + isRollbackInProgress bool + expectRollbackStatus bool + }{ + { + name: "Normal RayService upgrade, goal matches pending", + rayServiceSpec: *updatedSpec, +
isRollbackInProgress: false, + expectRollbackStatus: false, + }, + { + name: "RayService Spec changed, initiate rollback", + rayServiceSpec: baseSpec, + isRollbackInProgress: false, + expectRollbackStatus: true, + }, + { + name: "Rollback in progress, continues rolling back", + rayServiceSpec: baseSpec, + isRollbackInProgress: true, + expectRollbackStatus: true, + }, + { + name: "Rollback canceled, user updated spec back to pending", + rayServiceSpec: *updatedSpec, + isRollbackInProgress: true, + expectRollbackStatus: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rayService := &rayv1.RayService{ + ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace}, + Spec: rayv1.RayServiceSpec{ + RayClusterSpec: tt.rayServiceSpec, + }, + Status: rayv1.RayServiceStatuses{ + Conditions: []metav1.Condition{}, + }, + } + + if tt.isRollbackInProgress { + setCondition(rayService, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.TargetClusterChanged, "rolling back") + } + + reconciler := RayServiceReconciler{ + Recorder: record.NewFakeRecorder(1), + } + + err := reconciler.reconcileRollbackState(ctx, rayService, activeCluster, pendingCluster) + require.NoError(t, err) + + isCurrentlyRollingBack := meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RollbackInProgress)) + assert.Equal(t, tt.expectRollbackStatus, isCurrentlyRollingBack) + }) + } +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index fe578f0fb21..ffc2dd73ddb 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -1039,3 +1039,39 @@ func GetRayHttpProxyClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun func HasSubmitter(rayJobInstance *rayv1.RayJob) bool { return rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode }
+
+// IsHTTPRouteEqual reports whether the existing HTTPRoute routes traffic the same way as the
+// desired HTTPRoute. Only the fields the controller updates are compared: for every rule, the
+// name, weight, and port of each backendRef, position by position. Every other field of the
+// spec is ignored.
+//
+// A nil Weight is treated as 1 and a nil Port as 0, so an unset pointer compares equal to that
+// explicit value. Two nil routes are equal; a nil route never equals a non-nil one.
+func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool {
+	// Guard against nil routes instead of panicking on the field accesses below.
+	if existing == nil || desired == nil {
+		return existing == desired
+	}
+
+	if len(existing.Spec.Rules) != len(desired.Spec.Rules) {
+		return false
+	}
+
+	for i := range desired.Spec.Rules {
+		existingRefs := existing.Spec.Rules[i].BackendRefs
+		desiredRefs := desired.Spec.Rules[i].BackendRefs
+		if len(existingRefs) != len(desiredRefs) {
+			return false
+		}
+
+		for j := range desiredRefs {
+			existingRef, desiredRef := existingRefs[j], desiredRefs[j]
+			if string(existingRef.Name) != string(desiredRef.Name) ||
+				ptr.Deref(existingRef.Weight, 1) != ptr.Deref(desiredRef.Weight, 1) ||
+				ptr.Deref(existingRef.Port, 0) != ptr.Deref(desiredRef.Port, 0) {
+				return false
+			}
+		}
+	}
+	return true
+}
diff --git a/ray-operator/controllers/ray/utils/util_test.go b/ray-operator/controllers/ray/utils/util_test.go index cd6fd7a1465..7343186547c 100644 --- a/ray-operator/controllers/ray/utils/util_test.go +++ b/ray-operator/controllers/ray/utils/util_test.go @@ -1785,3 +1785,144 @@ func TestGetWeightsFromHTTPRoute(t *testing.T) { }) } } + +func TestIsHTTPRouteEqual(t *testing.T) { + tests := []struct { + existing *gwv1.HTTPRoute + desired *gwv1.HTTPRoute + name string + expected bool + }{ + { + name: "Exactly equal HTTPRoutes", + existing: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a", Port: ptr.To(gwv1.PortNumber(8000))}, Weight: ptr.To(int32(100))}}, + }, + }, + }, + }, + }, + desired: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a", Port: ptr.To(gwv1.PortNumber(8000))}, Weight: ptr.To(int32(100))}}, + }, + }, + }, + }, + }, + expected: true, + }, + { +
name: "Different number of rules", + existing: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + {BackendRefs: []gwv1.HTTPBackendRef{{BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}}}}}, + }, + }, + }, + desired: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + {BackendRefs: []gwv1.HTTPBackendRef{{BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}}}}}, + {BackendRefs: []gwv1.HTTPBackendRef{{BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-b"}}}}}, + }, + }, + }, + expected: false, + }, + { + name: "Different number of backends", + existing: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}}}, + }, + }, + }, + }, + }, + desired: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}}}, + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-b"}}}, + }, + }, + }, + }, + }, + expected: false, + }, + { + name: "Different backend weights", + existing: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}, Weight: ptr.To(int32(100))}}, + }, + }, + }, + }, + }, + desired: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-a"}, Weight: ptr.To(int32(75))}}, + }, + }, + }, + }, + }, + expected: false, + }, + { + 
name: "Different backend names", + existing: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-old"}}}, + }, + }, + }, + }, + }, + desired: &gwv1.HTTPRoute{ + Spec: gwv1.HTTPRouteSpec{ + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + {BackendRef: gwv1.BackendRef{BackendObjectReference: gwv1.BackendObjectReference{Name: "svc-new"}}}, + }, + }, + }, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IsHTTPRouteEqual(tt.existing, tt.desired) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/ray-operator/test/e2eincrementalupgrade/rayservice_incremental_upgrade_test.go b/ray-operator/test/e2eincrementalupgrade/rayservice_incremental_upgrade_test.go index 7232d6012b5..95120b5f5e6 100644 --- a/ray-operator/test/e2eincrementalupgrade/rayservice_incremental_upgrade_test.go +++ b/ray-operator/test/e2eincrementalupgrade/rayservice_incremental_upgrade_test.go @@ -7,9 +7,11 @@ import ( . 
"github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" + gwv1 "sigs.k8s.io/gateway-api/apis/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" @@ -216,3 +218,126 @@ func TestRayServiceIncrementalUpgrade(t *testing.T) { stdout, _ = CurlRayServiceGateway(test, gatewayIP, curlPod, curlContainerName, "/fruit", `["MANGO", 2]`) g.Expect(stdout.String()).To(Equal("8")) } + +func TestRayServiceIncrementalUpgradeRollback(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.RayServiceIncrementalUpgrade, true) + + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + rayServiceName := "rollback-rayservice" + + // Create a RayService with IncrementalUpgrade enabled + stepSize := ptr.To(int32(25)) + interval := ptr.To(int32(10)) + maxSurge := ptr.To(int32(50)) + + rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name). + WithSpec(IncrementalUpgradeRayServiceApplyConfiguration(stepSize, interval, maxSurge)) + rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayService).NotTo(BeNil()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be ready", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + + // Copy original spec to use to trigger a rollback later. + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + originalSpec := rayService.Spec.DeepCopy() + + // Verify Gateway and HTTPRoute are ready. 
+ gatewayName := fmt.Sprintf("%s-%s", rayServiceName, "gateway") + g.Eventually(Gateway(test, rayService.Namespace, gatewayName), TestTimeoutMedium). + Should(WithTransform(utils.IsGatewayReady, BeTrue())) + + gateway, err := GetGateway(test, namespace.Name, fmt.Sprintf("%s-%s", rayServiceName, "gateway")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(gateway).NotTo(BeNil()) + + httpRouteName := fmt.Sprintf("%s-%s", rayServiceName, "httproute") + LogWithTimestamp(test.T(), "Waiting for HTTPRoute %s/%s to be ready", rayService.Namespace, httpRouteName) + g.Eventually(HTTPRoute(test, rayService.Namespace, httpRouteName), TestTimeoutMedium). + Should(Not(BeNil())) + + httpRoute, err := GetHTTPRoute(test, namespace.Name, httpRouteName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(utils.IsHTTPRouteReady(gateway, httpRoute)).To(BeTrue()) + + // Trigger an incremental upgrade through a change to the RayCluster spec. + LogWithTimestamp(test.T(), "Triggering an upgrade for RayService %s/%s", rayService.Namespace, rayService.Name) + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("500m") + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s UpgradeInProgress condition to be true", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort).Should(WithTransform(IsRayServiceUpgrading, BeTrue())) + + // Wait for the upgrade to be underway with traffic partially migrated. 
+ LogWithTimestamp(test.T(), "Waiting for upgrade to be partially complete") + var pendingClusterName string + g.Eventually(func(g Gomega) { + svc, err := GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(svc.Status.PendingServiceStatus.TrafficRoutedPercent).NotTo(BeNil()) + g.Expect(*svc.Status.PendingServiceStatus.TrafficRoutedPercent).Should(BeNumerically(">", 0)) + + // Capture the pending cluster name before the rollback starts + pendingClusterName = svc.Status.PendingServiceStatus.RayClusterName + g.Expect(pendingClusterName).NotTo(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed()) + + // Trigger a rollback by updating the spec back to the original version. + LogWithTimestamp(test.T(), "Triggering a rollback for RayService %s/%s", rayService.Namespace, rayService.Name) + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec = *originalSpec + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + // Verify that the controller enters the rollback state. + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s RollbackInProgress condition to be true", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort).Should(WithTransform(IsRayServiceRollingBack, BeTrue())) + + // Verify that traffic gradually shifts back to the active cluster. 
+ LogWithTimestamp(test.T(), "Verifying traffic shifts back to the active cluster") + g.Eventually(func(g Gomega) { + svc, err := GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(svc.Status.ActiveServiceStatus.TrafficRoutedPercent).NotTo(BeNil()) + g.Expect(*svc.Status.ActiveServiceStatus.TrafficRoutedPercent).Should(Equal(int32(100))) + g.Expect(svc.Status.PendingServiceStatus.TrafficRoutedPercent).NotTo(BeNil()) + g.Expect(*svc.Status.PendingServiceStatus.TrafficRoutedPercent).Should(Equal(int32(0))) + }, TestTimeoutMedium).Should(Succeed()) + + // Verify that the rollback completes and the pending cluster is cleaned up. + LogWithTimestamp(test.T(), "Waiting for rollback to complete and pending cluster to be deleted") + g.Eventually(func(g Gomega) { + svc, err := GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + // Rollback is done when both conditions are false and pending status is empty. + g.Expect(IsRayServiceRollingBack(svc)).To(BeFalse()) + g.Expect(IsRayServiceUpgrading(svc)).To(BeFalse()) + g.Expect(svc.Status.PendingServiceStatus.RayClusterName).To(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed()) + + // Check that the pending RayCluster resource is deleted. + LogWithTimestamp(test.T(), "Verifying the pending RayCluster resource was deleted") + g.Eventually(func() error { + _, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), pendingClusterName, metav1.GetOptions{}) + return err + }, TestTimeoutMedium).Should(WithTransform(errors.IsNotFound, BeTrue())) + + // The HTTPRoute should now only have one backend after the rollback completes. + g.Eventually(HTTPRoute(test, namespace.Name, httpRouteName), TestTimeoutShort). 
+ Should(WithTransform(func(route *gwv1.HTTPRoute) int { + if route == nil || len(route.Spec.Rules) == 0 { + return 0 + } + return len(route.Spec.Rules[0].BackendRefs) + }, Equal(1))) +} diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go index 97ed6939f5e..0f6ff06dd53 100644 --- a/ray-operator/test/support/ray.go +++ b/ray-operator/test/support/ray.go @@ -232,6 +232,12 @@ func IsRayServiceUpgrading(service *rayv1.RayService) bool { return meta.IsStatusConditionTrue(service.Status.Conditions, string(rayv1.UpgradeInProgress)) }
+// IsRayServiceRollingBack reports whether the RayService's RollbackInProgress condition is true.
+func IsRayServiceRollingBack(service *rayv1.RayService) bool {
+	rollbackCondition := string(rayv1.RollbackInProgress)
+	return meta.IsStatusConditionTrue(service.Status.Conditions, rollbackCondition)
+}
+
 func RayServicesNumEndPoints(service *rayv1.RayService) int32 { return service.Status.NumServeEndpoints }