Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
RayServiceReady RayServiceConditionType = "Ready"
// UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade.
UpgradeInProgress RayServiceConditionType = "UpgradeInProgress"
// RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state.
RollbackInProgress RayServiceConditionType = "RollbackInProgress"
)

const (
Expand All @@ -217,6 +219,7 @@ const (
NoPendingCluster RayServiceConditionReason = "NoPendingCluster"
NoActiveCluster RayServiceConditionReason = "NoActiveCluster"
RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed"
GoalClusterChanged RayServiceConditionReason = "GoalClusterChanged"
)

// +kubebuilder:object:root=true
Expand Down
112 changes: 105 additions & 7 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
// Check if NewClusterWithIncrementalUpgrade is enabled, if so reconcile Gateway objects.
var httpRouteInstance *gwv1.HTTPRoute
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) {
// If an upgrade is in progress, check if rollback is necessary.
if activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}
// Ensure per-cluster Serve service exists for the active and pending RayClusters.
if err = r.reconcilePerClusterServeService(ctx, rayServiceInstance, activeRayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
Expand Down Expand Up @@ -379,6 +385,30 @@ func (r *RayServiceReconciler) calculateStatus(

rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
activeStatus := &rayServiceInstance.Status.ActiveServiceStatus
pendingStatus := &rayServiceInstance.Status.PendingServiceStatus

// A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent,
// and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent.
if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 &&
ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 &&
ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 &&
ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 {

logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.")

// Clear the RayService pending service status to clean up the pending cluster.
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
pendingCluster = nil

meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

// Ensure the upgrade state resets after a successful rollback.
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Rollback complete, active Ray cluster exists and no pending Ray cluster")
}
}

// Update RayClusterStatus in RayService status.
var activeClusterStatus, pendingClusterStatus rayv1.RayClusterStatus
if activeCluster != nil {
Expand Down Expand Up @@ -709,9 +739,11 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context

// Check that target_capacity has been updated before migrating traffic.
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100)
isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

if pendingClusterWeight == pendingClusterTargetCapacity {
// Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit.
if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) {
// Stop traffic migration because the cluster being migrated to has reached its target capacity limit.
return activeClusterWeight, pendingClusterWeight, nil
}

Expand All @@ -720,11 +752,19 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context
interval := time.Duration(*options.IntervalSeconds) * time.Second
lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime
if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
// Gradually shift traffic from the active to the pending cluster.
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
activeClusterWeight = 100 - pendingClusterWeight
if isRollbackInProgress {
// Gradually shift traffic from the pending to the active cluster.
logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent)
proposedActiveWeight := activeClusterWeight + *options.StepSizePercent
activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity)
pendingClusterWeight = 100 - activeClusterWeight
} else {
// Gradually shift traffic from the active to the pending cluster.
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
activeClusterWeight = 100 - pendingClusterWeight
}
}
}

Expand Down Expand Up @@ -1423,6 +1463,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context,
}
maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100)

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
// Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity,
// while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path.
activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent default for active TrafficRoutedPercent causes potential deadlock

Medium Severity

During rollback, reconcileServeTargetCapacity defaults active TrafficRoutedPercent to 0 via ptr.Deref(..., 0), while calculateTrafficRoutedPercent defaults the same field to 100 via ptr.Deref(..., 100). If TrafficRoutedPercent is ever nil during rollback, the capacity reconciler sees activeTargetCapacity(100) != activeTrafficRoutedPercent(0) and defers, while the traffic calculator sees activeClusterWeight(100) == activeClusterTargetCapacity(100) and stops migration. Neither function makes progress, creating a deadlock where the rollback can never complete.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @ryanaoleary to take a look

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct solution is to default it to TargetCapacity if it's ever nil, rather than to 100 or 0, since either of those can result in deadlock if we default when it becomes nil (100 (capacity) != 0 (traffic) -> defer, or 100 (traffic) == 100 (capacity) -> defer).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in d02b7be and added unit tests for this case

if activeTargetCapacity != activeTrafficRoutedPercent {
logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent)
return nil
}

if activeTargetCapacity+pendingTargetCapacity > 100 {
if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName {
goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent)
logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
} else {
if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName {
goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent)
logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
}
return nil
}

// Defer updating the target_capacity until traffic weights are updated
if pendingTargetCapacity != pendingTrafficRoutedPercent {
logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent)
Expand Down Expand Up @@ -1921,3 +1986,36 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte

return err
}

// reconcileRollbackState determines whether to initiate a rollback by setting the RollbackInProgress condition.
func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error {
	logger := ctrl.LoggerFrom(ctx)

	// Hash the spec the user currently wants; comparing it to the hash annotations
	// stamped on the active and pending clusters tells us which cluster (if any)
	// the goal state corresponds to.
	goalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
	if err != nil {
		return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err)
	}

	originalHash := activeCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
	pendingHash := pendingCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
	rollingBack := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

	switch {
	case goalHash == pendingHash:
		// Case 1: The goal spec matches the pending cluster's spec, so any in-flight
		// rollback attempt should be reverted and the upgrade resumed as normal.
		if rollingBack {
			logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.")
			meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
		}
	case !rollingBack:
		// Case 2: The goal spec differs from the pending cluster's spec, so roll back
		// to the original (active) cluster. If the condition is already set, leave it.
		logger.Info("Goal state has changed during upgrade. Initiating rollback to the original cluster.", "goalHash", goalHash, "originalHash", originalHash, "pendingHash", pendingHash)
		setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "Goal state changed, rolling back to original cluster.")
	}

	return nil
}
88 changes: 88 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2647,3 +2647,91 @@ func Test_RayServiceReconcileManagedBy(t *testing.T) {
})
}
}

// TestReconcileRollbackState verifies that the RollbackInProgress condition is
// set, kept, or cleared based on whether the goal spec matches the pending cluster.
func TestReconcileRollbackState(t *testing.T) {
	ctx := context.TODO()
	namespace := "test-ns"

	originalSpec := rayv1.RayClusterSpec{
		RayVersion: "2.54.0",
		WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
			{GroupName: "worker-group", Replicas: ptr.To(int32(1))},
		},
	}

	upgradedSpec := originalSpec.DeepCopy()
	upgradedSpec.RayVersion = "2.50.0"

	originalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(originalSpec)
	require.NoError(t, err)

	upgradedHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*upgradedSpec)
	require.NoError(t, err)

	// Helper to build a RayCluster carrying the given spec-hash annotation.
	newCluster := func(name, hash string) *rayv1.RayCluster {
		return &rayv1.RayCluster{
			ObjectMeta: metav1.ObjectMeta{
				Name:        name,
				Namespace:   namespace,
				Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: hash},
			},
		}
	}
	activeCluster := newCluster("active-cluster", originalHash)
	pendingCluster := newCluster("pending-cluster", upgradedHash)

	testCases := []struct {
		name                  string
		goalSpec              rayv1.RayClusterSpec
		rollbackAlreadySet    bool
		wantRollbackCondition bool
	}{
		{
			name:                  "Normal RayService upgrade, goal matches pending",
			goalSpec:              *upgradedSpec,
			rollbackAlreadySet:    false,
			wantRollbackCondition: false,
		},
		{
			name:                  "RayService Spec changed, initiate rollback",
			goalSpec:              originalSpec,
			rollbackAlreadySet:    false,
			wantRollbackCondition: true,
		},
		{
			name:                  "Rollback in progress, continues rolling back",
			goalSpec:              originalSpec,
			rollbackAlreadySet:    true,
			wantRollbackCondition: true,
		},
		{
			name:                  "Rollback cancelled, user updated spec back to pending",
			goalSpec:              *upgradedSpec,
			rollbackAlreadySet:    true,
			wantRollbackCondition: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			rayService := &rayv1.RayService{
				ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace},
				Spec: rayv1.RayServiceSpec{
					RayClusterSpec: tc.goalSpec,
				},
				Status: rayv1.RayServiceStatuses{
					Conditions: []metav1.Condition{},
				},
			}

			if tc.rollbackAlreadySet {
				setCondition(rayService, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "rolling back")
			}

			r := RayServiceReconciler{
				Recorder: record.NewFakeRecorder(1),
			}

			require.NoError(t, r.reconcileRollbackState(ctx, rayService, activeCluster, pendingCluster))

			gotRollback := meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RollbackInProgress))
			assert.Equal(t, tc.wantRollbackCondition, gotRollback)
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -216,3 +218,125 @@ func TestRayServiceIncrementalUpgrade(t *testing.T) {
stdout, _ = CurlRayServiceGateway(test, gatewayIP, curlPod, curlContainerName, "/fruit", `["MANGO", 2]`)
g.Expect(stdout.String()).To(Equal("8"))
}

// TestRayServiceIncrementalUpgradeRollback exercises the incremental-upgrade rollback path:
// it starts an upgrade, reverts the spec mid-migration, and verifies that traffic shifts
// back to the active cluster, the pending cluster is deleted, and the HTTPRoute converges
// to a single backend.
func TestRayServiceIncrementalUpgradeRollback(t *testing.T) {
	features.SetFeatureGateDuringTest(t, features.RayServiceIncrementalUpgrade, true)

	test := With(t)
	g := NewWithT(t)

	namespace := test.NewTestNamespace()
	rayServiceName := "rollback-rayservice"

	// Create a RayService with IncrementalUpgrade enabled.
	stepSize := ptr.To(int32(25))
	interval := ptr.To(int32(10))
	maxSurge := ptr.To(int32(50))

	rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).
		WithSpec(IncrementalUpgradeRayServiceApplyConfiguration(stepSize, interval, maxSurge))
	rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
	g.Expect(err).NotTo(HaveOccurred())
	g.Expect(rayService).NotTo(BeNil())

	LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be ready", rayService.Namespace, rayService.Name)
	g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium).
		Should(WithTransform(IsRayServiceReady, BeTrue()))

	// Copy original spec to use to trigger a rollback later.
	rayService, err = GetRayService(test, namespace.Name, rayServiceName)
	g.Expect(err).NotTo(HaveOccurred())
	originalSpec := rayService.Spec.DeepCopy()

	// Verify Gateway and HTTPRoute are ready.
	gatewayName := fmt.Sprintf("%s-%s", rayServiceName, "gateway")
	g.Eventually(Gateway(test, rayService.Namespace, gatewayName), TestTimeoutMedium).
		Should(WithTransform(utils.IsGatewayReady, BeTrue()))

	// Reuse gatewayName instead of rebuilding the same string.
	gateway, err := GetGateway(test, namespace.Name, gatewayName)
	g.Expect(err).NotTo(HaveOccurred())
	g.Expect(gateway).NotTo(BeNil())

	httpRouteName := fmt.Sprintf("%s-%s", "httproute", gatewayName)
	LogWithTimestamp(test.T(), "Waiting for HTTPRoute %s/%s to be ready", rayService.Namespace, httpRouteName)
	g.Eventually(HTTPRoute(test, rayService.Namespace, httpRouteName), TestTimeoutMedium).
		Should(Not(BeNil()))

	httpRoute, err := GetHTTPRoute(test, namespace.Name, httpRouteName)
	g.Expect(err).NotTo(HaveOccurred())
	g.Expect(utils.IsHTTPRouteReady(gateway, httpRoute)).To(BeTrue())

	// Trigger an incremental upgrade through a change to the RayCluster spec.
	LogWithTimestamp(test.T(), "Triggering an upgrade for RayService %s/%s", rayService.Namespace, rayService.Name)
	rayService, err = GetRayService(test, namespace.Name, rayServiceName)
	g.Expect(err).NotTo(HaveOccurred())
	rayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("500m")
	_, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{})
	g.Expect(err).NotTo(HaveOccurred())

	LogWithTimestamp(test.T(), "Waiting for RayService %s/%s UpgradeInProgress condition to be true", rayService.Namespace, rayService.Name)
	g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort).Should(WithTransform(IsRayServiceUpgrading, BeTrue()))

	// Wait for the upgrade to be underway with traffic partially migrated, and capture
	// the pending cluster's name while it is still present in the status. The controller
	// clears PendingServiceStatus once the rollback completes, so reading the name after
	// the rollback finishes would always observe an empty string and silently skip the
	// deletion check below.
	LogWithTimestamp(test.T(), "Waiting for upgrade to be partially complete")
	var pendingClusterName string
	g.Eventually(func(g Gomega) {
		svc, err := GetRayService(test, namespace.Name, rayServiceName)
		g.Expect(err).NotTo(HaveOccurred())
		g.Expect(svc.Status.PendingServiceStatus.TrafficRoutedPercent).NotTo(BeNil())
		g.Expect(*svc.Status.PendingServiceStatus.TrafficRoutedPercent).Should(BeNumerically(">", 0))
		pendingClusterName = svc.Status.PendingServiceStatus.RayClusterName
	}, TestTimeoutMedium).Should(Succeed())
	g.Expect(pendingClusterName).NotTo(BeEmpty())

	// Trigger a rollback by updating the spec back to the original version.
	LogWithTimestamp(test.T(), "Triggering a rollback for RayService %s/%s", rayService.Namespace, rayService.Name)
	rayService, err = GetRayService(test, namespace.Name, rayServiceName)
	g.Expect(err).NotTo(HaveOccurred())
	rayService.Spec = *originalSpec
	_, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{})
	g.Expect(err).NotTo(HaveOccurred())

	// Verify that the controller enters the rollback state.
	LogWithTimestamp(test.T(), "Waiting for RayService %s/%s RollbackInProgress condition to be true", rayService.Namespace, rayService.Name)
	g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort).Should(WithTransform(IsRayServiceRollingBack, BeTrue()))

	// Verify that traffic gradually shifts back to the active cluster.
	LogWithTimestamp(test.T(), "Verifying traffic shifts back to the active cluster")
	g.Eventually(func(g Gomega) {
		svc, err := GetRayService(test, namespace.Name, rayServiceName)
		g.Expect(err).NotTo(HaveOccurred())
		g.Expect(svc.Status.ActiveServiceStatus.TrafficRoutedPercent).NotTo(BeNil())
		g.Expect(*svc.Status.ActiveServiceStatus.TrafficRoutedPercent).Should(Equal(int32(100)))
		g.Expect(svc.Status.PendingServiceStatus.TrafficRoutedPercent).NotTo(BeNil())
		g.Expect(*svc.Status.PendingServiceStatus.TrafficRoutedPercent).Should(Equal(int32(0)))
	}, TestTimeoutMedium).Should(Succeed())

	// Verify that the rollback completes and the pending cluster is cleaned up.
	LogWithTimestamp(test.T(), "Waiting for rollback to complete and pending cluster to be deleted")
	g.Eventually(func(g Gomega) {
		svc, err := GetRayService(test, namespace.Name, rayServiceName)
		g.Expect(err).NotTo(HaveOccurred())
		// Rollback is done when both conditions are false and pending status is empty.
		g.Expect(IsRayServiceRollingBack(svc)).To(BeFalse())
		g.Expect(IsRayServiceUpgrading(svc)).To(BeFalse())
		g.Expect(svc.Status.PendingServiceStatus.RayClusterName).To(BeEmpty())
	}, TestTimeoutMedium).Should(Succeed())

	// Check that the pending RayCluster resource is deleted, using the name captured
	// before the controller cleared the pending status.
	g.Eventually(func() error {
		_, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), pendingClusterName, metav1.GetOptions{})
		return err
	}, TestTimeoutShort).Should(WithTransform(errors.IsNotFound, BeTrue()))

	// The HTTPRoute should now only have one backend after the rollback completes.
	g.Eventually(HTTPRoute(test, namespace.Name, httpRouteName), TestTimeoutShort).
		Should(WithTransform(func(route *gwv1.HTTPRoute) int {
			if route == nil || len(route.Spec.Rules) == 0 {
				return 0
			}
			return len(route.Spec.Rules[0].BackendRefs)
		}, Equal(1)))
}
Loading
Loading