-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: honeycomb scaler #6896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: honeycomb scaler #6896
Changes from all commits
44e2604
f851da8
4ebe3cc
81a8005
86c53b0
a86e243
6ee4929
928ceea
1c79b2d
92cc9fe
263f2a2
a9811f8
d13f467
3fa2ca1
8243f56
79d26d4
938274a
123fccd
2d6edac
190c396
6f41074
8acf30e
4e42692
b5c8bc8
a8945b1
64a1e66
100678f
30b43dc
c5bbf81
016d171
9032827
7d88eca
01dcc86
caaacd0
c8058bf
4275ae7
2aaf0b0
5de0b3a
f8d02ed
dc6d97f
cefafe4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| module github.com/kedacore/keda/v2 | ||
|
|
||
| go 1.23.8 | ||
| go 1.23 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this required? |
||
|
|
||
| require ( | ||
| cloud.google.com/go/compute/metadata v0.6.0 | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,300 @@ | ||||||||||||||||
| package scalers | ||||||||||||||||
|
|
||||||||||||||||
| import ( | ||||||||||||||||
| "bytes" | ||||||||||||||||
| "context" | ||||||||||||||||
| "encoding/json" | ||||||||||||||||
| "errors" | ||||||||||||||||
| "fmt" | ||||||||||||||||
| "io" | ||||||||||||||||
| "net/http" | ||||||||||||||||
| "time" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/go-logr/logr" | ||||||||||||||||
| v2 "k8s.io/api/autoscaling/v2" | ||||||||||||||||
| "k8s.io/metrics/pkg/apis/external_metrics" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" | ||||||||||||||||
| kedautil "github.com/kedacore/keda/v2/pkg/util" | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| const ( | ||||||||||||||||
| honeycombScalerName = "honeycomb" | ||||||||||||||||
| honeycombBaseURL = "https://api.honeycomb.io/1" | ||||||||||||||||
| maxPollAttempts = 5 | ||||||||||||||||
| initialPollDelay = 2 * time.Second | ||||||||||||||||
| honeycombQueryResultsLimit = 10000 | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| type honeycombScaler struct { | ||||||||||||||||
| metricType v2.MetricTargetType | ||||||||||||||||
| metadata honeycombMetadata | ||||||||||||||||
| httpClient *http.Client | ||||||||||||||||
| logger logr.Logger | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| type honeycombMetadata struct { | ||||||||||||||||
| APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata"` | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't support reading secrets from metadata
Suggested change
|
||||||||||||||||
| Dataset string `keda:"name=dataset, order=triggerMetadata"` | ||||||||||||||||
| Query map[string]interface{} `keda:"name=query, order=triggerMetadata, optional"` | ||||||||||||||||
| QueryRaw string `keda:"name=queryRaw, order=triggerMetadata, optional"` | ||||||||||||||||
| ResultField string `keda:"name=resultField, order=triggerMetadata, optional"` | ||||||||||||||||
| ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, default=0"` | ||||||||||||||||
| Threshold float64 `keda:"name=threshold, order=triggerMetadata"` | ||||||||||||||||
| Breakdowns []string `keda:"name=breakdowns, order=triggerMetadata, optional"` | ||||||||||||||||
| Calculation string `keda:"name=calculation, order=triggerMetadata, default=COUNT"` | ||||||||||||||||
| Limit int `keda:"name=limit, order=triggerMetadata, default=1"` | ||||||||||||||||
| TimeRange int `keda:"name=timeRange, order=triggerMetadata, default=60"` | ||||||||||||||||
| TriggerIndex int | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func NewHoneycombScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { | ||||||||||||||||
| metricType, err := GetMetricTargetType(config) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return nil, fmt.Errorf("error getting scaler metric type: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| logger := InitializeLogger(config, fmt.Sprintf("%s_scaler", honeycombScalerName)) | ||||||||||||||||
|
|
||||||||||||||||
| meta, err := parseHoneycombMetadata(config) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return nil, fmt.Errorf("error parsing honeycomb metadata: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| logger.Info("Initializing Honeycomb Scaler", "dataset", meta.Dataset) | ||||||||||||||||
|
|
||||||||||||||||
| return &honeycombScaler{ | ||||||||||||||||
| metricType: metricType, | ||||||||||||||||
| metadata: meta, | ||||||||||||||||
| // Query Results cannot take longer than 10 seconds to run, see https://api-docs.honeycomb.io/api for details | ||||||||||||||||
| httpClient: &http.Client{Timeout: 15 * time.Second}, | ||||||||||||||||
kmoonwright marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
| logger: logger, | ||||||||||||||||
| }, nil | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func parseHoneycombMetadata(config *scalersconfig.ScalerConfig) (honeycombMetadata, error) { | ||||||||||||||||
| meta := honeycombMetadata{} | ||||||||||||||||
| err := config.TypedConfig(&meta) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return meta, fmt.Errorf("error parsing honeycomb metadata: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| meta.TriggerIndex = config.TriggerIndex | ||||||||||||||||
|
|
||||||||||||||||
| // Use queryRaw if provided, else build query from legacy fields | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How many are options to set the config? Checking the code, user can set the config:
|
||||||||||||||||
| if raw, ok := config.TriggerMetadata["queryRaw"]; ok && raw != "" { | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we just use the parsed value?
Suggested change
|
||||||||||||||||
| var q map[string]interface{} | ||||||||||||||||
| if err := json.Unmarshal([]byte(raw), &q); err != nil { | ||||||||||||||||
| return meta, fmt.Errorf("error parsing queryRaw: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| meta.Query = q | ||||||||||||||||
| } else if meta.Query == nil { | ||||||||||||||||
| q := make(map[string]interface{}) | ||||||||||||||||
| if len(meta.Breakdowns) > 0 { | ||||||||||||||||
| q["breakdowns"] = meta.Breakdowns | ||||||||||||||||
| } | ||||||||||||||||
| if meta.Calculation != "" { | ||||||||||||||||
| q["calculations"] = []map[string]string{{"op": meta.Calculation}} | ||||||||||||||||
| } | ||||||||||||||||
| if meta.Limit > 0 { | ||||||||||||||||
| q["limit"] = meta.Limit | ||||||||||||||||
| } | ||||||||||||||||
| if meta.TimeRange > 0 { | ||||||||||||||||
| q["time_range"] = meta.TimeRange | ||||||||||||||||
| } | ||||||||||||||||
| meta.Query = q | ||||||||||||||||
| } | ||||||||||||||||
| if meta.Query == nil { | ||||||||||||||||
| return meta, errors.New("no valid query provided in 'queryRaw', 'query', or legacy fields") | ||||||||||||||||
| } | ||||||||||||||||
| return meta, nil | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func (s *honeycombScaler) Close(context.Context) error { return nil } | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should close idle connections on closing
Suggested change
|
||||||||||||||||
|
|
||||||||||||||||
| // ----- Core Query Logic ----- | ||||||||||||||||
|
|
||||||||||||||||
| func (s *honeycombScaler) executeHoneycombQuery(ctx context.Context) (float64, error) { | ||||||||||||||||
| // 1. Create Query | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we create a query on each request? I mean, we will create multiple queries per minute, is this correct from honeycomb pov or users should create the query by their own somewhere and provide the query ID as parameter? |
||||||||||||||||
| createURL := fmt.Sprintf("%s/queries/%s", honeycombBaseURL, s.metadata.Dataset) | ||||||||||||||||
| bodyBytes, err := json.Marshal(s.metadata.Query) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("error marshaling Honeycomb create query body: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| req, err := http.NewRequestWithContext(ctx, "POST", createURL, bytes.NewBuffer(bodyBytes)) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("error creating Honeycomb create query request: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| req.Header.Set("Content-Type", "application/json") | ||||||||||||||||
| req.Header.Set("X-Honeycomb-Team", s.metadata.APIKey) | ||||||||||||||||
| resp, err := s.httpClient.Do(req) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb create query error: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| defer resp.Body.Close() | ||||||||||||||||
| if resp.StatusCode != 200 && resp.StatusCode != 201 { | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this API returns 200 and 201? I'd expect just only one of them |
||||||||||||||||
| body, _ := io.ReadAll(resp.Body) | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb createQuery status: %s - %s", resp.Status, string(body)) | ||||||||||||||||
| } | ||||||||||||||||
| var createRes struct { | ||||||||||||||||
| ID string `json:"id"` | ||||||||||||||||
| } | ||||||||||||||||
| if err := json.NewDecoder(resp.Body).Decode(&createRes); err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("decode createQuery: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| if createRes.ID == "" { | ||||||||||||||||
| return 0, errors.New("createQuery: missing query id") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // 2. Run Query | ||||||||||||||||
| runURL := fmt.Sprintf("%s/query_results/%s", honeycombBaseURL, s.metadata.Dataset) | ||||||||||||||||
| runBody, err := json.Marshal(map[string]interface{}{ | ||||||||||||||||
| "query_id": createRes.ID, | ||||||||||||||||
| "disable_series": false, | ||||||||||||||||
| "disable_total_by_aggregate": true, | ||||||||||||||||
| "disable_other_by_aggregate": true, | ||||||||||||||||
| // Query results limit is 10000, see https://api-docs.honeycomb.io/api for details | ||||||||||||||||
| "limit": honeycombQueryResultsLimit, | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it make sense to expose this to users? I mean, the limit must be enforced, but maybe a user prefers to query X items instead of the limit. Maybe we can expose this to users and check that value doesn't pass the limit. |
||||||||||||||||
| }) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("error marshaling Honeycomb run query body: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| runReq, err := http.NewRequestWithContext(ctx, "POST", runURL, bytes.NewBuffer(runBody)) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("error creating Honeycomb run query request: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| runReq.Header.Set("Content-Type", "application/json") | ||||||||||||||||
| runReq.Header.Set("X-Honeycomb-Team", s.metadata.APIKey) | ||||||||||||||||
| runResp, err := s.httpClient.Do(runReq) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb run query error: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| defer runResp.Body.Close() | ||||||||||||||||
| if runResp.StatusCode == 429 { | ||||||||||||||||
| return 0, errors.New("honeycomb: rate limited (429), back off and try again later") | ||||||||||||||||
| } | ||||||||||||||||
| if runResp.StatusCode == 401 { | ||||||||||||||||
| body, _ := io.ReadAll(runResp.Body) | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb: unauthorized (401) - an Enterprise API key is required, check your API key permissions. See: https://api-docs.honeycomb.io/api/query-data for details. Response: %s", string(body)) | ||||||||||||||||
| } | ||||||||||||||||
| if runResp.StatusCode != 200 && runResp.StatusCode != 201 { | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above |
||||||||||||||||
| body, _ := io.ReadAll(runResp.Body) | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb runQuery status: %s - %s", runResp.Status, string(body)) | ||||||||||||||||
| } | ||||||||||||||||
| var runRes struct { | ||||||||||||||||
| ID string `json:"id"` | ||||||||||||||||
| Complete bool `json:"complete"` | ||||||||||||||||
| Data struct { | ||||||||||||||||
| Results []map[string]interface{} `json:"results"` | ||||||||||||||||
| } `json:"data"` | ||||||||||||||||
| } | ||||||||||||||||
| if err := json.NewDecoder(runResp.Body).Decode(&runRes); err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("decode runQuery: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| if runRes.ID == "" { | ||||||||||||||||
| return 0, errors.New("runQuery: missing queryResult id") | ||||||||||||||||
| } | ||||||||||||||||
| if runRes.Complete && len(runRes.Data.Results) > 0 { | ||||||||||||||||
| return extractResultField(runRes.Data.Results, s.metadata.ResultField) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // 3. Poll for completion (exponential backoff) | ||||||||||||||||
| pollURL := fmt.Sprintf("%s/query_results/%s/%s", honeycombBaseURL, s.metadata.Dataset, runRes.ID) | ||||||||||||||||
| // Polling query results are rate limited, see https://api-docs.honeycomb.io/api for details | ||||||||||||||||
| pollDelay := initialPollDelay | ||||||||||||||||
| for attempt := 0; attempt < maxPollAttempts; attempt++ { | ||||||||||||||||
kmoonwright marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
| time.Sleep(pollDelay) | ||||||||||||||||
| pollDelay *= 2 | ||||||||||||||||
| statusReq, err := http.NewRequestWithContext(ctx, "GET", pollURL, nil) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("error creating Honeycomb poll query request: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| statusReq.Header.Set("X-Honeycomb-Team", s.metadata.APIKey) | ||||||||||||||||
| statusResp, err := s.httpClient.Do(statusReq) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb poll query error: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| if statusResp.StatusCode == 429 { | ||||||||||||||||
| statusResp.Body.Close() | ||||||||||||||||
| return 0, errors.New("honeycomb: rate limited (429) on poll, back off and try again later") | ||||||||||||||||
| } | ||||||||||||||||
| if statusResp.StatusCode != 200 && statusResp.StatusCode != 201 { | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||||||||||||||||
| body, _ := io.ReadAll(statusResp.Body) | ||||||||||||||||
| statusResp.Body.Close() | ||||||||||||||||
| return 0, fmt.Errorf("honeycomb pollQuery status: %s - %s", statusResp.Status, string(body)) | ||||||||||||||||
| } | ||||||||||||||||
| var pollRes struct { | ||||||||||||||||
| Complete bool `json:"complete"` | ||||||||||||||||
| Data struct { | ||||||||||||||||
| Results []map[string]interface{} `json:"results"` | ||||||||||||||||
| } `json:"data"` | ||||||||||||||||
| } | ||||||||||||||||
| if err := json.NewDecoder(statusResp.Body).Decode(&pollRes); err != nil { | ||||||||||||||||
| statusResp.Body.Close() | ||||||||||||||||
| return 0, fmt.Errorf("pollQuery decode error: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| statusResp.Body.Close() | ||||||||||||||||
| if pollRes.Complete && len(pollRes.Data.Results) > 0 { | ||||||||||||||||
| return extractResultField(pollRes.Data.Results, s.metadata.ResultField) | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| return 0, errors.New("honeycomb: timed out waiting for query result") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func extractResultField(results []map[string]interface{}, field string) (float64, error) { | ||||||||||||||||
| if len(results) == 0 { | ||||||||||||||||
| return 0, errors.New("no results from Honeycomb") | ||||||||||||||||
| } | ||||||||||||||||
| dataObj, ok := results[0]["data"].(map[string]interface{}) | ||||||||||||||||
| if !ok { | ||||||||||||||||
| return 0, errors.New("missing 'data' field in Honeycomb result") | ||||||||||||||||
| } | ||||||||||||||||
| if field == "" { | ||||||||||||||||
| for _, v := range dataObj { | ||||||||||||||||
| switch val := v.(type) { | ||||||||||||||||
| case float64: | ||||||||||||||||
| return val, nil | ||||||||||||||||
| case int: | ||||||||||||||||
| return float64(val), nil | ||||||||||||||||
| case int64: | ||||||||||||||||
| return float64(val), nil | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| return 0, errors.New("no numeric value found in Honeycomb result data") | ||||||||||||||||
| } | ||||||||||||||||
| v, ok := dataObj[field] | ||||||||||||||||
| if !ok { | ||||||||||||||||
| return 0, fmt.Errorf("field '%s' not found in Honeycomb result data", field) | ||||||||||||||||
| } | ||||||||||||||||
| switch val := v.(type) { | ||||||||||||||||
| case float64: | ||||||||||||||||
| return val, nil | ||||||||||||||||
| case int: | ||||||||||||||||
| return float64(val), nil | ||||||||||||||||
| case int64: | ||||||||||||||||
| return float64(val), nil | ||||||||||||||||
| } | ||||||||||||||||
| return 0, fmt.Errorf("no numeric value found for field '%s'", field) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // ----- KEDA Scaler interface ----- | ||||||||||||||||
| func (s *honeycombScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||||||||||||||||
| val, err := s.executeHoneycombQuery(ctx) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| s.logger.Error(err, "error executing Honeycomb query") | ||||||||||||||||
| return []external_metrics.ExternalMetricValue{}, false, err | ||||||||||||||||
| } | ||||||||||||||||
| metric := GenerateMetricInMili(metricName, val) | ||||||||||||||||
| return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func (s *honeycombScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { | ||||||||||||||||
| metricName := kedautil.NormalizeString(honeycombScalerName) | ||||||||||||||||
| externalMetric := &v2.ExternalMetricSource{ | ||||||||||||||||
| Metric: v2.MetricIdentifier{ | ||||||||||||||||
| Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName), | ||||||||||||||||
| }, | ||||||||||||||||
| Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), | ||||||||||||||||
| } | ||||||||||||||||
| metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} | ||||||||||||||||
| return []v2.MetricSpec{metricSpec} | ||||||||||||||||
| } | ||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to
Newsection