Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
RayServiceReady RayServiceConditionType = "Ready"
// UpgradeInProgress means the RayService is currently performing a zero-downtime upgrade.
UpgradeInProgress RayServiceConditionType = "UpgradeInProgress"
// RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state.
RollbackInProgress RayServiceConditionType = "RollbackInProgress"
)

const (
Expand All @@ -217,6 +219,7 @@ const (
NoPendingCluster RayServiceConditionReason = "NoPendingCluster"
NoActiveCluster RayServiceConditionReason = "NoActiveCluster"
RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed"
GoalClusterChanged RayServiceConditionReason = "GoalClusterChanged"
)

// +kubebuilder:object:root=true
Expand Down
162 changes: 142 additions & 20 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, client.IgnoreNotFound(err)
}

// Determine the rollback state immediately before making serving decisions.
if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) {
// If an upgrade is in progress, check if rollback is necessary.
isUpgradeInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress))
if isUpgradeInProgress && activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}
}

// Check both active and pending Ray clusters to see if the head Pod is ready to serve requests.
// This is important to ensure the reliability of the serve service because the head Pod cannot
// rely on readiness probes to determine serve readiness.
Expand Down Expand Up @@ -412,7 +423,7 @@ func (r *RayServiceReconciler) calculateStatus(
logger.Info("Updated LastTrafficMigratedTime of Active Service.")
}
}
if pendingWeight >= 0 {
if pendingWeight >= 0 && pendingCluster != nil {
rayServiceInstance.Status.PendingServiceStatus.TrafficRoutedPercent = ptr.To(pendingWeight)
logger.Info("Updated pending TrafficRoutedPercent from HTTPRoute", "pendingClusterWeight", pendingWeight)
if pendingWeight != oldPendingPercent {
Expand All @@ -425,6 +436,30 @@ func (r *RayServiceReconciler) calculateStatus(
isPendingClusterServing = reconcilePromotionAndServingStatus(ctx, headSvc, serveSvc, rayServiceInstance, pendingCluster)
}

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
activeStatus := &rayServiceInstance.Status.ActiveServiceStatus
pendingStatus := &rayServiceInstance.Status.PendingServiceStatus

// A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent,
// and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent.
if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 &&
ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 &&
ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 &&
ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 {

logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.")

// Clear the RayService pending service status to clean up the pending cluster.
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{}
pendingCluster = nil

meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

// Ensure the upgrade state machine resets after a successful rollback.
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Rollback complete, active Ray cluster exists and no pending Ray cluster")
}
}

if shouldPrepareNewCluster(ctx, rayServiceInstance, activeCluster, pendingCluster, isPendingClusterServing) {
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{
RayClusterName: utils.GenerateRayClusterName(rayServiceInstance.Name),
Expand Down Expand Up @@ -700,31 +735,51 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context
activeClusterWeight = ptr.Deref(activeServiceStatus.TrafficRoutedPercent, 100)
pendingClusterWeight = ptr.Deref(pendingServiceStatus.TrafficRoutedPercent, 0)

if isPendingClusterReady {
// Zero-downtime upgrade in progress.
options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec)
if options == nil {
return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.")
}
// Zero-downtime upgrade in progress.
options := utils.GetRayServiceClusterUpgradeOptions(&rayServiceInstance.Spec)
if options == nil {
return 0, 0, errstd.New("ClusterUpgradeOptions are not set during upgrade.")
}

// Check that target_capacity has been updated before migrating traffic.
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
// Check that target_capacity has been updated before migrating traffic.
pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0)
activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100)
isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

if pendingClusterWeight == pendingClusterTargetCapacity {
// Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit.
return activeClusterWeight, pendingClusterWeight, nil
}
if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) {
// Stop traffic migration because the cluster being migrated to has reached its target capacity limit.
return activeClusterWeight, pendingClusterWeight, nil
}

// If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic
// from the active RayCluster to the pending RayCluster.
interval := time.Duration(*options.IntervalSeconds) * time.Second

// If IntervalSeconds has passed since LastTrafficMigratedTime, migrate StepSizePercent traffic
// from the active RayCluster to the pending RayCluster.
interval := time.Duration(*options.IntervalSeconds) * time.Second
lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime
if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
// Gradually shift traffic from the active to the pending cluster.
// Determine which timestamp to use based on the direction of traffic flow.
// We use the LastTrafficMigratedTime of the cluster to which traffic is increasing.
var lastTrafficMigratedTime *metav1.Time
if isRollbackInProgress {
lastTrafficMigratedTime = activeServiceStatus.LastTrafficMigratedTime
} else {
lastTrafficMigratedTime = pendingServiceStatus.LastTrafficMigratedTime
}

if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval {
if isRollbackInProgress {
// Gradually shift traffic from the pending to the active cluster.
// Rollback traffic migration occurs regardless of pending cluster readiness.
logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the active / pending terminology will get confusing going forward, especially as we add rollback support. Do we use these terms in the APIs or only in the logs? If only in the logs we should consider changing the terminology in a future PR. Maybe next cluster and previous cluster might be easier to understand.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

active and pending are used in the API in ActiveServiceStatus and PendingServiceStatus, so I used similar terminology when referring to the cluster that's being scaled for each service. Currently we only refer to the active/pending clusters in the comments/logs though so that should be fine to change to next cluster and previous cluster. Can follow-up in the next PR if needed

Comment on lines +768 to +771

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Gate rollback traffic migration on active readiness

The rollback branch starts shifting HTTPRoute weight back to the active cluster whenever isRollbackInProgress is true, but it never checks whether the active cluster is actually ready to serve. In the case where the active cluster is unhealthy during an in-progress upgrade and the user changes spec (triggering rollback), this logic can still move traffic onto a non-ready backend and cause request failures; rollback traffic shifts should be conditioned on active-cluster readiness the same way forward migration is conditioned on pending readiness.

Useful? React with 👍 / 👎.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @ryanaoleary to take a look

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 81ad0cf

proposedActiveWeight := activeClusterWeight + *options.StepSizePercent
activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity)
pendingClusterWeight = 100 - activeClusterWeight
} else if isPendingClusterReady {
// Gradually shift traffic from the active to the pending cluster if it's ready.
logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent)
proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent
pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity)
activeClusterWeight = 100 - pendingClusterWeight
} else {
logger.Info("Upgrade in progress, but pending cluster is not ready. Pausing traffic migration.")
}
}

Expand Down Expand Up @@ -871,7 +926,7 @@ func (r *RayServiceReconciler) reconcileHTTPRoute(ctx context.Context, rayServic
}

// If HTTPRoute already exists, check if update is needed
if !reflect.DeepEqual(existingHTTPRoute.Spec, desiredHTTPRoute.Spec) {
if !utils.IsHTTPRouteEqual(existingHTTPRoute, desiredHTTPRoute) {
logger.Info("Updating existing HTTPRoute", "name", desiredHTTPRoute.Name)
existingHTTPRoute.Spec = desiredHTTPRoute.Spec
if err := r.Update(ctx, existingHTTPRoute); err != nil {
Expand Down Expand Up @@ -1094,6 +1149,12 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS
if isPendingClusterServing {
return false
}

// Do not prepare a new cluster while a rollback is actively in progress.
if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
return false
}

if activeRayCluster == nil && pendingRayCluster == nil {
// Both active and pending clusters are nil, which means the RayService has just been created.
// Create a new pending cluster.
Expand Down Expand Up @@ -1423,6 +1484,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context,
}
maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100)

if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) {
// Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity,
// while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path.
activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent default for active TrafficRoutedPercent causes potential deadlock

Medium Severity

During rollback, reconcileServeTargetCapacity defaults active TrafficRoutedPercent to 0 via ptr.Deref(..., 0), while calculateTrafficRoutedPercent defaults the same field to 100 via ptr.Deref(..., 100). If TrafficRoutedPercent is ever nil during rollback, the capacity reconciler sees activeTargetCapacity(100) != activeTrafficRoutedPercent(0) and defers, while the traffic calculator sees activeClusterWeight(100) == activeClusterTargetCapacity(100) and stops migration. Neither function makes progress, creating a deadlock where the rollback can never complete.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @ryanaoleary to take a look

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct solution is to default it to TargetCapacity if it's ever nil, rather than 100 or 0 since either can result in deadlock (100 (capacity) != 0 (traffic) -> defer or 100 (traffic) == 100 (capacity) -> defer if we default when it becomes nil.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in d02b7be and added unit tests for this case

if activeTargetCapacity != activeTrafficRoutedPercent {
logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent)
return nil
}

if activeTargetCapacity+pendingTargetCapacity > 100 {
if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName {
goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent)
logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
} else {
if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName {
goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent)
logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity)
return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity)
}
}
return nil
}

// Defer updating the target_capacity until traffic weights are updated
if pendingTargetCapacity != pendingTrafficRoutedPercent {
logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent)
Expand Down Expand Up @@ -1921,3 +2007,39 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte

return err
}

// reconcileRollbackState manages the RollbackInProgress condition on the
// RayService. It compares the hash of the goal RayClusterSpec against the
// pending cluster's recorded hash: when they match, the upgrade is on track
// and any in-flight rollback is canceled; when they diverge (the user
// reverted to the original spec or submitted a third, entirely new spec
// mid-upgrade), a rollback to the original cluster is initiated so traffic
// is safely routed back before any new cluster may be created.
func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error {
	logger := ctrl.LoggerFrom(ctx)

	goalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
	if err != nil {
		return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err)
	}

	pendingHash := pendingCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
	rollingBack := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))

	// Case 1: the goal spec matches the pending cluster's spec, so the
	// upgrade should proceed. Revert any previously initiated rollback.
	if goalHash == pendingHash {
		if rollingBack {
			logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.")
			meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress))
		}
		return nil
	}

	// Case 2: the goal spec diverges from the pending cluster. In every
	// divergence case we first roll traffic back to the original cluster
	// before allowing a new cluster to be spun up.
	if !rollingBack {
		originalHash := activeCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
		logger.Info("Goal state has changed during upgrade. Initiating safe rollback to the original cluster.", "goalHash", goalHash, "originalHash", originalHash, "pendingHash", pendingHash)
		setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "Goal state changed mid-upgrade, rolling back to original cluster.")
	}

	return nil
}
88 changes: 88 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2647,3 +2647,91 @@ func Test_RayServiceReconcileManagedBy(t *testing.T) {
})
}
}

// TestReconcileRollbackState covers the rollback state machine: the
// RollbackInProgress condition must be set when the RayService goal spec
// diverges from the pending cluster's spec hash, and cleared when the goal
// matches the pending cluster again.
func TestReconcileRollbackState(t *testing.T) {
	ctx := context.TODO()
	namespace := "test-ns"

	baseSpec := rayv1.RayClusterSpec{
		RayVersion: "2.54.0",
		WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
			{GroupName: "worker-group", Replicas: ptr.To(int32(1))},
		},
	}

	updatedSpec := baseSpec.DeepCopy()
	updatedSpec.RayVersion = "2.50.0"

	baseHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(baseSpec)
	require.NoError(t, err)

	updatedHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*updatedSpec)
	require.NoError(t, err)

	// newCluster builds a RayCluster annotated with the given spec hash, as
	// the controller would have recorded it at creation time.
	newCluster := func(name, hash string) *rayv1.RayCluster {
		return &rayv1.RayCluster{
			ObjectMeta: metav1.ObjectMeta{
				Name:        name,
				Namespace:   namespace,
				Annotations: map[string]string{utils.HashWithoutReplicasAndWorkersToDeleteKey: hash},
			},
		}
	}
	activeCluster := newCluster("active-cluster", baseHash)
	pendingCluster := newCluster("pending-cluster", updatedHash)

	testCases := []struct {
		name                 string
		rayServiceSpec       rayv1.RayClusterSpec
		isRollbackInProgress bool
		expectRollbackStatus bool
	}{
		{
			name:                 "Normal RayService upgrade, goal matches pending",
			rayServiceSpec:       *updatedSpec,
			isRollbackInProgress: false,
			expectRollbackStatus: false,
		},
		{
			name:                 "RayService Spec changed, initiate rollback",
			rayServiceSpec:       baseSpec,
			isRollbackInProgress: false,
			expectRollbackStatus: true,
		},
		{
			name:                 "Rollback in progress, continues rolling back",
			rayServiceSpec:       baseSpec,
			isRollbackInProgress: true,
			expectRollbackStatus: true,
		},
		{
			name:                 "Rollback canceled, user updated spec back to pending",
			rayServiceSpec:       *updatedSpec,
			isRollbackInProgress: true,
			expectRollbackStatus: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			rayService := &rayv1.RayService{
				ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace},
				Spec: rayv1.RayServiceSpec{
					RayClusterSpec: tc.rayServiceSpec,
				},
				Status: rayv1.RayServiceStatuses{
					Conditions: []metav1.Condition{},
				},
			}
			if tc.isRollbackInProgress {
				setCondition(rayService, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "rolling back")
			}

			reconciler := RayServiceReconciler{
				Recorder: record.NewFakeRecorder(1),
			}
			require.NoError(t, reconciler.reconcileRollbackState(ctx, rayService, activeCluster, pendingCluster))

			gotRollingBack := meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RollbackInProgress))
			assert.Equal(t, tc.expectRollbackStatus, gotRollingBack)
		})
	}
}
26 changes: 26 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,3 +1039,29 @@ func GetRayHttpProxyClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// HasSubmitter reports whether the RayJob's submission mode uses a separate
// submitter (a Kubernetes Job or a sidecar container).
func HasSubmitter(rayJobInstance *rayv1.RayJob) bool {
	switch rayJobInstance.Spec.SubmissionMode {
	case rayv1.K8sJobMode, rayv1.SidecarMode:
		return true
	default:
		return false
	}
}

// IsHTTPRouteEqual reports whether the existing HTTPRoute already matches the
// desired HTTPRoute for the fields this controller manages. Only the backend
// reference name, weight, and port of each rule are compared; all other spec
// fields are intentionally ignored so that fields set by other actors do not
// trigger spurious updates.
func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool {
	existingRules := existing.Spec.Rules
	desiredRules := desired.Spec.Rules
	if len(existingRules) != len(desiredRules) {
		return false
	}

	for i := range desiredRules {
		existingRefs := existingRules[i].BackendRefs
		desiredRefs := desiredRules[i].BackendRefs
		if len(existingRefs) != len(desiredRefs) {
			return false
		}

		for j := range desiredRefs {
			e := &existingRefs[j]
			d := &desiredRefs[j]

			// Only compare the fields the controller updates. Weight defaults
			// to 1 and an absent port to 0, per the Gateway API defaults.
			if e.Name != d.Name ||
				ptr.Deref(e.Weight, 1) != ptr.Deref(d.Weight, 1) ||
				ptr.Deref(e.Port, 0) != ptr.Deref(d.Port, 0) {
				return false
			}
		}
	}
	return true
}
Loading
Loading