diff --git a/.chloggen/feat_42663.yaml b/.chloggen/feat_42663.yaml new file mode 100644 index 0000000000000..4153cdcd277d4 --- /dev/null +++ b/.chloggen/feat_42663.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "processor/resourcedetection" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for periodically refreshing resource attributes via the `refresh_interval` parameter" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42663] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/resourcedetectionprocessor/README.md b/processor/resourcedetectionprocessor/README.md index b30b9f99d889a..abe9439650b17 100644 --- a/processor/resourcedetectionprocessor/README.md +++ b/processor/resourcedetectionprocessor/README.md @@ -831,6 +831,8 @@ processors: detectors: [ ] # determines if existing resource attributes should be overridden or preserved, defaults to true override: +# how often resource detection should be refreshed; if unset, detection runs only once at startup +refresh_interval: # [DEPRECATED] When included, only attributes in the list will be appended. Applies to all detectors. attributes: [ ] ``` @@ -880,6 +882,18 @@ resourcedetection: enabled: false ``` +### Using the `refresh_interval` parameter + +The `refresh_interval` option allows resource attributes to be periodically refreshed without restarting the Collector. + +**Important considerations:** + +- **Latency**: Newly detected resource attributes are applied only after the next refresh cycle completes (up to one `refresh_interval` after the underlying change). +- **Metric cardinality**: Changes to resource attributes create new metric time series, which can significantly increase storage costs and query complexity. +- **Performance impact**: Each refresh re-runs all configured detectors. Values below 5 minutes can increase CPU and memory usage. There is no enforced minimum, but intervals below 1 minute are strongly discouraged. + +**Recommendation**: In most environments, a single resource detection at startup is sufficient. Periodic refresh should be used only when resource attributes are expected to change during the Collector's lifetime (e.g., Kubernetes pod labels, cloud instance tags). + ## Ordering Note that if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended.
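To make the new README option concrete, here is a minimal configuration sketch; the detector list and the 10m interval are illustrative only, and per the considerations above short intervals are discouraged:

```yaml
processors:
  resourcedetection/refresh:
    detectors: [system, ec2]
    timeout: 2s
    override: false
    # re-run all configured detectors every 10 minutes;
    # omit refresh_interval to detect only once at startup
    refresh_interval: 10m
```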
diff --git a/processor/resourcedetectionprocessor/config.go b/processor/resourcedetectionprocessor/config.go index b24712d1b7371..e13a9297d6118 100644 --- a/processor/resourcedetectionprocessor/config.go +++ b/processor/resourcedetectionprocessor/config.go @@ -4,6 +4,8 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor" import ( + "time" + "go.opentelemetry.io/collector/config/confighttp" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" @@ -49,6 +51,9 @@ type Config struct { // If a supplied attribute is not a valid attribute of a supplied detector it will be ignored. // Deprecated: Please use detector's resource_attributes config instead Attributes []string `mapstructure:"attributes"` + // If > 0, periodically re-run detection for all configured detectors. + // When 0 (default), no periodic refresh occurs. + RefreshInterval time.Duration `mapstructure:"refresh_interval"` } // DetectorConfig contains user-specified configurations unique to all individual detectors diff --git a/processor/resourcedetectionprocessor/config_test.go b/processor/resourcedetectionprocessor/config_test.go index 1d3fd3912ffd3..0f8a7797e3e1e 100644 --- a/processor/resourcedetectionprocessor/config_test.go +++ b/processor/resourcedetectionprocessor/config_test.go @@ -133,6 +133,16 @@ func TestLoadConfig(t *testing.T) { DetectorConfig: resourceAttributesConfig, }, }, + { + id: component.NewIDWithName(metadata.Type, "refresh"), + expected: &Config{ + Detectors: []string{"system"}, + ClientConfig: cfg, + Override: false, + DetectorConfig: detectorCreateDefaultConfig(), + RefreshInterval: 5 * time.Second, + }, + }, { id: component.NewIDWithName(metadata.Type, "invalid"), errorMessage: "hostname_sources contains invalid value: \"invalid_source\"", diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index ff3c510fd088d..9312a334e0ac2 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -109,11 +109,12 @@ func (*factory) Type() component.Type { func createDefaultConfig() component.Config { return &Config{ - Detectors: []string{env.TypeStr}, - ClientConfig: defaultClientConfig(), - Override: true, - Attributes: nil, - DetectorConfig: detectorCreateDefaultConfig(), + Detectors: []string{env.TypeStr}, + ClientConfig: defaultClientConfig(), + Override: true, + Attributes: nil, + DetectorConfig: detectorCreateDefaultConfig(), + RefreshInterval: 0, // TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved, // Set the default value of 'hostname_source' here instead of 'system' detector } @@ -143,7 +144,9 @@ func (f *factory) createTracesProcessor( nextConsumer, rdp.processTraces, processorhelper.WithCapabilities(consumerCapabilities), - processorhelper.WithStart(rdp.Start)) + processorhelper.WithStart(rdp.Start), + processorhelper.WithShutdown(rdp.Shutdown), + ) } func (f *factory) createMetricsProcessor( @@ -164,7 +167,9 @@ func (f *factory) createMetricsProcessor( nextConsumer, rdp.processMetrics, processorhelper.WithCapabilities(consumerCapabilities), - processorhelper.WithStart(rdp.Start)) + processorhelper.WithStart(rdp.Start), + processorhelper.WithShutdown(rdp.Shutdown), + ) } func (f *factory) createLogsProcessor( @@ -185,7 +190,9 @@ func (f *factory) 
createLogsProcessor( nextConsumer, rdp.processLogs, processorhelper.WithCapabilities(consumerCapabilities), - processorhelper.WithStart(rdp.Start)) + processorhelper.WithStart(rdp.Start), + processorhelper.WithShutdown(rdp.Shutdown), + ) } func (f *factory) createProfilesProcessor( @@ -206,7 +213,9 @@ func (f *factory) createProfilesProcessor( nextConsumer, rdp.processProfiles, xprocessorhelper.WithCapabilities(consumerCapabilities), - xprocessorhelper.WithStart(rdp.Start)) + xprocessorhelper.WithStart(rdp.Start), + xprocessorhelper.WithShutdown(rdp.Shutdown), + ) } func (f *factory) getResourceDetectionProcessor( @@ -226,6 +235,7 @@ func (f *factory) getResourceDetectionProcessor( provider: provider, override: oCfg.Override, httpClientSettings: oCfg.ClientConfig, + refreshInterval: oCfg.RefreshInterval, telemetrySettings: params.TelemetrySettings, }, nil } diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index b0bf80beb3cc4..7a032ae664aff 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -98,8 +98,13 @@ type ResourceProvider struct { timeout time.Duration detectors []Detector detectedResource *resourceResult - once sync.Once + mu sync.RWMutex attributesToKeep map[string]struct{} + + // Refresh loop control + refreshInterval time.Duration + stopCh chan struct{} + wg sync.WaitGroup } type resourceResult struct { @@ -114,73 +119,113 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesTo timeout: timeout, detectors: detectors, attributesToKeep: attributesToKeep, + refreshInterval: 0, // No periodic refresh by default } } -func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) { - p.once.Do(func() { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, client.Timeout) - defer cancel() - p.detectResource(ctx, client.Timeout) - }) +func (p *ResourceProvider) Get(_ context.Context, _ *http.Client) (pcommon.Resource, string, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + resource := pcommon.NewResource() + schemaURL := "" + var err error - return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err + if p.detectedResource != nil { + resource = p.detectedResource.resource + schemaURL = p.detectedResource.schemaURL + err = p.detectedResource.err + } + + return resource, schemaURL, err } -func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Duration) { - p.detectedResource = &resourceResult{} +// Refresh recomputes the resource, replacing any previous result. +func (p *ResourceProvider) Refresh(ctx context.Context, client *http.Client) (pcommon.Resource, string, error) { + ctx, cancel := context.WithTimeout(ctx, client.Timeout) + defer cancel() + + res, schemaURL, err := p.detectResource(ctx) + p.mu.Lock() + defer p.mu.Unlock() + prev := p.detectedResource + + // Check if we have a previous successful snapshot + hadPrevSuccess := prev != nil && prev.err == nil && !IsEmptyResource(prev.resource) + + // Keep the last good snapshot if the refresh errored OR returned an empty resource.
+ if hadPrevSuccess && (err != nil || IsEmptyResource(res)) { + p.logger.Warn("resource refresh yielded empty or error; keeping previous snapshot", zap.Error(err)) + // Return nil error since we're successfully returning the cached resource + return prev.resource, prev.schemaURL, nil + } + + // Otherwise, accept the new snapshot. + p.detectedResource = &resourceResult{resource: res, schemaURL: schemaURL, err: err} + return res, schemaURL, err +} +func (p *ResourceProvider) detectResource(ctx context.Context) (pcommon.Resource, string, error) { res := pcommon.NewResource() mergedSchemaURL := "" + var joinedErr error + successes := 0 p.logger.Info("began detecting resource information") resultsChan := make([]chan resourceResult, len(p.detectors)) for i, detector := range p.detectors { - resultsChan[i] = make(chan resourceResult) - go func(detector Detector) { + ch := make(chan resourceResult, 1) + resultsChan[i] = ch + + go func(detector Detector, ch chan resourceResult) { sleep := backoff.ExponentialBackOff{ InitialInterval: 1 * time.Second, RandomizationFactor: 1.5, Multiplier: 2, - MaxInterval: timeout, } sleep.Reset() - var err error - var r pcommon.Resource - var schemaURL string + for { - r, schemaURL, err = detector.Detect(ctx) + r, schemaURL, err := detector.Detect(ctx) if err == nil { - resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: nil} + ch <- resourceResult{resource: r, schemaURL: schemaURL} return } + p.logger.Warn("failed to detect resource", zap.Error(err)) - timer := time.NewTimer(sleep.NextBackOff()) + next := sleep.NextBackOff() + if next == backoff.Stop { + ch <- resourceResult{err: err} + return + } + + timer := time.NewTimer(next) select { - case <-timer.C: - fmt.Println("Retrying fetching data...") case <-ctx.Done(): - p.logger.Warn("Context was cancelled: %w", zap.Error(ctx.Err())) - resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: err} + p.logger.Warn("context was cancelled", zap.Error(ctx.Err())) + timer.Stop() + ch <- resourceResult{err: err} return + case <-timer.C: + // retry } } - }(detector) + }(detector, ch) } for _, ch := range resultsChan { result := <-ch if result.err != nil { if allowErrorPropagationFeatureGate.IsEnabled() { - p.detectedResource.err = errors.Join(p.detectedResource.err, result.err) + joinedErr = errors.Join(joinedErr, result.err) } - } else { - mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL) - MergeResource(res, result.resource, false) + continue } + successes++ + mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL) + MergeResource(res, result.resource, false) } droppedAttributes := filterAttributes(res.Attributes(), p.attributesToKeep) @@ -190,8 +235,22 @@ func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Dura p.logger.Info("dropped resource information", zap.Strings("resource keys", droppedAttributes)) } - p.detectedResource.resource = res - p.detectedResource.schemaURL = mergedSchemaURL + // Partial success is acceptable - return merged resources from successful detectors. + // Only propagate an error if ALL detectors failed, ensuring graceful degradation. + if successes == 0 { + if allowErrorPropagationFeatureGate.IsEnabled() { + if joinedErr == nil { + joinedErr = errors.New("resource detection failed: no detectors succeeded") + } + return pcommon.NewResource(), "", joinedErr + } + + // If the feature gate is disabled, do NOT propagate the error. + // Return an empty resource and nil error so the processor can still start. 
+ p.logger.Warn("resource detection failed but error propagation is disabled") + return pcommon.NewResource(), "", nil + } + return res, mergedSchemaURL, nil } func MergeSchemaURL(currentSchemaURL, newSchemaURL string) string { @@ -244,3 +303,41 @@ func MergeResource(to, from pcommon.Resource, overrideTo bool) { func IsEmptyResource(res pcommon.Resource) bool { return res.Attributes().Len() == 0 } + +// StartRefreshing begins periodic resource refresh if refreshInterval > 0 +func (p *ResourceProvider) StartRefreshing(refreshInterval time.Duration, client *http.Client) { + p.refreshInterval = refreshInterval + if p.refreshInterval <= 0 { + return + } + + p.stopCh = make(chan struct{}) + p.wg.Add(1) + go p.refreshLoop(client) +} + +// StopRefreshing stops the periodic refresh goroutine +func (p *ResourceProvider) StopRefreshing() { + if p.stopCh != nil { + close(p.stopCh) + p.wg.Wait() + } +} + +func (p *ResourceProvider) refreshLoop(client *http.Client) { + defer p.wg.Done() + ticker := time.NewTicker(p.refreshInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + _, _, err := p.Refresh(context.Background(), client) + if err != nil { + p.logger.Warn("resource refresh failed", zap.Error(err)) + } + case <-p.stopCh: + return + } + } +} diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index e6057849f1612..7491746657bdc 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -30,7 +30,7 @@ type mockDetector struct { func (p *mockDetector) Detect(_ context.Context) (pcommon.Resource, string, error) { args := p.Called() - return args.Get(0).(pcommon.Resource), "", args.Error(1) + return args.Get(0).(pcommon.Resource), args.String(1), args.Error(2) } type mockDetectorConfig struct{} @@ -94,7 +94,7 @@ func TestDetect(t *testing.T) { md := &mockDetector{} res := pcommon.NewResource() require.NoError(t, res.Attributes().FromRaw(resAttrs)) - md.On("Detect").Return(res, nil) + md.On("Detect").Return(res, "", nil) mockDetectorType := DetectorType(fmt.Sprintf("mockDetector%v", i)) mockDetectors[mockDetectorType] = func(processor.Settings, DetectorConfig) (Detector, error) { @@ -107,7 +107,8 @@ func TestDetect(t *testing.T) { p, err := f.CreateResourceProvider(processortest.NewNopSettings(metadata.Type), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...) 
require.NoError(t, err) - got, _, err := p.Get(t.Context(), &http.Client{Timeout: 10 * time.Second}) + // Perform initial detection + got, _, err := p.Refresh(t.Context(), &http.Client{Timeout: 10 * time.Second}) require.NoError(t, err) assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw()) @@ -141,10 +142,10 @@ func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) { }() md1 := &mockDetector{} - md1.On("Detect").Return(pcommon.NewResource(), errors.New("err1")) + md1.On("Detect").Return(pcommon.NewResource(), "", errors.New("err1")) md2 := &mockDetector{} - md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2")) + md2.On("Detect").Return(pcommon.NewResource(), "", errors.New("err2")) p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) @@ -152,7 +153,7 @@ func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) defer cancel() - _, _, err = p.Get(ctx, &http.Client{Timeout: 10 * time.Second}) + _, _, err = p.Refresh(ctx, &http.Client{Timeout: 10 * time.Second}) require.Error(t, err) require.Contains(t, err.Error(), "err1") require.Contains(t, err.Error(), "err2") @@ -160,10 +161,10 @@ func TestDetectResource_Error_ContextDeadline_WithErrPropagation(t *testing.T) { func TestDetectResource_Error_ContextDeadline_WithoutErrPropagation(t *testing.T) { md1 := &mockDetector{} - md1.On("Detect").Return(pcommon.NewResource(), errors.New("err1")) + md1.On("Detect").Return(pcommon.NewResource(), "", errors.New("err1")) md2 := &mockDetector{} - md2.On("Detect").Return(pcommon.NewResource(), errors.New("err2")) + md2.On("Detect").Return(pcommon.NewResource(), "", errors.New("err2")) p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) @@ -171,7 +172,7 @@ func TestDetectResource_Error_ContextDeadline_WithoutErrPropagation(t *testing.T ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) defer cancel() - _, _, err := p.Get(ctx, &http.Client{Timeout: 10 * time.Second}) + _, _, err := p.Refresh(ctx, &http.Client{Timeout: 10 * time.Second}) require.NoError(t, err) } @@ -214,35 +215,50 @@ type mockParallelDetector struct { } func newMockParallelDetector() *mockParallelDetector { - return &mockParallelDetector{ch: make(chan struct{})} + return &mockParallelDetector{ch: make(chan struct{}, 1)} } func (p *mockParallelDetector) Detect(_ context.Context) (pcommon.Resource, string, error) { <-p.ch args := p.Called() - return args.Get(0).(pcommon.Resource), "", args.Error(1) + return args.Get(0).(pcommon.Resource), args.String(1), args.Error(2) } -// TestDetectResource_Parallel validates that Detect is only called once, even if there -// are multiple calls to ResourceProvider.Get +// TestDetectResource_Parallel validates that multiple concurrent calls to Get +// return the cached result after initial Refresh func TestDetectResource_Parallel(t *testing.T) { const iterations = 5 md1 := newMockParallelDetector() res1 := pcommon.NewResource() require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"})) - md1.On("Detect").Return(res1, nil) + md1.On("Detect").Return(res1, "", nil) md2 := newMockParallelDetector() res2 := pcommon.NewResource() require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"})) - md2.On("Detect").Return(res2, nil) + md2.On("Detect").Return(res2, "", nil) expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"} p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) - 
// call p.Get multiple times + // Perform initial detection + go func() { + time.Sleep(5 * time.Millisecond) + md1.ch <- struct{}{} + md2.ch <- struct{}{} + }() + + detected, _, err := p.Refresh(t.Context(), &http.Client{Timeout: 10 * time.Second}) + require.NoError(t, err) + require.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw()) + + // Verify Detect was called once during Refresh + md1.AssertNumberOfCalls(t, "Detect", 1) + md2.AssertNumberOfCalls(t, "Detect", 1) + + // Now call Get multiple times concurrently - should return cached value wg := &sync.WaitGroup{} wg.Add(iterations) for range iterations { @@ -254,15 +270,9 @@ func TestDetectResource_Parallel(t *testing.T) { }() } - // wait until all goroutines are blocked - time.Sleep(5 * time.Millisecond) - - // detector.Detect should only be called once, so we only need to notify each channel once - md1.ch <- struct{}{} - md2.ch <- struct{}{} - - // then wait until all goroutines are finished, and ensure p.Detect was only called once wg.Wait() + + // Verify Detect still only called once (not called again by Get) md1.AssertNumberOfCalls(t, "Detect", 1) md2.AssertNumberOfCalls(t, "Detect", 1) } @@ -271,20 +281,20 @@ func TestDetectResource_Reconnect(t *testing.T) { md1 := &mockDetector{} res1 := pcommon.NewResource() require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"})) - md1.On("Detect").Return(pcommon.NewResource(), errors.New("connection error1")).Twice() - md1.On("Detect").Return(res1, nil) + md1.On("Detect").Return(pcommon.NewResource(), "", errors.New("connection error1")).Twice() + md1.On("Detect").Return(res1, "", nil) md2 := &mockDetector{} res2 := pcommon.NewResource() require.NoError(t, res2.Attributes().FromRaw(map[string]any{"c": "3"})) - md2.On("Detect").Return(pcommon.NewResource(), errors.New("connection error2")).Once() - md2.On("Detect").Return(res2, nil) + md2.On("Detect").Return(pcommon.NewResource(), "", errors.New("connection error2")).Once() + md2.On("Detect").Return(res2, "", nil) expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"} p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) - detected, _, err := p.Get(t.Context(), &http.Client{Timeout: 15 * time.Second}) + detected, _, err := p.Refresh(t.Context(), &http.Client{Timeout: 15 * time.Second}) assert.NoError(t, err) assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw()) @@ -292,6 +302,37 @@ func TestDetectResource_Reconnect(t *testing.T) { md2.AssertNumberOfCalls(t, "Detect", 2) // 1 error + 1 success } +func TestResourceProvider_RefreshInterval(t *testing.T) { + md := &mockDetector{} + res1 := pcommon.NewResource() + require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1"})) + res2 := pcommon.NewResource() + require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "2"})) + + // First call -> res1, second call -> res2 + md.On("Detect").Return(res1, "", nil).Once() + md.On("Detect").Return(res2, "", nil).Once() + + p := NewResourceProvider(zap.NewNop(), 1*time.Second, nil, md) + + // Initial detection + got, _, err := p.Refresh(t.Context(), &http.Client{Timeout: time.Second}) + require.NoError(t, err) + assert.Equal(t, map[string]any{"a": "1"}, got.Attributes().AsRaw()) + + // Simulate a single periodic refresh + _, _, err = p.Refresh(t.Context(), &http.Client{Timeout: time.Second}) + require.NoError(t, err) + + // The cached resource should now be updated + got, _, err = p.Get(t.Context(), &http.Client{Timeout: time.Second}) + require.NoError(t, err) + 
assert.Equal(t, map[string]any{"a": "2"}, got.Attributes().AsRaw()) + + // Exactly two detections total: one initial + one refresh + md.AssertNumberOfCalls(t, "Detect", 2) +} + func TestFilterAttributes_Match(t *testing.T) { m := map[string]struct{}{ "host.name": {}, diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 40d2939ad3541..2f1f910c887c4 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -5,10 +5,10 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentele import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" @@ -19,10 +19,9 @@ import ( type resourceDetectionProcessor struct { provider *internal.ResourceProvider - resource pcommon.Resource - schemaURL string override bool httpClientSettings confighttp.ClientConfig + refreshInterval time.Duration telemetrySettings component.TelemetrySettings } @@ -30,55 +29,68 @@ type resourceDetectionProcessor struct { func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component.Host) error { client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings) ctx = internal.ContextWithClient(ctx, client) - var err error - rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) - return err + + // Perform initial resource detection + _, _, err := rdp.provider.Refresh(ctx, client) + if err != nil { + return err + } + + // Start periodic refresh if configured + rdp.provider.StartRefreshing(rdp.refreshInterval, client) + return nil +} + +// Shutdown is invoked during service shutdown. +func (rdp *resourceDetectionProcessor) Shutdown(_ context.Context) error { + rdp.provider.StopRefreshing() + return nil } // processTraces implements the ProcessTracesFunc type. -func (rdp *resourceDetectionProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { +func (rdp *resourceDetectionProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + res, schemaURL, _ := rdp.provider.Get(ctx, nil) rs := td.ResourceSpans() for i := 0; i < rs.Len(); i++ { rss := rs.At(i) - rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) - res := rss.Resource() - internal.MergeResource(res, rdp.resource, rdp.override) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), schemaURL)) + internal.MergeResource(rss.Resource(), res, rdp.override) } return td, nil } // processMetrics implements the ProcessMetricsFunc type. 
-func (rdp *resourceDetectionProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { +func (rdp *resourceDetectionProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + res, schemaURL, _ := rdp.provider.Get(ctx, nil) rm := md.ResourceMetrics() for i := 0; i < rm.Len(); i++ { rss := rm.At(i) - rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) - res := rss.Resource() - internal.MergeResource(res, rdp.resource, rdp.override) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), schemaURL)) + internal.MergeResource(rss.Resource(), res, rdp.override) } return md, nil } // processLogs implements the ProcessLogsFunc type. -func (rdp *resourceDetectionProcessor) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) { +func (rdp *resourceDetectionProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + res, schemaURL, _ := rdp.provider.Get(ctx, nil) rl := ld.ResourceLogs() for i := 0; i < rl.Len(); i++ { rss := rl.At(i) - rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) - res := rss.Resource() - internal.MergeResource(res, rdp.resource, rdp.override) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), schemaURL)) + internal.MergeResource(rss.Resource(), res, rdp.override) } return ld, nil } // processProfiles implements the ProcessProfilesFunc type. -func (rdp *resourceDetectionProcessor) processProfiles(_ context.Context, ld pprofile.Profiles) (pprofile.Profiles, error) { +func (rdp *resourceDetectionProcessor) processProfiles(ctx context.Context, ld pprofile.Profiles) (pprofile.Profiles, error) { + res, schemaURL, _ := rdp.provider.Get(ctx, nil) rl := ld.ResourceProfiles() for i := 0; i < rl.Len(); i++ { rss := rl.At(i) - rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) - res := rss.Resource() - internal.MergeResource(res, rdp.resource, rdp.override) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), schemaURL)) + internal.MergeResource(rss.Resource(), res, rdp.override) } return ld, nil } diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go index 019808e4ed90a..9bcd963367962 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go @@ -297,6 +297,201 @@ func TestResourceProcessor(t *testing.T) { } } +func TestProcessor_RefreshInterval_UpdatesResource(t *testing.T) { + factory := &factory{providers: map[component.ID]*internal.ResourceProvider{}} + + // First detect returns res1, then res2. + md := &mockDetector{} + res1 := pcommon.NewResource() + require.NoError(t, res1.Attributes().FromRaw(map[string]any{"k": "v1"})) + res2 := pcommon.NewResource() + require.NoError(t, res2.Attributes().FromRaw(map[string]any{"k": "v2"})) + md.On("Detect").Return(res1, nil).Once() + md.On("Detect").Return(res2, nil) + + // Hook detector into factory. 
+ factory.resourceProviderFactory = internal.NewProviderFactory( + map[internal.DetectorType]internal.DetectorFactory{ + "mock": func(processor.Settings, internal.DetectorConfig) (internal.Detector, error) { + return md, nil + }, + }, + ) + + cfg := &Config{ + Detectors: []string{"mock"}, + ClientConfig: confighttp.ClientConfig{Timeout: 500 * time.Millisecond}, + RefreshInterval: 50 * time.Millisecond, // short to trigger refresh quickly + } + + // Create metrics processor. + msink := new(consumertest.MetricsSink) + mp, err := factory.createMetricsProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), cfg, msink) + require.NoError(t, err) + require.NoError(t, mp.Start(t.Context(), componenttest.NewNopHost())) + defer func() { assert.NoError(t, mp.Shutdown(t.Context())) }() + + // Send one batch → should see res1. + md1 := pmetric.NewMetrics() + require.NoError(t, md1.ResourceMetrics().AppendEmpty().Resource().Attributes().FromRaw(map[string]any{})) + require.NoError(t, mp.ConsumeMetrics(t.Context(), md1)) + + require.Eventually(t, func() bool { + return len(msink.AllMetrics()) > 0 + }, time.Second, 20*time.Millisecond) + got1 := msink.AllMetrics()[0].ResourceMetrics().At(0).Resource().Attributes().AsRaw() + assert.Equal(t, map[string]any{"k": "v1"}, got1) + + // Verify Detect was called once (initial detection). + md.AssertNumberOfCalls(t, "Detect", 1) + + // Wait for refresh loop to trigger and update resource. + // Use Eventually to poll until the resource actually changes. + require.Eventually(t, func() bool { + // Keep sending metrics and check if resource has changed + mdTemp := pmetric.NewMetrics() + require.NoError(t, mdTemp.ResourceMetrics().AppendEmpty().Resource().Attributes().FromRaw(map[string]any{})) + require.NoError(t, mp.ConsumeMetrics(t.Context(), mdTemp)) + + // Check the latest metrics + allMetrics := msink.AllMetrics() + if len(allMetrics) == 0 { + return false + } + latestAttrs := allMetrics[len(allMetrics)-1].ResourceMetrics().At(0).Resource().Attributes().AsRaw() + + // Return true if we see v2 (refresh happened) + if v, ok := latestAttrs["k"]; ok && v == "v2" { + return true + } + return false + }, 500*time.Millisecond, 20*time.Millisecond, "refresh loop did not update resource from v1 to v2") + + // Verify Detect was called at least twice (initial + at least one refresh). + assert.GreaterOrEqual(t, len(md.Calls), 2, "Detect should have been called at least twice") + + // Send final batch to confirm resource is now res2. + md2 := pmetric.NewMetrics() + require.NoError(t, md2.ResourceMetrics().AppendEmpty().Resource().Attributes().FromRaw(map[string]any{})) + require.NoError(t, mp.ConsumeMetrics(t.Context(), md2)) + + require.Eventually(t, func() bool { + allMetrics := msink.AllMetrics() + return len(allMetrics) >= 2 + }, time.Second, 20*time.Millisecond) + + // Check the latest metric has v2 + allMetrics := msink.AllMetrics() + got2 := allMetrics[len(allMetrics)-1].ResourceMetrics().At(0).Resource().Attributes().AsRaw() + assert.Equal(t, map[string]any{"k": "v2"}, got2) +} + +func TestProcessor_RefreshInterval_KeepsLastGoodOnFailure(t *testing.T) { + factory := &factory{providers: map[component.ID]*internal.ResourceProvider{}} + + // Prepare resources. + res1 := pcommon.NewResource() + require.NoError(t, res1.Attributes().FromRaw(map[string]any{"k": "v1"})) + res2 := pcommon.NewResource() + require.NoError(t, res2.Attributes().FromRaw(map[string]any{"k": "v2"})) + + // Gates to coordinate 2nd (fail) and 3rd (success) detections. 
+ failGate := make(chan struct{}) + successGate := make(chan struct{}) + + // Mock detector: + // 1) first call -> res1 (startup) + // 2) second call -> block until failGate is closed, then return error (refresh keeps last good) + // 3) third call -> block until successGate is closed, then return res2 (refresh updates) + md := &mockDetector{} + // 1) first call -> res1 (startup) + md.On("Detect").Return(res1, nil).Once() + + // 2) second call -> block, then fail + md.On("Detect"). + Run(func(_ mock.Arguments) { <-failGate }). + Return(pcommon.NewResource(), errors.New("boom")).Once() + + // 3) third call -> block, then succeed + md.On("Detect"). + Run(func(_ mock.Arguments) { <-successGate }). + Return(res2, nil).Once() + + // 4) any extra calls (ticker may fire again) -> return last good value + md.On("Detect").Return(res2, nil).Maybe() + + // Wire detector into factory. + factory.resourceProviderFactory = internal.NewProviderFactory( + map[internal.DetectorType]internal.DetectorFactory{ + "mock": func(processor.Settings, internal.DetectorConfig) (internal.Detector, error) { + return md, nil + }, + }, + ) + + cfg := &Config{ + Detectors: []string{"mock"}, + ClientConfig: confighttp.ClientConfig{Timeout: 500 * time.Millisecond}, + RefreshInterval: 25 * time.Millisecond, + } + + // Create and start a metrics processor so we can observe the applied resource. + msink := new(consumertest.MetricsSink) + mp, err := factory.createMetricsProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), cfg, msink) + require.NoError(t, err) + require.NoError(t, mp.Start(t.Context(), componenttest.NewNopHost())) + defer func() { assert.NoError(t, mp.Shutdown(t.Context())) }() + + // Helper to push one metrics batch and return the resource attrs of that batch. + getAttrsAfterConsume := func() map[string]any { + md := pmetric.NewMetrics() + require.NoError(t, md.ResourceMetrics().AppendEmpty().Resource().Attributes().FromRaw(map[string]any{})) + require.NoError(t, mp.ConsumeMetrics(t.Context(), md)) + + // Wait until sink has one more entry and return the last one's attrs. + var out map[string]any + require.Eventually(t, func() bool { + all := msink.AllMetrics() + if len(all) == 0 { + return false + } + last := all[len(all)-1] + out = last.ResourceMetrics().At(0).Resource().Attributes().AsRaw() + return true + }, time.Second, 10*time.Millisecond) + return out + } + + // 1) After startup, first detection applied -> expect v1. + got := getAttrsAfterConsume() + assert.Equal(t, map[string]any{"k": "v1"}, got) + + // 2) Let the next refresh run BUT keep it blocked on failGate. + // While blocked, a consume should still see v1. + // (The refresh goroutine is waiting; state must not change.) + time.Sleep(2 * cfg.RefreshInterval) // give the loop a chance to enter Detect and block + got = getAttrsAfterConsume() + assert.Equal(t, map[string]any{"k": "v1"}, got) + + // 3) Release the failure; refresh completes with error => last good (v1) must be kept. + close(failGate) + // Give the loop a brief moment to finish that failed refresh. + time.Sleep(2 * cfg.RefreshInterval) + got = getAttrsAfterConsume() + assert.Equal(t, map[string]any{"k": "v1"}, got) + + // 4) Now allow the next refresh to succeed (return res2). + close(successGate) + // Give the loop a moment to complete the successful refresh. 
+ require.Eventually(t, func() bool { + attrs := getAttrsAfterConsume() + return assert.ObjectsAreEqual(map[string]any{"k": "v2"}, attrs) + }, time.Second, 10*time.Millisecond) + + // Verify the mock saw exactly 3 Detect calls in the order we expected. + md.AssertExpectations(t) +} + func benchmarkConsumeTraces(b *testing.B, cfg *Config) { factory := NewFactory() sink := new(consumertest.TracesSink) diff --git a/processor/resourcedetectionprocessor/testdata/config.yaml b/processor/resourcedetectionprocessor/testdata/config.yaml index 6cde1f5251c5e..e58e488bf58bc 100644 --- a/processor/resourcedetectionprocessor/testdata/config.yaml +++ b/processor/resourcedetectionprocessor/testdata/config.yaml @@ -10,7 +10,7 @@ resourcedetection/openshift: insecure: true resourcedetection/aks: - detectors: [ env, aks ] + detectors: [env, aks] timeout: 2s override: false @@ -61,6 +61,12 @@ resourcedetection/heroku: timeout: 2s override: false +resourcedetection/refresh: + detectors: [system] + timeout: 2s + override: false + refresh_interval: 5s + resourcedetection/invalid: detectors: [env, system] timeout: 2s @@ -83,4 +89,4 @@ resourcedetection/resourceattributes: system: resource_attributes: os.type: - enabled: false \ No newline at end of file + enabled: false
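For readers skimming the diff, below is a simplified, self-contained sketch of the concurrency pattern the new `ResourceProvider` adopts: a snapshot guarded by a `sync.RWMutex`, a ticker-driven refresh goroutine stopped via a channel and `WaitGroup`, and a keep-last-good policy when a refresh fails. All names here are illustrative; this is not the processor's actual API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// snapshotProvider caches the latest detection result and refreshes it
// periodically, mirroring the RWMutex + ticker pattern used in this PR.
type snapshotProvider struct {
	mu       sync.RWMutex
	snapshot map[string]string
	detect   func() (map[string]string, error)

	stopCh chan struct{}
	wg     sync.WaitGroup
}

// Get returns the cached snapshot; readers never wait for a refresh to finish.
func (p *snapshotProvider) Get() map[string]string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.snapshot
}

// refresh re-runs detection and keeps the previous snapshot on failure.
func (p *snapshotProvider) refresh() {
	attrs, err := p.detect()
	if err != nil || len(attrs) == 0 {
		return // keep last good snapshot
	}
	p.mu.Lock()
	p.snapshot = attrs
	p.mu.Unlock()
}

// startRefreshing launches the periodic refresh loop (Start path).
func (p *snapshotProvider) startRefreshing(interval time.Duration) {
	p.stopCh = make(chan struct{})
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				p.refresh()
			case <-p.stopCh:
				return
			}
		}
	}()
}

// stopRefreshing stops the loop and waits for it to exit (Shutdown path).
func (p *snapshotProvider) stopRefreshing() {
	close(p.stopCh)
	p.wg.Wait()
}

func main() {
	n := 0
	p := &snapshotProvider{detect: func() (map[string]string, error) {
		n++
		return map[string]string{"host.name": fmt.Sprintf("host-%d", n)}, nil
	}}
	p.refresh() // initial detection, as done once in Start()
	p.startRefreshing(50 * time.Millisecond)
	defer p.stopRefreshing()

	time.Sleep(120 * time.Millisecond)
	fmt.Println(p.Get()) // reflects a later refresh, e.g. map[host.name:host-3]
}
```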