From a08f02548677f3da4d03953576ba0eb9fc75857c Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Mon, 10 Nov 2025 16:24:34 +0100 Subject: [PATCH 1/3] fix: apply fallback in polling loop to enable scaling from zero Signed-off-by: Rick Brouwer --- CHANGELOG.md | 1 + pkg/scaling/scale_handler.go | 10 ++++++++- tests/internals/fallback/fallback.go | 32 ++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37b9bafb35f..2a7ed9f8155 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### Fixes +- **General**: Apply fallback in polling loop to enable scaling from zero ([#7239](https://github.com/kedacore/keda/issues/7239)) - **IBMMQ Scaler**: Create new HTTP request for each queue query in IBMMQ scaler ([#7202](https://github.com/kedacore/keda/pull/7202)) ### Deprecations diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index e52e7493527..723234865c4 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -759,7 +759,7 @@ type scalerState struct { // for an specific scaler. The state contains if it's active or // with erros, but also the records for the cache and he metrics // for the custom formulas -func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, triggerIndex int, scalerConfig scalersconfig.ScalerConfig, +func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, triggerIndex int, scalerConfig scalersconfig.ScalerConfig, cache *cache.ScalersCache, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) scalerState { result := scalerState{ IsActive: false, @@ -795,6 +795,14 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, if latency != -1 { metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency) } + + // Apply fallback if needed + metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, h.scaleClient, metrics, err, metricName, scaledObject, spec) + if fallbackActive { + logger.V(1).Info("Using fallback metrics in polling loop", "scaler", result.TriggerName, "metricName", metricName) + isMetricActive = true + } + result.Metrics = append(result.Metrics, metrics...) logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) diff --git a/tests/internals/fallback/fallback.go b/tests/internals/fallback/fallback.go index 37a509caf19..0e89b98c0e5 100644 --- a/tests/internals/fallback/fallback.go +++ b/tests/internals/fallback/fallback.go @@ -531,6 +531,7 @@ func TestFallback(t *testing.T, s ScaleTargetType) { TestFallbackWithCurrentReplicasIfLower(t, s) TestFallbackWithCurrentReplicas(t, s) TestFallbackWithStatic(t, s) + TestFallbackFromZero(t, s) } func TestFallbackWithAverageValueMetrics(t *testing.T, s ScaleTargetType) { @@ -771,6 +772,37 @@ func TestFallbackWithStatic(t *testing.T, s ScaleTargetType) { helper.DeleteKubernetesResources(t, data.Namespace, data, templates) } +func TestFallbackFromZero(t *testing.T, s ScaleTargetType) { + kc := helper.GetKubernetesClient(t) + t.Logf("--- running TestFallbackFromZero test for %s ---", s) + data, templates := getTemplateData(s) + + // Replace the default scaledObject template + for i, tmpl := range templates { + if tmpl.Name == "scaledObjectTemplate" { + templates[i].Config = scaledObjectTemplateWithStatic + break + } + } + + helper.CreateKubernetesResources(t, kc, data.Namespace, data, templates) + + assert.True(t, scaleTargetMap[s].WaitForReplicaReadyCount(t, kc, scaleTargetName, data.Namespace, minReplicas, 180, 3), + "replica count should be %d after 9 minutes", minReplicas) + + // Stop metrics server to trigger fallback + helper.KubectlApplyWithTemplate(t, data, "fallbackMSDeploymentTemplate", fallbackMSDeploymentTemplate) + + // Should go to fallback value (3) because of static + assert.True(t, scaleTargetMap[s].WaitForReplicaReadyCount(t, kc, scaleTargetName, data.Namespace, 3, 30, 3), + "replica count should remain at 3 after fallback") + + // Ensure the replica count remains stable + scaleTargetMap[s].AssertReplicaCountNotChangeDuringTimePeriod(t, kc, scaleTargetName, data.Namespace, 3, 30) + + helper.DeleteKubernetesResources(t, data.Namespace, data, templates) +} + // scale out to max replicas first func testScaleOut(t *testing.T, kc *kubernetes.Clientset, s ScaleTargetType, data templateData) { t.Log("--- testing scale out ---") From 05a3da58b9521892db7a91280cc035153aef4c86 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Wed, 12 Nov 2025 13:44:31 +0100 Subject: [PATCH 2/3] fix Signed-off-by: Rick Brouwer --- pkg/scaling/scale_handler.go | 224 ++++++++++++++++++++++++----------- 1 file changed, 153 insertions(+), 71 deletions(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 723234865c4..9ff266d0941 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -441,6 +441,57 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int /// ---------- ScaledObject related methods --------- /// /// --------------------------------------------------------------------------- /// +// processMetricsWithFallback processes metrics with fallback support and handles metric recording. +func (h *scaleHandler) processMetricsWithFallback( + ctx context.Context, + rawMetrics []external_metrics.ExternalMetricValue, + rawErr error, + metricName string, + triggerName string, + triggerIndex int, + scaledObject *kedav1alpha1.ScaledObject, + metricSpec v2.MetricSpec, + sendRawMetricsCondition bool, + logger logr.Logger, +) ([]external_metrics.ExternalMetricValue, bool, error) { + // check if we need to set a fallback + metrics, fallbackActive, err := fallback.GetMetricsWithFallback( + ctx, + h.client, + h.scaleClient, + rawMetrics, + rawErr, + metricName, + scaledObject, + metricSpec, + ) + + if err != nil { + logger.Error(err, "error getting metric for trigger", "trigger", triggerName) + } else { + // Record metrics + for _, metric := range metrics { + metricValue := metric.Value.AsApproximateFloat64() + metricscollector.RecordScalerMetric( + scaledObject.Namespace, + scaledObject.Name, + triggerName, + triggerIndex, + metric.MetricName, + true, + metricValue, + ) + } + + // Send raw metrics if conditions are met + if sendRawMetricsCondition { + go h.sendWhenSubscribed(scaledObject.Name, scaledObject.Namespace, triggerName, metrics) + } + } + + return metrics, fallbackActive, err +} + // GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) { @@ -487,6 +538,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN triggerIndex int metricSpec v2.MetricSpec err error + fallbackActive bool } allScalers, scalerConfigs := cache.GetScalers() // the matching metrics length has to be the same as required metrics length @@ -530,7 +582,8 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN if err != nil { logger.Error(err, "error pairing triggers & metrics for compositeScaler") } - var metrics []external_metrics.ExternalMetricValue + var rawMetrics []external_metrics.ExternalMetricValue + var rawErr error // if cache is defined for this scaler/metric, let's try to hit it first metricsFoundInCache := false @@ -538,25 +591,41 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN var metricsRecord metricscache.MetricsRecord if metricsRecord, metricsFoundInCache = h.scaledObjectsMetricCache.ReadRecord(scaledObjectIdentifier, metricName); metricsFoundInCache { logger.V(1).Info("Reading metrics from cache", "scaler", triggerName, "metricName", metricName, "metricsRecord", metricsRecord) - metrics = metricsRecord.Metric - err = metricsRecord.ScalerError + rawMetrics = metricsRecord.Metric + rawErr = metricsRecord.ScalerError } } if !metricsFoundInCache { var latency time.Duration - metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) + rawMetrics, _, latency, rawErr = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) if latency != -1 { metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, latency) } - logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", metrics, "scalerError", err) + logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", rawMetrics, "scalerError", rawErr) } + + // Use the helper function to process metrics with fallback + metrics, fallbackActive, err := h.processMetricsWithFallback( + ctx, + rawMetrics, + rawErr, + metricName, + triggerName, + triggerIndex, + scaledObject, + spec, + shouldSendRawMetrics(RawMetricsHPA), + logger, + ) + result.metricName = metricName result.triggerName = triggerName result.triggerIndex = triggerIndex result.metricSpec = spec result.metrics = metrics result.err = err + result.fallbackActive = fallbackActive results <- result wg.Done() }(matchingMetricsChan, &wg, metricName, triggerIndex, scalerConfigs[triggerIndex], spec) @@ -570,28 +639,19 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN for key, value := range result.metricTriggerPair { metricTriggerPairList[key] = value } - // check if we need to set a fallback - metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, h.scaleClient, result.metrics, result.err, result.metricName, scaledObject, result.metricSpec) - if err != nil { - isScalerError = true - logger.Error(err, "error getting metric for trigger", "trigger", result.triggerName) - } else { - for _, metric := range metrics { - metricValue := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, metric.MetricName, true, metricValue) - } - // this is for raw metrics subscription for HPA requests - if shouldSendRawMetrics(RawMetricsHPA) { - // send the raw metric to all subscribed clients in a non-blocking fashion - go h.sendWhenSubscribed(scaledObjectName, scaledObjectNamespace, result.triggerName, metrics) - } - } - if fallbackActive { + + // The fallback is already handled by processMetricsWithFallback + if result.fallbackActive { isFallbackActive = true - fallbackMetrics = append(fallbackMetrics, metrics...) + fallbackMetrics = append(fallbackMetrics, result.metrics...) } - metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, err) - matchingMetrics = append(matchingMetrics, metrics...) + + if result.err != nil { + isScalerError = true + } + + metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, result.err) + matchingMetrics = append(matchingMetrics, result.metrics...) } // invalidate the cache for the ScaledObject, if we hit an error in any scaler // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call @@ -619,6 +679,22 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN }, nil } +// scalerState is used as return +// for the function getScalerState. It contains +// the state of the scaler and all the required +// info for calculating the ScaledObjectState +type scalerState struct { + // IsActive will be overrided by formula calculation + IsActive bool + TriggerName string + Metrics []external_metrics.ExternalMetricValue + Pairs map[string]string + Records map[string]metricscache.MetricsRecord + Err error + FallbackActive bool + FallbackMetrics []external_metrics.ExternalMetricValue +} + // getScaledObjectState returns whether the input ScaledObject: // is active as the first return value, // the second return value indicates whether there was any error during querying scalers, @@ -632,7 +708,9 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k metricsRecord := map[string]metricscache.MetricsRecord{} metricTriggerPairList := make(map[string]string) var matchingMetrics []external_metrics.ExternalMetricValue + var fallbackMetrics []external_metrics.ExternalMetricValue var activeTriggers []string + isFallbackActive := false cache, err := h.GetScalersCache(ctx, scaledObject) metricscollector.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err) @@ -673,6 +751,13 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k if result.Err != nil { isScaledObjectError = true } + + // Handle fallback + if result.FallbackActive { + isFallbackActive = true + fallbackMetrics = append(fallbackMetrics, result.FallbackMetrics...) + } + matchingMetrics = append(matchingMetrics, result.Metrics...) for k, v := range result.Pairs { metricTriggerPairList[k] = v @@ -682,12 +767,6 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } metricscollector.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, result.Err) - - // this is for raw metrics subscription for polling interval - if shouldSendRawMetrics(RawMetricsPollingInterval) { - // send the raw metric to all subscribed clients in a non-blocking fashion - go h.sendWhenSubscribed(scaledObject.Name, scaledObject.Namespace, result.TriggerName, result.Metrics) - } } // invalidate the cache for the ScaledObject, if we hit an error in any scaler @@ -701,7 +780,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } // apply scaling modifiers - matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, nil, cache, logger) + matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, fallbackMetrics, cache, logger) // when we are using formula, we need to reevaluate if it's active here if scaledObject.IsUsingModifiers() { @@ -741,33 +820,21 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScaledObjectError, metricsRecord, activeTriggers, err } -// scalerState is used as return -// for the function getScalerState. It contains -// the state of the scaler and all the required -// info for calculating the ScaledObjectState -type scalerState struct { - // IsActive will be overrided by formula calculation - IsActive bool - TriggerName string - Metrics []external_metrics.ExternalMetricValue - Pairs map[string]string - Records map[string]metricscache.MetricsRecord - Err error -} - // getScalerState returns getStateScalerResult with the state // for an specific scaler. The state contains if it's active or -// with erros, but also the records for the cache and he metrics +// with errors, but also the records for the cache and the metrics // for the custom formulas func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, triggerIndex int, scalerConfig scalersconfig.ScalerConfig, cache *cache.ScalersCache, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) scalerState { result := scalerState{ - IsActive: false, - Err: nil, - TriggerName: "", - Metrics: []external_metrics.ExternalMetricValue{}, - Pairs: map[string]string{}, - Records: map[string]metricscache.MetricsRecord{}, + IsActive: false, + Err: nil, + TriggerName: "", + Metrics: []external_metrics.ExternalMetricValue{}, + Pairs: map[string]string{}, + Records: map[string]metricscache.MetricsRecord{}, + FallbackActive: false, + FallbackMetrics: []external_metrics.ExternalMetricValue{}, } result.TriggerName = strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1) @@ -790,25 +857,42 @@ func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler metricName := spec.External.Metric.Name var latency time.Duration - metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) - metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err) + rawMetrics, isMetricActive, latency, rawErr := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) + metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, rawErr) if latency != -1 { metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency) } - // Apply fallback if needed - metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, h.scaleClient, metrics, err, metricName, scaledObject, spec) + logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", rawMetrics, "activity", isMetricActive, "scalerError", rawErr) + + // Use the helper function to process metrics with fallback + metrics, fallbackActive, err := h.processMetricsWithFallback( + ctx, + rawMetrics, + rawErr, + metricName, + result.TriggerName, + triggerIndex, + scaledObject, + spec, + shouldSendRawMetrics(RawMetricsPollingInterval), + logger, + ) + + // Store fallback information if fallbackActive { - logger.V(1).Info("Using fallback metrics in polling loop", "scaler", result.TriggerName, "metricName", metricName) - isMetricActive = true + result.FallbackActive = true + result.FallbackMetrics = append(result.FallbackMetrics, metrics...) } result.Metrics = append(result.Metrics, metrics...) - logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) + + // When fallback is active, the scaler should be considered active + isActiveOrFallback := fallbackActive || isMetricActive if scalerConfig.TriggerUseCachedMetrics { result.Records[metricName] = metricscache.MetricsRecord{ - IsActive: isMetricActive, + IsActive: isActiveOrFallback, Metric: metrics, ScalerError: err, } @@ -824,13 +908,9 @@ func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) } } else { - result.IsActive = isMetricActive - for _, metric := range metrics { - metricValue := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue) - } + result.IsActive = isActiveOrFallback if !scaledObject.IsUsingModifiers() { - if isMetricActive { + if result.IsActive { if spec.External != nil { logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName) } @@ -838,7 +918,7 @@ func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name) } } - metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive) + metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, result.IsActive) } } @@ -846,13 +926,15 @@ func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler if err != nil { logger.Error(err, "error pairing triggers & metrics for compositeScaler") } + + metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err) } return result } -// / --------------------------------------------------------------------------- /// -// / ---------- ScaledJob related methods --------- /// -// / --------------------------------------------------------------------------- /// +/// --------------------------------------------------------------------------- /// +/// ---------- ScaledJob related methods --------- /// +/// --------------------------------------------------------------------------- /// // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. From 920dd6d9541c87b997236a344995e24e1c41c1be Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Wed, 19 Nov 2025 10:37:30 +0100 Subject: [PATCH 3/3] retrigger jobs Signed-off-by: Rick Brouwer --- tests/internals/fallback/fallback.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/internals/fallback/fallback.go b/tests/internals/fallback/fallback.go index 0e89b98c0e5..eb944fba528 100644 --- a/tests/internals/fallback/fallback.go +++ b/tests/internals/fallback/fallback.go @@ -777,7 +777,7 @@ func TestFallbackFromZero(t *testing.T, s ScaleTargetType) { t.Logf("--- running TestFallbackFromZero test for %s ---", s) data, templates := getTemplateData(s) - // Replace the default scaledObject template + // Replace the default ScaledObject template for i, tmpl := range templates { if tmpl.Name == "scaledObjectTemplate" { templates[i].Config = scaledObjectTemplateWithStatic