Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- **General**: Apply fallback in polling loop to enable scaling from zero ([#7239](https://github.com/kedacore/keda/issues/7239))
- **General**: Fix nil reference panic when transfer-hpa-ownership is set but no hpa name is provided ([#7254](https://github.com/kedacore/keda/issues/7254))
- **General**: Fix race condition in paused-replicas annotation causing ScaledObject to get stuck ([#7231](https://github.com/kedacore/keda/issues/7231))
- **General**: Use TriggerError when all ScaledJob triggers fail ([#7205](https://github.com/kedacore/keda/pull/7205))
Expand Down
226 changes: 158 additions & 68 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,57 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
/// ---------- ScaledObject related methods --------- ///
/// --------------------------------------------------------------------------- ///

// processMetricsWithFallback processes metrics with fallback support and handles metric recording.
func (h *scaleHandler) processMetricsWithFallback(
ctx context.Context,
rawMetrics []external_metrics.ExternalMetricValue,
rawErr error,
metricName string,
triggerName string,
triggerIndex int,
scaledObject *kedav1alpha1.ScaledObject,
metricSpec v2.MetricSpec,
sendRawMetricsCondition bool,
logger logr.Logger,
) ([]external_metrics.ExternalMetricValue, bool, error) {
// check if we need to set a fallback
metrics, fallbackActive, err := fallback.GetMetricsWithFallback(
ctx,
h.client,
h.scaleClient,
rawMetrics,
rawErr,
metricName,
scaledObject,
metricSpec,
)

if err != nil {
logger.Error(err, "error getting metric for trigger", "trigger", triggerName)
} else {
// Record metrics
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricscollector.RecordScalerMetric(
scaledObject.Namespace,
scaledObject.Name,
triggerName,
triggerIndex,
metric.MetricName,
true,
metricValue,
)
}

// Send raw metrics if conditions are met
if sendRawMetricsCondition {
go h.sendWhenSubscribed(scaledObject.Name, scaledObject.Namespace, triggerName, metrics)
}
}

return metrics, fallbackActive, err
}

// GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) {
Expand Down Expand Up @@ -487,6 +538,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
triggerIndex int
metricSpec v2.MetricSpec
err error
fallbackActive bool
}
allScalers, scalerConfigs := cache.GetScalers()
// the matching metrics length has to be the same as required metrics length
Expand Down Expand Up @@ -530,33 +582,50 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
}
var metrics []external_metrics.ExternalMetricValue
var rawMetrics []external_metrics.ExternalMetricValue
var rawErr error

// if cache is defined for this scaler/metric, let's try to hit it first
metricsFoundInCache := false
if scalerConfig.TriggerUseCachedMetrics {
var metricsRecord metricscache.MetricsRecord
if metricsRecord, metricsFoundInCache = h.scaledObjectsMetricCache.ReadRecord(scaledObjectIdentifier, metricName); metricsFoundInCache {
logger.V(1).Info("Reading metrics from cache", "scaler", triggerName, "metricName", metricName, "metricsRecord", metricsRecord)
metrics = metricsRecord.Metric
err = metricsRecord.ScalerError
rawMetrics = metricsRecord.Metric
rawErr = metricsRecord.ScalerError
}
}

if !metricsFoundInCache {
var latency time.Duration
metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
rawMetrics, _, latency, rawErr = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
if latency != -1 {
metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, latency)
}
logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", metrics, "scalerError", err)
logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", rawMetrics, "scalerError", rawErr)
}

// Use the helper function to process metrics with fallback
metrics, fallbackActive, err := h.processMetricsWithFallback(
ctx,
rawMetrics,
rawErr,
metricName,
triggerName,
triggerIndex,
scaledObject,
spec,
shouldSendRawMetrics(RawMetricsHPA),
logger,
)

result.metricName = metricName
result.triggerName = triggerName
result.triggerIndex = triggerIndex
result.metricSpec = spec
result.metrics = metrics
result.err = err
result.fallbackActive = fallbackActive
results <- result
wg.Done()
}(matchingMetricsChan, &wg, metricName, triggerIndex, scalerConfigs[triggerIndex], spec)
Expand All @@ -570,28 +639,19 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
for key, value := range result.metricTriggerPair {
metricTriggerPairList[key] = value
}
// check if we need to set a fallback
metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, h.scaleClient, result.metrics, result.err, result.metricName, scaledObject, result.metricSpec)
if err != nil {
isScalerError = true
logger.Error(err, "error getting metric for trigger", "trigger", result.triggerName)
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricscollector.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, metric.MetricName, true, metricValue)
}
// this is for raw metrics subscription for HPA requests
if shouldSendRawMetrics(RawMetricsHPA) {
// send the raw metric to all subscribed clients in a non-blocking fashion
go h.sendWhenSubscribed(scaledObjectName, scaledObjectNamespace, result.triggerName, metrics)
}
}
if fallbackActive {

// The fallback is already handled by processMetricsWithFallback
if result.fallbackActive {
isFallbackActive = true
fallbackMetrics = append(fallbackMetrics, metrics...)
fallbackMetrics = append(fallbackMetrics, result.metrics...)
}

if result.err != nil {
isScalerError = true
}
metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, err)
matchingMetrics = append(matchingMetrics, metrics...)

metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, result.err)
matchingMetrics = append(matchingMetrics, result.metrics...)
}
// invalidate the cache for the ScaledObject, if we hit an error in any scaler
// in this case we try to build all scalers (and resolve all secrets/creds) again in the next call
Expand Down Expand Up @@ -619,6 +679,22 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}, nil
}

// scalerState is used as return
// for the function getScalerState. It contains
// the state of the scaler and all the required
// info for calculating the ScaledObjectState
type scalerState struct {
// IsActive will be overrided by formula calculation
IsActive bool
TriggerName string
Metrics []external_metrics.ExternalMetricValue
Pairs map[string]string
Records map[string]metricscache.MetricsRecord
Err error
FallbackActive bool
FallbackMetrics []external_metrics.ExternalMetricValue
}

// getScaledObjectState returns whether the input ScaledObject:
// is active as the first return value,
// the second return value indicates whether there was any error during querying scalers,
Expand All @@ -632,7 +708,9 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
metricsRecord := map[string]metricscache.MetricsRecord{}
metricTriggerPairList := make(map[string]string)
var matchingMetrics []external_metrics.ExternalMetricValue
var fallbackMetrics []external_metrics.ExternalMetricValue
var activeTriggers []string
isFallbackActive := false

cache, err := h.GetScalersCache(ctx, scaledObject)
metricscollector.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err)
Expand Down Expand Up @@ -673,6 +751,13 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
if result.Err != nil {
isScaledObjectError = true
}

// Handle fallback
if result.FallbackActive {
isFallbackActive = true
fallbackMetrics = append(fallbackMetrics, result.FallbackMetrics...)
}

matchingMetrics = append(matchingMetrics, result.Metrics...)
for k, v := range result.Pairs {
metricTriggerPairList[k] = v
Expand All @@ -682,12 +767,6 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}

metricscollector.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, result.Err)

// this is for raw metrics subscription for polling interval
if shouldSendRawMetrics(RawMetricsPollingInterval) {
// send the raw metric to all subscribed clients in a non-blocking fashion
go h.sendWhenSubscribed(scaledObject.Name, scaledObject.Namespace, result.TriggerName, result.Metrics)
}
}

// invalidate the cache for the ScaledObject, if we hit an error in any scaler
Expand All @@ -701,7 +780,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}

// apply scaling modifiers
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, nil, cache, logger)
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, fallbackMetrics, cache, logger)

// when we are using formula, we need to reevaluate if it's active here
if scaledObject.IsUsingModifiers() {
Expand Down Expand Up @@ -741,33 +820,21 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
return isScaledObjectActive, isScaledObjectError, metricsRecord, activeTriggers, err
}

// scalerState is used as return
// for the function getScalerState. It contains
// the state of the scaler and all the required
// info for calculating the ScaledObjectState
type scalerState struct {
// IsActive will be overrided by formula calculation
IsActive bool
TriggerName string
Metrics []external_metrics.ExternalMetricValue
Pairs map[string]string
Records map[string]metricscache.MetricsRecord
Err error
}

// getScalerState returns getStateScalerResult with the state
// for an specific scaler. The state contains if it's active or
// with erros, but also the records for the cache and he metrics
// with errors, but also the records for the cache and the metrics
// for the custom formulas
func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, triggerIndex int, scalerConfig scalersconfig.ScalerConfig,
func (h *scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, triggerIndex int, scalerConfig scalersconfig.ScalerConfig,
cache *cache.ScalersCache, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) scalerState {
result := scalerState{
IsActive: false,
Err: nil,
TriggerName: "",
Metrics: []external_metrics.ExternalMetricValue{},
Pairs: map[string]string{},
Records: map[string]metricscache.MetricsRecord{},
IsActive: false,
Err: nil,
TriggerName: "",
Metrics: []external_metrics.ExternalMetricValue{},
Pairs: map[string]string{},
Records: map[string]metricscache.MetricsRecord{},
FallbackActive: false,
FallbackMetrics: []external_metrics.ExternalMetricValue{},
}

result.TriggerName = strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1)
Expand All @@ -790,17 +857,42 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
metricName := spec.External.Metric.Name

var latency time.Duration
metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
rawMetrics, isMetricActive, latency, rawErr := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, rawErr)
if latency != -1 {
metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency)
}

logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", rawMetrics, "activity", isMetricActive, "scalerError", rawErr)

// Use the helper function to process metrics with fallback
metrics, fallbackActive, err := h.processMetricsWithFallback(
ctx,
rawMetrics,
rawErr,
metricName,
result.TriggerName,
triggerIndex,
scaledObject,
spec,
shouldSendRawMetrics(RawMetricsPollingInterval),
logger,
)

// Store fallback information
if fallbackActive {
result.FallbackActive = true
result.FallbackMetrics = append(result.FallbackMetrics, metrics...)
}

result.Metrics = append(result.Metrics, metrics...)
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

// When fallback is active, the scaler should be considered active
isActiveOrFallback := fallbackActive || isMetricActive

if scalerConfig.TriggerUseCachedMetrics {
result.Records[metricName] = metricscache.MetricsRecord{
IsActive: isMetricActive,
IsActive: isActiveOrFallback,
Metric: metrics,
ScalerError: err,
}
Expand All @@ -816,35 +908,33 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
} else {
result.IsActive = isMetricActive
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue)
}
result.IsActive = isActiveOrFallback
if !scaledObject.IsUsingModifiers() {
if isMetricActive {
if result.IsActive {
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name)
}
}
metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive)
metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, result.IsActive)
}
}

result.Pairs, err = modifiers.GetPairTriggerAndMetric(scaledObject, metricName, scalerConfig.TriggerName)
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
}

metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
}
return result
}

// / --------------------------------------------------------------------------- ///
// / ---------- ScaledJob related methods --------- ///
// / --------------------------------------------------------------------------- ///
/// --------------------------------------------------------------------------- ///
/// ---------- ScaledJob related methods --------- ///
/// --------------------------------------------------------------------------- ///

// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
Expand Down
Loading
Loading