Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
RayServiceReady RayServiceConditionType = "Ready"
// UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade.
UpgradeInProgress RayServiceConditionType = "UpgradeInProgress"
// RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state.
RollbackInProgress RayServiceConditionType = "RollbackInProgress"
)

const (
Expand All @@ -217,6 +219,7 @@ const (
NoPendingCluster RayServiceConditionReason = "NoPendingCluster"
NoActiveCluster RayServiceConditionReason = "NoActiveCluster"
RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed"
GoalClusterChanged RayServiceConditionReason = "GoalClusterChanged"
)

// +kubebuilder:object:root=true
Expand Down
124 changes: 116 additions & 8 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
// Check if NewClusterWithIncrementalUpgrade is enabled, if so reconcile Gateway objects.
var httpRouteInstance *gwv1.HTTPRoute
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) {
// If an upgrade is in progress, check if rollback is necessary.
if activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}
// Ensure per-cluster Serve service exists for the active and pending RayClusters.
if err = r.reconcilePerClusterServeService(ctx, rayServiceInstance, activeRayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
Expand Down Expand Up @@ -379,6 +385,31 @@ func (r *RayServiceReconciler) calculateStatus(

rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
activeStatus := &rayServiceInstance.Status.ActiveServiceStatus
pendingStatus := &rayServiceInstance.Status.PendingServiceStatus

// A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent,
// and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent.
if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 &&
ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 &&
ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 &&
ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 {

logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.")

// Clear the RayService pending service status to clean up the pending cluster.
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
pendingCluster = nil
pendingClusterServeApplications = nil

meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

// Ensure the upgrade state machine resets after a successful rollback.
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Rollback complete, active Ray cluster exists and no pending Ray cluster")
}
}

// Update RayClusterStatus in RayService status.
var activeClusterStatus, pendingClusterStatus rayv1.RayClusterStatus
if activeCluster != nil {
Expand Down Expand Up @@ -412,7 +443,7 @@ func (r *RayServiceReconciler) calculateStatus(
logger.Info("Updated LastTrafficMigratedTime of Active Service.")
}
}
if pendingWeight >= 0 {
if pendingWeight >= 0 && pendingCluster != nil {
rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent = ptr.To(pendingWeight)
logger.Info("Updated pending TrafficRoutedPercent from HTTPRoute", "pendingClusterWeight", pendingWeight)
if pendingWeight != oldPendingPercent {
Expand Down Expand Up @@ -709,9 +740,11 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context

// Check that target_capacity has been updated before migrating traffic.
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100)
isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

if pendingClusterWeight == pendingClusterTargetCapacity {
// Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit.
if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) {
// Stop traffic migration because the cluster being migrated to has reached its target capacity limit.
return activeClusterWeight, pendingClusterWeight, nil
}

Expand All @@ -720,11 +753,19 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context
interval := time.Duration(*options.IntervalSeconds) * time.Second
lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime
if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
// Gradually shift traffic from the active to the pending cluster.
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
activeClusterWeight = 100 - pendingClusterWeight
if isRollbackInProgress {
// Gradually shift traffic from the pending to the active cluster.
logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent)
proposedActiveWeight := activeClusterWeight + *options.StepSizePercent
activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity)
pendingClusterWeight = 100 - activeClusterWeight
} else {
// Gradually shift traffic from the active to the pending cluster.
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
activeClusterWeight = 100 - pendingClusterWeight
}
}
}

Expand Down Expand Up @@ -1094,6 +1135,12 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS
if isPendingClusterServing {
return false
}

// Do not prepare a new cluster while a rollback is actively in progress.
if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
return false
}

if activeRayCluster == nil && pendingRayCluster == nil {
// Both active and pending clusters are nil, which means the RayService has just been created.
// Create a new pending cluster.
Expand Down Expand Up @@ -1423,6 +1470,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context,
}
maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100)

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
// Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity,
// while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path.
activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent default for active TrafficRoutedPercent causes potential deadlock

Medium Severity

During rollback, reconcileServeTargetCapacity defaults active TrafficRoutedPercent to 0 via ptr.Deref(..., 0), while calculateTrafficRoutedPercent defaults the same field to 100 via ptr.Deref(..., 100). If TrafficRoutedPercent is ever nil during rollback, the capacity reconciler sees activeTargetCapacity(100) != activeTrafficRoutedPercent(0) and defers, while the traffic calculator sees activeClusterWeight(100) == activeClusterTargetCapacity(100) and stops migration. Neither function makes progress, creating a deadlock where the rollback can never complete.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @ryanaoleary to take a look

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct solution is to default it to TargetCapacity if it's ever nil, rather than to 100 or 0, since either can result in deadlock (100 (capacity) != 0 (traffic) -> defer, or 100 (traffic) == 100 (capacity) -> defer) if we apply that default when the field becomes nil.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in d02b7be and added unit tests for this case

if activeTargetCapacity != activeTrafficRoutedPercent {
logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent)
return nil
}

if activeTargetCapacity+pendingTargetCapacity > 100 {
if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName {
goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent)
logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
} else {
if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName {
goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent)
logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
}
return nil
}

// Defer updating the target_capacity until traffic weights are updated
if pendingTargetCapacity != pendingTrafficRoutedPercent {
logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent)
Expand Down Expand Up @@ -1921,3 +1993,39 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte

return err
}

// reconcileRollbackState determines whether to initiate a rollback by setting the RollbackInProgress condition.
func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)

goalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
if err != nil {
return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err)
}

originalHash := activeCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
pendingHash := pendingCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]

isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

// Case 1: The goal spec matches the pending cluster's spec.
// The upgrade is on track. We should revert any accidental rollback attempt and continue.
if goalHash == pendingHash {
if isRollbackInProgress {
logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.")
meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
}
return nil
}

// Case 2: The goal spec diverges from the pending cluster.
// This happens if the user reverted to the original spec, or if they submitted a 3rd entirely new spec mid-upgrade.
// In all divergence cases, we must first safely route all traffic back to the original cluster before allowing
// a new cluster to be spun up.
if !isRollbackInProgress {
logger.Info("Goal state has changed during upgrade. Initiating safe rollback to the original cluster.", "goalHash", goalHash, "originalHash", originalHash, "pendingHash", pendingHash)
setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "Goal state changed mid-upgrade, rolling back to original cluster.")
}

return nil
}
88 changes: 88 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2647,3 +2647,91 @@ func Test_RayServiceReconcileManagedBy(t *testing.T) {
})
}
}

func TestReconcileRollbackState(t *testing.T) {
ctx := context.TODO()
namespace := "test-ns"

baseSpec := rayv1.RayClusterSpec{
RayVersion: "2.54.0",
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{GroupName: "worker-group", Replicas: ptr.To(int32(1))},
},
}

updatedSpec := baseSpec.DeepCopy()
updatedSpec.RayVersion = "2.50.0"

baseHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(baseSpec)
require.NoError(t, err)

updatedHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*updatedSpec)
require.NoError(t, err)

activeCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{Name: "active-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: baseHash}},
}
pendingCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{Name: "pending-cluster", Namespace: namespace, Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: updatedHash}},
}

tests := []struct {
name string
rayServiceSpec rayv1.RayClusterSpec
isRollbackInProgress bool
expectRollbackStatus bool
}{
{
name: "Normal RayService upgrade, goal matches pending",
rayServiceSpec: *updatedSpec,
isRollbackInProgress: false,
expectRollbackStatus: false,
},
{
name: "RayService Spec changed, initiate rollback",
rayServiceSpec: baseSpec,
isRollbackInProgress: false,
expectRollbackStatus: true,
},
{
name: "Rollback in progress, continues rolling back",
rayServiceSpec: baseSpec,
isRollbackInProgress: true,
expectRollbackStatus: true,
},
{
name: "Rollback canceled, user updated spec back to pending",
rayServiceSpec: *updatedSpec,
isRollbackInProgress: true,
expectRollbackStatus: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rayService := &rayv1.RayService{
ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace},
Spec: rayv1.RayServiceSpec{
RayClusterSpec: tt.rayServiceSpec,
},
Status: rayv1.RayServiceStatuses{
Conditions: []metav1.Condition{},
},
}

if tt.isRollbackInProgress {
setCondition(rayService, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "rolling back")
}

reconciler := RayServiceReconciler{
Recorder: record.NewFakeRecorder(1),
}

err := reconciler.reconcileRollbackState(ctx, rayService, activeCluster, pendingCluster)
require.NoError(t, err)

isCurrentlyRollingBack := meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RollbackInProgress))
assert.Equal(t, tt.expectRollbackStatus, isCurrentlyRollingBack)
})
}
}
Loading
Loading