-
Notifications
You must be signed in to change notification settings - Fork 723
[RayService] Rollback Support for Incremental Upgrades #4109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5938b5b
31733e4
bdc3007
7613de3
3615825
b93aeb4
11299d6
e263d1f
9d659d0
eb52d96
a7867fa
9978cc8
196ea1f
cc0a5a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -207,6 +207,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque | |
| // Check if NewClusterWithIncrementalUpgrade is enabled, if so reconcile Gateway objects. | ||
| var httpRouteInstance *gwv1.HTTPRoute | ||
| if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) { | ||
| // If an upgrade is in progress, check if rollback is necessary. | ||
| if activeRayClusterInstance != nil && pendingRayClusterInstance != nil { | ||
| if err := r.reconcileRollbackState(ctx, rayServiceInstance, activeRayClusterInstance, pendingRayClusterInstance); err != nil { | ||
| return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err | ||
| } | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Ensure per-cluster Serve service exists for the active and pending RayClusters. | ||
| if err = r.reconcilePerClusterServeService(ctx, rayServiceInstance, activeRayClusterInstance); err != nil { | ||
| return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err | ||
|
|
@@ -379,6 +385,27 @@ func (r *RayServiceReconciler) calculateStatus( | |
|
|
||
| rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation | ||
|
|
||
| if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) { | ||
| activeStatus := &rayServiceInstance.Status.ActiveServiceStatus | ||
| pendingStatus := &rayServiceInstance.Status.PendingServiceStatus | ||
|
|
||
| // A rollback is complete when the active cluster is back at 100% TargetCapacity and TrafficRoutedPercent, | ||
| // and the pending cluster is at 0% TargetCapacity and TrafficRoutedPercent. | ||
| if ptr.Deref(activeStatus.TargetCapacity, -1) == 100 && | ||
| ptr.Deref(activeStatus.TrafficRoutedPercent, -1) == 100 && | ||
| ptr.Deref(pendingStatus.TargetCapacity, -1) == 0 && | ||
| ptr.Deref(pendingStatus.TrafficRoutedPercent, -1) == 0 { | ||
|
|
||
| logger.Info("Rollback to original cluster is complete. Cleaning up pending cluster from prior upgrade.") | ||
|
|
||
| // Clear the RayService pending service status to clean up the pending cluster. | ||
| rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{} | ||
| pendingCluster = nil | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
ryanaoleary marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Update RayClusterStatus in RayService status. | ||
| var activeClusterStatus, pendingClusterStatus rayv1.RayClusterStatus | ||
| if activeCluster != nil { | ||
|
|
@@ -709,9 +736,11 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context | |
|
|
||
| // Check that target_capacity has been updated before migrating traffic. | ||
| pendingClusterTargetCapacity := ptr.Deref(pendingServiceStatus.TargetCapacity, 0) | ||
| activeClusterTargetCapacity := ptr.Deref(activeServiceStatus.TargetCapacity, 100) | ||
| isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) | ||
|
|
||
| if pendingClusterWeight == pendingClusterTargetCapacity { | ||
| // Stop traffic migration because the pending cluster's current traffic weight has reached its target capacity limit. | ||
| if (pendingClusterWeight == pendingClusterTargetCapacity && !isRollbackInProgress) || (isRollbackInProgress && activeClusterWeight == activeClusterTargetCapacity) { | ||
| // Stop traffic migration because the cluster being migrated to has reached its target capacity limit. | ||
| return activeClusterWeight, pendingClusterWeight, nil | ||
| } | ||
|
|
||
|
|
@@ -720,11 +749,19 @@ func (r *RayServiceReconciler) calculateTrafficRoutedPercent(ctx context.Context | |
| interval := time.Duration(*options.IntervalSeconds) * time.Second | ||
| lastTrafficMigratedTime := pendingServiceStatus.LastTrafficMigratedTime | ||
| if lastTrafficMigratedTime == nil || time.Since(lastTrafficMigratedTime.Time) >= interval { | ||
| // Gradually shift traffic from the active to the pending cluster. | ||
| logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent) | ||
| proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent | ||
| pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity) | ||
| activeClusterWeight = 100 - pendingClusterWeight | ||
| if isRollbackInProgress { | ||
| // Gradually shift traffic from the pending to the active cluster. | ||
| logger.Info("Rollback in progress. Shifting traffic back to active cluster.", "stepSize", *options.StepSizePercent) | ||
| proposedActiveWeight := activeClusterWeight + *options.StepSizePercent | ||
| activeClusterWeight = min(100, proposedActiveWeight, activeClusterTargetCapacity) | ||
| pendingClusterWeight = 100 - activeClusterWeight | ||
| } else { | ||
| // Gradually shift traffic from the active to the pending cluster. | ||
| logger.Info("Upgrade in progress. Migrating traffic by StepSizePercent.", "stepSize", *options.StepSizePercent) | ||
| proposedPendingWeight := pendingClusterWeight + *options.StepSizePercent | ||
| pendingClusterWeight = min(100, proposedPendingWeight, pendingClusterTargetCapacity) | ||
| activeClusterWeight = 100 - pendingClusterWeight | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -1423,6 +1460,31 @@ func (r *RayServiceReconciler) reconcileServeTargetCapacity(ctx context.Context, | |
| } | ||
| maxSurgePercent := ptr.Deref(options.MaxSurgePercent, 100) | ||
|
|
||
| if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) { | ||
| // Rollback the upgrade. The active RayCluster should be scaled back to 100% target_capacity, | ||
| // while the pending RayCluster is scaled to 0%. This is the inverse of the regular upgrade path. | ||
| activeTrafficRoutedPercent := ptr.Deref(activeRayServiceStatus.TrafficRoutedPercent, 0) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Inconsistent default for active TrafficRoutedPercent causes potential deadlock. Medium Severity. During rollback, … Additional Locations (1)
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. cc @ryanaoleary to take a look
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the correct solution is to default it to …
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed in d02b7be and added unit tests for this case. |
||
| if activeTargetCapacity != activeTrafficRoutedPercent { | ||
| logger.Info("Traffic is rolling back to active cluster, deferring capacity update.", "ActiveTargetCapacity", activeTargetCapacity, "ActiveTrafficRoutedPercent", activeTrafficRoutedPercent) | ||
| return nil | ||
| } | ||
|
|
||
| if activeTargetCapacity+pendingTargetCapacity > 100 { | ||
| if rayClusterInstance.Name == pendingRayServiceStatus.RayClusterName { | ||
| goalTargetCapacity := max(int32(0), pendingTargetCapacity-maxSurgePercent) | ||
| logger.Info("Rollback: Scaling down pending cluster `target_capacity`.", "goal", goalTargetCapacity) | ||
| return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity) | ||
| } | ||
| } else { | ||
| if rayClusterInstance.Name == activeRayServiceStatus.RayClusterName { | ||
| goalTargetCapacity := min(int32(100), activeTargetCapacity+maxSurgePercent) | ||
| logger.Info("Rollback: Scaling up active cluster `target_capacity`.", "goal", goalTargetCapacity) | ||
| return r.applyServeTargetCapacity(ctx, rayServiceInstance, rayClusterInstance, rayDashboardClient, goalTargetCapacity) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Defer updating the target_capacity until traffic weights are updated | ||
| if pendingTargetCapacity != pendingTrafficRoutedPercent { | ||
| logger.Info("Traffic is currently being migrated to pending cluster", "RayCluster", pendingRayServiceStatus.RayClusterName, "TargetCapacity", pendingTargetCapacity, "TrafficRoutedPercent", pendingTrafficRoutedPercent) | ||
|
|
@@ -1921,3 +1983,36 @@ func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Conte | |
|
|
||
| return err | ||
| } | ||
|
|
||
| // reconcileRollbackState determines whether to initiate a rollback by setting the RollbackInProgress condition. | ||
| func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error { | ||
| logger := ctrl.LoggerFrom(ctx) | ||
|
|
||
| goalHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec) | ||
ryanaoleary marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if err != nil { | ||
| return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err) | ||
| } | ||
|
|
||
| originalHash := activeCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] | ||
| pendingHash := pendingCluster.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] | ||
|
|
||
| isRollbackInProgress := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) | ||
|
|
||
| // Case 1: The goal spec matches the pending cluster's spec. In this case, we should revert the rollback attempt | ||
| // and continue to upgrade as normal. | ||
| if goalHash == pendingHash { | ||
| if isRollbackInProgress { | ||
| logger.Info("Goal state matches pending cluster. Canceling rollback and resuming upgrade.") | ||
| meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Case 2: The goal spec differs from pending cluster's spec. Rollback to original cluster. | ||
| if !isRollbackInProgress { | ||
| logger.Info("Goal state has changed during upgrade. Initiating rollback to the original cluster.", "goalHash", goalHash, "originalHash", originalHash, "pendingHash", pendingHash) | ||
| setCondition(rayServiceInstance, rayv1.RollbackInProgress, metav1.ConditionTrue, rayv1.GoalClusterChanged, "Goal state changed, rolling back to original cluster.") | ||
| } | ||
ryanaoleary marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return nil | ||
| } | ||


Uh oh!
There was an error while loading. Please reload this page.