From 05f573f77c1a1edc1c1eb817d2eb1abad78a742e Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 9 May 2025 00:53:30 +0200 Subject: [PATCH 01/15] [chore] Update bugfix release guidelines (#12999) #### Description This updates the bugfix guidelines to make them less strict. In particular, after this change, we would start releasing bugfix releases under the following cases: 1. Lack of consensus of SIG leads on whether to release a bugfix version within one working day after a report has been made 2. Issues that have not been reported by multiple people, but that are known to be used in production This also: - Explicitly lists difficulties with debugging and troubleshooting as 'severe enough' - Explicitly states that the release manager is responsible for bugfix releases --- docs/release.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/release.md b/docs/release.md index 50c5d9160ea..e6994b85e9a 100644 --- a/docs/release.md +++ b/docs/release.md @@ -178,20 +178,22 @@ When considering making a bugfix release on the `v0.N.x` release cycle, the bug - Changing the configuration to an easy to find value. 2. The bug happens in common setups. To gauge this, maintainers can consider the following: - If the bug is specific to a certain platform, and if that platform is in [Tier 1](../docs/platform-support.md#tiered-platform-support-model). - - The bug happens with the default configuration or with a commonly used one (e.g. has been reported by multiple people) + - The bug happens with the default configuration or with one that is known to be used in production. 3. The bug is sufficiently severe. For example (non-exhaustive list): - The bug makes the Collector crash reliably - The bug makes the Collector fail to start under an accepted configuration - The bug produces significant data loss - The bug makes the Collector negatively affect its environment (e.g. significantly affects its host machine) + - The bug makes it difficult to troubleshoot or debug Collector setups We aim to provide a release that fixes security-related issues in at most 30 days since they are publicly announced; with the current release schedule this means security issues will typically not warrant a bugfix release. An exception is critical vulnerabilities (CVSSv3 score >= 9.0), which will warrant a release within five business days. The OpenTelemetry Collector maintainers will ultimately have the responsibility to assess if a given bug or security issue fulfills all the necessary criteria and may grant exceptions in a case-by-case basis. +If the maintainers are unable to reach consensus within one working day, we will lean towards releasing a bugfix version. ### Bugfix release procedure -The following documents the procedure to release a bugfix +The release manager of a minor version is responsible for releasing any bugfix versions on this release series. The following documents the procedure to release a bugfix 1. Create a pull request against the `release/` (e.g. `release/v0.90.x`) branch to apply the fix. 2. Make sure you are on `release/`. Prepare release commits with `prepare-release` make target, e.g. `make prepare-release PREVIOUS_VERSION=0.90.0 RELEASE_CANDIDATE=0.90.1 MODSET=beta`, and create a pull request against the `release/` branch. From 4d929a9d6a7ecac0c151f774407dad7b0a0bf81d Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 8 May 2025 20:58:55 -0700 Subject: [PATCH 02/15] [chore] Fix BenchmarkMemoryQueueWaitForResult test (#13006) Fixes http://github.com/open-telemetry/opentelemetry-collector/issues/13004 Signed-off-by: Bogdan Drutu --- .../exporterhelper/internal/queuebatch/memory_queue_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go index 9687f12c31f..0dcf45a5db2 100644 --- a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go @@ -198,8 +198,10 @@ func BenchmarkMemoryQueueWaitForResult(b *testing.B) { wg := sync.WaitGroup{} consumed := &atomic.Int64{} q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1000, waitForResult: true}) + require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) // Consume async new data. + wg.Add(1) go func() { defer wg.Done() for { @@ -212,7 +214,6 @@ func BenchmarkMemoryQueueWaitForResult(b *testing.B) { } }() - require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { From 6bd77b33a5f68d462845b4e5d92f27d12dc46ef6 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 9 May 2025 11:36:54 +0200 Subject: [PATCH 03/15] [chore] Add tests loading nil to any (#12998) #### Description While working on #12981, I would have found it useful to have these tests, since they surfaced a bug when enabling `DecodeNil`. --- cmd/mdatagen/internal/loader_test.go | 16 ++++++++++++++++ .../internal/testdata/empty_test_config.yaml | 9 +++++++++ confmap/confmap_test.go | 14 ++++++++++++++ 3 files changed, 39 insertions(+) create mode 100644 cmd/mdatagen/internal/testdata/empty_test_config.yaml diff --git a/cmd/mdatagen/internal/loader_test.go b/cmd/mdatagen/internal/loader_test.go index 08dda39e6ef..a50d7e2487a 100644 --- a/cmd/mdatagen/internal/loader_test.go +++ b/cmd/mdatagen/internal/loader_test.go @@ -371,6 +371,22 @@ func TestLoadMetadata(t *testing.T) { }, }, }, + { + name: "testdata/empty_test_config.yaml", + want: Metadata{ + Type: "test", + GeneratedPackageName: "metadata", + ScopeName: "go.opentelemetry.io/collector/cmd/mdatagen/internal", + ShortFolderName: "testdata", + Tests: Tests{Host: "componenttest.NewNopHost()"}, + Status: &Status{ + Class: "receiver", + Stability: map[component.StabilityLevel][]string{ + component.StabilityLevelBeta: {"logs"}, + }, + }, + }, + }, { name: "testdata/invalid_type_rattr.yaml", want: Metadata{}, diff --git a/cmd/mdatagen/internal/testdata/empty_test_config.yaml b/cmd/mdatagen/internal/testdata/empty_test_config.yaml new file mode 100644 index 00000000000..5b13e2dff33 --- /dev/null +++ b/cmd/mdatagen/internal/testdata/empty_test_config.yaml @@ -0,0 +1,9 @@ +type: test + +status: + class: receiver + stability: + beta: [logs] + +tests: + config: diff --git a/confmap/confmap_test.go b/confmap/confmap_test.go index 65be60fcc07..026ba8f04ad 100644 --- a/confmap/confmap_test.go +++ b/confmap/confmap_test.go @@ -99,6 +99,20 @@ func TestToStringMap(t *testing.T) { } } +type testConfigAny struct { + AnyField any `mapstructure:"any_field"` +} + +func TestNilToAnyField(t *testing.T) { + stringMap := map[string]any{ + "any_field": nil, + } + conf := NewFromStringMap(stringMap) + cfg := &testConfigAny{} + require.NoError(t, conf.Unmarshal(cfg)) + assert.Nil(t, cfg.AnyField) +} + func TestExpandNilStructPointersHookFunc(t *testing.T) { stringMap := map[string]any{ "boolean": nil, From 5800834fbac45f07f909aaa747a9794a5948ec59 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 9 May 2025 16:34:39 +0200 Subject: [PATCH 04/15] [chore] Make mapstructure hooks safe against untyped nils (#13001) #### Description If we enable `DecodeNil` as true, we may have [untyped nils](https://go.dev/doc/faq#nil_error) being passed. Unfortunately, these are not valid values for `reflect`, which leads to surprising behavior such as golang/go/issues/51649. Unfortunately, the default hooks from mapstructure do not deal with this properly. To account for this, we: - Vendor and change the `ComposeDecodeHookFunc` function so that this case is accounted for the kinds of hooks that just won't work with untyped nils - Create a safe wrapper for the hooks that do work with untyped nils. This wrapper is used in all hooks, but in the interest of keeping as close to what I would imagine upstream will accept, I did not add this to the compose function. This should not have any end-user observable behavior. #### Link to tracking issue Attempt to work around https://github.com/open-telemetry/opentelemetry-collector/pull/12996#issuecomment-2862859367 --------- Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- confmap/confmap.go | 40 +++++-- .../third_party/composehook/compose_hook.go | 103 ++++++++++++++++++ 2 files changed, 132 insertions(+), 11 deletions(-) create mode 100644 confmap/internal/third_party/composehook/compose_hook.go diff --git a/confmap/confmap.go b/confmap/confmap.go index 9b95e4cf1bc..3040778dcd9 100644 --- a/confmap/confmap.go +++ b/confmap/confmap.go @@ -19,6 +19,7 @@ import ( "github.com/knadh/koanf/v2" encoder "go.opentelemetry.io/collector/confmap/internal/mapstructure" + "go.opentelemetry.io/collector/confmap/internal/third_party/composehook" ) const ( @@ -234,7 +235,7 @@ func decodeConfig(m *Conf, result any, errorUnused bool, skipTopLevelUnmarshaler TagName: MapstructureTag, WeaklyTypedInput: false, MatchName: caseSensitiveMatchName, - DecodeHook: mapstructure.ComposeDecodeHookFunc( + DecodeHook: composehook.ComposeDecodeHookFunc( useExpandValue(), expandNilStructPointersHookFunc(), mapstructure.StringToSliceHookFunc(","), @@ -306,6 +307,23 @@ func isStringyStructure(t reflect.Type) bool { return false } +// safeWrapDecodeHookFunc wraps a DecodeHookFuncValue to ensure fromVal is a valid `reflect.Value` +// object and therefore it is safe to call `reflect.Value` methods on fromVal. +// +// Use this only if the hook does not need to be called on untyped nil values. +// Typed nil values are safe to call and will be passed to the hook. +// See https://github.com/golang/go/issues/51649 +func safeWrapDecodeHookFunc( + f mapstructure.DecodeHookFuncValue, +) mapstructure.DecodeHookFuncValue { + return func(fromVal reflect.Value, toVal reflect.Value) (any, error) { + if !fromVal.IsValid() { + return nil, nil + } + return f(fromVal, toVal) + } +} + // When a value has been loaded from an external source via a provider, we keep both the // parsed value and the original string value. This allows us to expand the value to its // original string representation when decoding into a string field, and use the original otherwise. @@ -355,7 +373,7 @@ func useExpandValue() mapstructure.DecodeHookFuncType { // we want an unmarshaled Config to be equivalent to // Config{Thing: &SomeStruct{}} instead of Config{Thing: nil} func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { // ensure we are dealing with map to map comparison if from.Kind() == reflect.Map && to.Kind() == reflect.Map { toElem := to.Type().Elem() @@ -375,7 +393,7 @@ func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { } } return from.Interface(), nil - } + }) } // mapKeyStringToMapKeyTextUnmarshalerHookFunc returns a DecodeHookFuncType that checks that a conversion from @@ -422,7 +440,7 @@ func mapKeyStringToMapKeyTextUnmarshalerHookFunc() mapstructure.DecodeHookFuncTy // unmarshalerEmbeddedStructsHookFunc provides a mechanism for embedded structs to define their own unmarshal logic, // by implementing the Unmarshaler interface. func unmarshalerEmbeddedStructsHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if to.Type().Kind() != reflect.Struct { return from.Interface(), nil } @@ -455,14 +473,14 @@ func unmarshalerEmbeddedStructsHookFunc() mapstructure.DecodeHookFuncValue { } } return fromAsMap, nil - } + }) } // Provides a mechanism for individual structs to define their own unmarshal logic, // by implementing the Unmarshaler interface, unless skipTopLevelUnmarshaler is // true and the struct matches the top level object being unmarshaled. func unmarshalerHookFunc(result any, skipTopLevelUnmarshaler bool) mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if !to.CanAddr() { return from.Interface(), nil } @@ -495,14 +513,14 @@ func unmarshalerHookFunc(result any, skipTopLevelUnmarshaler bool) mapstructure. } return unmarshaler, nil - } + }) } // marshalerHookFunc returns a DecodeHookFuncValue that checks structs that aren't // the original to see if they implement the Marshaler interface. func marshalerHookFunc(orig any) mapstructure.DecodeHookFuncValue { origType := reflect.TypeOf(orig) - return func(from reflect.Value, _ reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, _ reflect.Value) (any, error) { if from.Kind() != reflect.Struct { return from.Interface(), nil } @@ -520,7 +538,7 @@ func marshalerHookFunc(orig any) mapstructure.DecodeHookFuncValue { return nil, err } return conf.ToStringMap(), nil - } + }) } // Unmarshaler interface may be implemented by types to customize their behavior when being unmarshaled from a Conf. @@ -562,7 +580,7 @@ type Marshaler interface { // 4. configuration have no `keys` field specified, the output should be default config // - for example, input is {}, then output is Config{ Keys: ["a", "b"]} func zeroSliceHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if to.CanSet() && to.Kind() == reflect.Slice && from.Kind() == reflect.Slice { if from.IsNil() { // input slice is nil, set output slice to nil. @@ -574,7 +592,7 @@ func zeroSliceHookFunc() mapstructure.DecodeHookFuncValue { } return from.Interface(), nil - } + }) } type moduleFactory[T any, S any] interface { diff --git a/confmap/internal/third_party/composehook/compose_hook.go b/confmap/internal/third_party/composehook/compose_hook.go new file mode 100644 index 00000000000..f51050f66ed --- /dev/null +++ b/confmap/internal/third_party/composehook/compose_hook.go @@ -0,0 +1,103 @@ +// Copyright (c) 2013 Mitchell Hashimoto +// SPDX-License-Identifier: MIT +// This code is a modified version of https://github.com/go-viper/mapstructure + +package composehook // import "go.opentelemetry.io/collector/confmap/internal/third_party/composehook" + +import ( + "errors" + "reflect" + + "github.com/go-viper/mapstructure/v2" +) + +// typedDecodeHook takes a raw DecodeHookFunc (an any) and turns +// it into the proper DecodeHookFunc type, such as DecodeHookFuncType. +func typedDecodeHook(h mapstructure.DecodeHookFunc) mapstructure.DecodeHookFunc { + // Create variables here so we can reference them with the reflect pkg + var f1 mapstructure.DecodeHookFuncType + var f2 mapstructure.DecodeHookFuncKind + var f3 mapstructure.DecodeHookFuncValue + + // Fill in the variables into this interface and the rest is done + // automatically using the reflect package. + potential := []any{f3, f1, f2} + + v := reflect.ValueOf(h) + vt := v.Type() + for _, raw := range potential { + pt := reflect.ValueOf(raw).Type() + if vt.ConvertibleTo(pt) { + return v.Convert(pt).Interface() + } + } + + return nil +} + +// cachedDecodeHook takes a raw DecodeHookFunc (an any) and turns +// it into a closure to be used directly +// if the type fails to convert we return a closure always erroring to keep the previous behavior +func cachedDecodeHook(raw mapstructure.DecodeHookFunc) func(reflect.Value, reflect.Value) (any, error) { + switch f := typedDecodeHook(raw).(type) { + case mapstructure.DecodeHookFuncType: + return func(from reflect.Value, to reflect.Value) (any, error) { + // CHANGE FROM UPSTREAM: check if from is valid and return nil if not + if !from.IsValid() { + return nil, nil + } + return f(from.Type(), to.Type(), from.Interface()) + } + case mapstructure.DecodeHookFuncKind: + return func(from reflect.Value, to reflect.Value) (any, error) { + // CHANGE FROM UPSTREAM: check if from is valid and return nil if not + if !from.IsValid() { + return nil, nil + } + return f(from.Kind(), to.Kind(), from.Interface()) + } + case mapstructure.DecodeHookFuncValue: + return func(from reflect.Value, to reflect.Value) (any, error) { + return f(from, to) + } + default: + return func(reflect.Value, reflect.Value) (any, error) { + return nil, errors.New("invalid decode hook signature") + } + } +} + +// ComposeDecodeHookFunc creates a single DecodeHookFunc that +// automatically composes multiple DecodeHookFuncs. +// +// The composed funcs are called in order, with the result of the +// previous transformation. +// +// This is a copy of [mapstructure.ComposeDecodeHookFunc] but with +// validation added. +func ComposeDecodeHookFunc(fs ...mapstructure.DecodeHookFunc) mapstructure.DecodeHookFunc { + cached := make([]func(reflect.Value, reflect.Value) (any, error), 0, len(fs)) + for _, f := range fs { + cached = append(cached, cachedDecodeHook(f)) + } + return func(f reflect.Value, t reflect.Value) (any, error) { + var err error + + // CHANGE FROM UPSTREAM: check if f is valid before calling f.Interface() + var data any + if f.IsValid() { + data = f.Interface() + } + + newFrom := f + for _, c := range cached { + data, err = c(newFrom, t) + if err != nil { + return nil, err + } + newFrom = reflect.ValueOf(data) + } + + return data, nil + } +} From c8057ef13e44a967c3fa155389309d3959b942bf Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Tue, 6 May 2025 12:57:08 -0400 Subject: [PATCH 05/15] OTEL-2540 Add SpanContext to persistent queue --- .../internal/queuebatch/persistent_queue.go | 100 +++++++++- .../queuebatch/persistent_queue_test.go | 179 ++++++++++++++++++ 2 files changed, 271 insertions(+), 8 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index a1106103f8b..2354f9303f7 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -6,11 +6,14 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel import ( "context" "encoding/binary" + "encoding/hex" + "encoding/json" "errors" "fmt" "strconv" "sync" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.opentelemetry.io/collector/component" @@ -29,6 +32,8 @@ const ( writeIndexKey = "wi" currentlyDispatchedItemsKey = "di" queueSizeKey = "si" + + errInvalidTraceFlagsLength = "trace flags must only be 1 byte" ) var ( @@ -238,6 +243,56 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { return pq.putInternal(ctx, req) } +type marshaledRequestWithSpanContext struct { + RequestBytes []byte `json:"request"` + SpanContextJSON json.RawMessage `json:"span_context"` +} + +type spanContextConfigWrapper struct { + TraceID string + SpanID string + TraceFlags string + TraceState string + Remote bool +} + +func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContext, error) { + traceID, err := trace.TraceIDFromHex(wrapper.TraceID) + if err != nil { + return nil, err + } + spanID, err := trace.SpanIDFromHex(wrapper.SpanID) + if err != nil { + return nil, err + } + decoded, err := hex.DecodeString(wrapper.TraceFlags) + if err != nil { + return nil, err + } + if len(decoded) != 1 { + return nil, errors.New(errInvalidTraceFlagsLength) + } + traceFlags := trace.TraceFlags(decoded[0]) + traceState, err := trace.ParseTraceState(wrapper.TraceState) + if err != nil { + return nil, err + } + + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: traceFlags, + TraceState: traceState, + Remote: wrapper.Remote, + }) + + if !sc.IsValid() { + return nil, nil + } + + return &sc, nil +} + // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { reqSize := pq.set.sizer.Sizeof(req) @@ -254,11 +309,24 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { if err != nil { return err } - + // Retrieve SpanContext object from provided context, and store alongside the request + sc := trace.SpanContextFromContext(ctx) + scJSON, err := json.Marshal(sc) + if err != nil { + return err + } + envelope := marshaledRequestWithSpanContext{ + RequestBytes: reqBuf, + SpanContextJSON: scJSON, + } + envelopeBytes, err := json.Marshal(envelope) + if err != nil { + return err + } // Carry out a transaction where we both add the item and update the write index ops := []*storage.Operation{ storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)), - storage.SetOperation(getItemKey(pq.writeIndex), reqBuf), + storage.SetOperation(getItemKey(pq.writeIndex), envelopeBytes), } if err = pq.client.Batch(ctx, ops...); err != nil { return err @@ -291,7 +359,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // Read until either a successful retrieved element or no more elements in the storage. for pq.readIndex != pq.writeIndex { - index, req, consumed := pq.getNextItem(ctx) + index, req, consumed, restoredContext := pq.getNextItem(ctx) // Ensure the used size and the channel size are in sync. if pq.readIndex == pq.writeIndex { pq.queueSize = 0 @@ -300,7 +368,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don if consumed { id := indexDonePool.Get().(*indexDone) id.reset(index, pq.set.sizer.Sizeof(req), pq) - return context.Background(), req, id, true + return restoredContext, req, id, true } } @@ -313,7 +381,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // getNextItem pulls the next available item from the persistent storage along with its index. Once processing is // finished, the index should be called with onDone to clean up the storage. If no new item is available, // returns false. -func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) { +func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool, context.Context) { index := pq.readIndex // Increase here, so even if errors happen below, it always iterates pq.readIndex++ @@ -325,8 +393,24 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) getOp) var request T + restoredContext := context.Background() if err == nil { - request, err = pq.set.encoding.Unmarshal(getOp.Value) + var envelope marshaledRequestWithSpanContext + if err = json.Unmarshal(getOp.Value, &envelope); err == nil { + // Unmarshal the request using the specified encoding + if request, err = pq.set.encoding.Unmarshal(envelope.RequestBytes); err == nil { + // Unmarshal the SpanContext from JSON + var wrapper spanContextConfigWrapper + if len(envelope.SpanContextJSON) > 0 { + if err = json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil { + var sc *trace.SpanContext + if sc, err = SpanContextFromWrapper(wrapper); err == nil && sc != nil { + restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) + } + } + } + } + } } if err != nil { @@ -336,14 +420,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) pq.logger.Error("Error deleting item from queue", zap.Error(err)) } - return 0, request, false + return 0, request, false, restoredContext } // Increase the reference count, so the client is not closed while the request is being processed. // The client cannot be closed because we hold the lock since last we checked `stopped`. pq.refClient++ - return index, request, true + return index, request, true, restoredContext } // onDone should be called to remove the item of the given index from the queue once processing is finished. diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 1d25ea8a0fa..1e69765ba01 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -1205,3 +1206,181 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[uint defer pq.mu.Unlock() assert.ElementsMatch(t, compare, pq.currentlyDispatchedItems) } + +func TestSpanContextFromWrapper(t *testing.T) { + testCases := []struct { + name string + wrapper spanContextConfigWrapper + expectErr bool + errContains string + expectNil bool + expectValid bool + expectTraceID string + expectSpanID string + expectFlags string + expectState string + expectRemote bool + }{ + { + name: "invalid trace id", + wrapper: spanContextConfigWrapper{ + TraceID: "invalidtraceid", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid span id", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "invalidspanid", + TraceFlags: "01", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid trace flags hex", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "zz", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid trace flags length", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "0102", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + errContains: errInvalidTraceFlagsLength, + }, + { + name: "invalid trace state", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "invalid=tracestate,=bad", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "valid span context", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "vendor=value", + Remote: true, + }, + expectErr: false, + expectNil: false, + expectValid: true, + expectTraceID: "0102030405060708090a0b0c0d0e0f10", + expectSpanID: "0102030405060708", + expectFlags: "01", + expectState: "vendor=value", + expectRemote: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + scc, err := SpanContextFromWrapper(tc.wrapper) + if tc.expectErr { + require.Error(t, err) + if tc.errContains != "" { + assert.Contains(t, err.Error(), tc.errContains) + } + } else { + require.NoError(t, err) + } + if tc.expectNil { + assert.Nil(t, scc) + } else { + assert.NotNil(t, scc) + if tc.expectValid { + assert.True(t, scc.IsValid()) + assert.Equal(t, tc.expectTraceID, scc.TraceID().String()) + assert.Equal(t, tc.expectSpanID, scc.SpanID().String()) + assert.Equal(t, tc.expectFlags, scc.TraceFlags().String()) + assert.Equal(t, tc.expectState, scc.TraceState().String()) + assert.Equal(t, tc.expectRemote, scc.IsRemote()) + } + } + }) + } +} + +func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) { + // Setup a minimal persistent queue using uint64Encoding and uint64 + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ + sizer: request.RequestsSizer[uint64]{}, + capacity: 10, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + id: component.NewID(exportertest.NopType), + telemetry: componenttest.NewNopTelemetrySettings(), + }).(*persistentQueue[uint64]) + + ext := storagetest.NewMockStorageExtension(nil) + client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) + require.NoError(t, err) + pq.initClient(context.Background(), client) + + // Create a valid SpanContext + traceID, _ := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") + spanID, _ := trace.SpanIDFromHex("0102030405060708") + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: 0x01, + TraceState: trace.TraceState{}, + Remote: true, + }) + ctxWithSC := trace.ContextWithSpanContext(context.Background(), sc) + + // Offer a request with this context + req := uint64(42) + require.NoError(t, pq.Offer(ctxWithSC, req)) + + // Read the request and restored context + restoredCtx, gotReq, _, ok := pq.Read(context.Background()) + require.True(t, ok) + assert.Equal(t, req, gotReq) + restoredSC := trace.SpanContextFromContext(restoredCtx) + assert.True(t, restoredSC.IsValid()) + assert.Equal(t, sc.TraceID(), restoredSC.TraceID()) + assert.Equal(t, sc.SpanID(), restoredSC.SpanID()) + assert.Equal(t, sc.TraceFlags(), restoredSC.TraceFlags()) + assert.Equal(t, sc.TraceState().String(), restoredSC.TraceState().String()) + assert.Equal(t, sc.IsRemote(), restoredSC.IsRemote()) + + // Also test with a context with no SpanContext + req2 := uint64(99) + require.NoError(t, pq.Offer(context.Background(), req2)) + restoredCtx2, gotReq2, _, ok2 := pq.Read(context.Background()) + require.True(t, ok2) + assert.Equal(t, req2, gotReq2) + restoredSC2 := trace.SpanContextFromContext(restoredCtx2) + assert.False(t, restoredSC2.IsValid()) +} From 1847b389eac3870176bfc960ad9e5b2fb428c060 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Tue, 6 May 2025 13:05:21 -0400 Subject: [PATCH 06/15] OTEL-2540 add changelog --- ...pack4-add-spancontext-persistentqueue.yaml | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/jackgopack4-add-spancontext-persistentqueue.yaml diff --git a/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml b/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml new file mode 100644 index 00000000000..ead18f6f15a --- /dev/null +++ b/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'exporter/exporterhelper' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: 'Propagate SpanContext with requests in the persistent queue' + +# One or more tracking issues or pull requests related to the change +issues: [11740, 12212, 12934] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This change will allow internal telemetry spans to be processed when using persistent queue/storage. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From ff52b0c0d20209538f413049459893e5d4d5f0b2 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Wed, 7 May 2025 15:33:14 -0400 Subject: [PATCH 07/15] OTEL-2540 add marshalRequestWithSpanContext and unmarshalRequestWithSpanContext --- .../internal/queuebatch/persistent_queue.go | 86 +++++++++++-------- .../queuebatch/persistent_queue_test.go | 2 +- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index 2354f9303f7..99142287774 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -245,7 +245,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { type marshaledRequestWithSpanContext struct { RequestBytes []byte `json:"request"` - SpanContextJSON json.RawMessage `json:"span_context"` + SpanContextJSON json.RawMessage `json:"span_context,omitempty"` } type spanContextConfigWrapper struct { @@ -293,6 +293,53 @@ func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex return &sc, nil } +// unmarshalRequestWithSpanContext unmarshals a marshaledRequestWithSpanContext from bytes, returning the request +// and a context with the restored SpanContext (if present). +func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) (T, context.Context, error) { + var req T + restoredContext := context.Background() + var envelope marshaledRequestWithSpanContext + if err := json.Unmarshal(value, &envelope); err != nil { + return req, restoredContext, err + } + request, err := encoding.Unmarshal(envelope.RequestBytes) + if err != nil { + return req, restoredContext, err + } + if len(envelope.SpanContextJSON) > 0 { + var wrapper spanContextConfigWrapper + if err := json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil { + if sc, err := SpanContextFromWrapper(wrapper); err == nil && sc != nil { + restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) + } + } + } + return request, restoredContext, nil +} + +// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a marshaledRequestWithSpanContext envelope as bytes. +func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding[T], req T) ([]byte, error) { + reqBuf, err := encoding.Marshal(req) + if err != nil { + return nil, err + } + sc := trace.SpanContextFromContext(ctx) + var scJSON []byte + if sc.IsValid() { + scJSON, err = json.Marshal(sc) + if err != nil { + return nil, err + } + } else { + scJSON = nil // Will be omitted due to omitempty + } + envelope := marshaledRequestWithSpanContext{ + RequestBytes: reqBuf, + SpanContextJSON: scJSON, + } + return json.Marshal(envelope) +} + // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { reqSize := pq.set.sizer.Sizeof(req) @@ -305,28 +352,14 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { } } - reqBuf, err := pq.set.encoding.Marshal(req) - if err != nil { - return err - } - // Retrieve SpanContext object from provided context, and store alongside the request - sc := trace.SpanContextFromContext(ctx) - scJSON, err := json.Marshal(sc) - if err != nil { - return err - } - envelope := marshaledRequestWithSpanContext{ - RequestBytes: reqBuf, - SpanContextJSON: scJSON, - } - envelopeBytes, err := json.Marshal(envelope) + reqBuf, err := marshalRequestWithSpanContext(ctx, pq.set.encoding, req) if err != nil { return err } // Carry out a transaction where we both add the item and update the write index ops := []*storage.Operation{ storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)), - storage.SetOperation(getItemKey(pq.writeIndex), envelopeBytes), + storage.SetOperation(getItemKey(pq.writeIndex), reqBuf), } if err = pq.client.Batch(ctx, ops...); err != nil { return err @@ -395,22 +428,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool, var request T restoredContext := context.Background() if err == nil { - var envelope marshaledRequestWithSpanContext - if err = json.Unmarshal(getOp.Value, &envelope); err == nil { - // Unmarshal the request using the specified encoding - if request, err = pq.set.encoding.Unmarshal(envelope.RequestBytes); err == nil { - // Unmarshal the SpanContext from JSON - var wrapper spanContextConfigWrapper - if len(envelope.SpanContextJSON) > 0 { - if err = json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil { - var sc *trace.SpanContext - if sc, err = SpanContextFromWrapper(wrapper); err == nil && sc != nil { - restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) - } - } - } - } - } + request, restoredContext, err = unmarshalRequestWithSpanContext(pq.set.encoding, getOp.Value) } if err != nil { @@ -519,7 +537,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, err := pq.set.encoding.Unmarshal(op.Value) + req, _, err := unmarshalRequestWithSpanContext(pq.set.encoding, op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 1e69765ba01..9a507177fe8 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -914,7 +914,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { } func TestPersistentQueue_StorageFull(t *testing.T) { - marshaled, err := uint64Encoding{}.Marshal(uint64(50)) + marshaled, err := marshalRequestWithSpanContext(context.Background(), uint64Encoding{}, uint64(50)) require.NoError(t, err) maxSizeInBytes := len(marshaled) * 5 // arbitrary small number From 03905d0a22e41f9fd7b1c137e91c601061709c90 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Wed, 7 May 2025 16:39:00 -0400 Subject: [PATCH 08/15] add test coverage --- .../internal/queuebatch/persistent_queue.go | 8 +- .../queuebatch/persistent_queue_test.go | 101 +++++++++++++++++- 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index 99142287774..34f5fac6e60 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -256,7 +256,7 @@ type spanContextConfigWrapper struct { Remote bool } -func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContext, error) { +func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContext, error) { traceID, err := trace.TraceIDFromHex(wrapper.TraceID) if err != nil { return nil, err @@ -286,10 +286,6 @@ func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex Remote: wrapper.Remote, }) - if !sc.IsValid() { - return nil, nil - } - return &sc, nil } @@ -309,7 +305,7 @@ func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) if len(envelope.SpanContextJSON) > 0 { var wrapper spanContextConfigWrapper if err := json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil { - if sc, err := SpanContextFromWrapper(wrapper); err == nil && sc != nil { + if sc, err := spanContextFromWrapper(wrapper); err == nil && sc != nil { restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) } } diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 9a507177fe8..42ac8b38682 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -6,6 +6,7 @@ package queuebatch import ( "context" "encoding/binary" + "encoding/json" "errors" "fmt" "math" @@ -1304,7 +1305,7 @@ func TestSpanContextFromWrapper(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - scc, err := SpanContextFromWrapper(tc.wrapper) + scc, err := spanContextFromWrapper(tc.wrapper) if tc.expectErr { require.Error(t, err) if tc.errContains != "" { @@ -1384,3 +1385,101 @@ func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) { restoredSC2 := trace.SpanContextFromContext(restoredCtx2) assert.False(t, restoredSC2.IsValid()) } + +func TestMarshalUnmarshalRequestWithSpanContext(t *testing.T) { + t.Run("valid SpanContext round-trip", func(t *testing.T) { + traceID, _ := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") + spanID, _ := trace.SpanIDFromHex("0102030405060708") + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: 0x01, + TraceState: trace.TraceState{}, + Remote: true, + }) + ctxWithSC := trace.ContextWithSpanContext(context.Background(), sc) + request := uint64(42) + data, err := marshalRequestWithSpanContext(ctxWithSC, uint64Encoding{}, request) + require.NoError(t, err) + gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) + require.NoError(t, err) + assert.Equal(t, request, gotReq) + restoredSC := trace.SpanContextFromContext(gotCtx) + assert.True(t, restoredSC.IsValid()) + assert.Equal(t, sc.TraceID(), restoredSC.TraceID()) + assert.Equal(t, sc.SpanID(), restoredSC.SpanID()) + assert.Equal(t, sc.TraceFlags(), restoredSC.TraceFlags()) + assert.Equal(t, sc.TraceState().String(), restoredSC.TraceState().String()) + assert.Equal(t, sc.IsRemote(), restoredSC.IsRemote()) + }) + + t.Run("invalid SpanContext is omitted", func(t *testing.T) { + // An invalid SpanContext (zero value) + ctxWithInvalidSC := trace.ContextWithSpanContext(context.Background(), trace.SpanContext{}) + request := uint64(99) + data, err := marshalRequestWithSpanContext(ctxWithInvalidSC, uint64Encoding{}, request) + require.NoError(t, err) + // Should not contain span_context field + assert.NotContains(t, string(data), "span_context") + gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) + require.NoError(t, err) + assert.Equal(t, request, gotReq) + restoredSC := trace.SpanContextFromContext(gotCtx) + assert.False(t, restoredSC.IsValid()) + }) + + t.Run("no SpanContext in context is omitted", func(t *testing.T) { + request := uint64(123) + data, err := marshalRequestWithSpanContext(context.Background(), uint64Encoding{}, request) + require.NoError(t, err) + assert.NotContains(t, string(data), "span_context") + gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) + require.NoError(t, err) + assert.Equal(t, request, gotReq) + restoredSC := trace.SpanContextFromContext(gotCtx) + assert.False(t, restoredSC.IsValid()) + }) + + t.Run("corrupted span_context field", func(t *testing.T) { + // Manually create a bad envelope + envelope := marshaledRequestWithSpanContext{ + RequestBytes: []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + SpanContextJSON: []byte(`{"TraceID":123}`), // invalid TraceID + } + data, err := json.Marshal(envelope) + require.NoError(t, err) + _, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) + require.NoError(t, err) + // Should not panic, should return background context + restoredSC := trace.SpanContextFromContext(gotCtx) + assert.False(t, restoredSC.IsValid()) + }) + + t.Run("valid JSON, invalid RequestBytes returns error", func(t *testing.T) { + envelope := marshaledRequestWithSpanContext{ + RequestBytes: []byte{0x01, 0x02}, // too short for uint64Encoding.Unmarshal + SpanContextJSON: nil, + } + data, err := json.Marshal(envelope) + require.NoError(t, err) + _, _, err = unmarshalRequestWithSpanContext(uint64Encoding{}, data) + require.Error(t, err) + }) +} + +type errorEncoding struct{} + +func (errorEncoding) Marshal(_ uint64) ([]byte, error) { + return nil, errors.New("marshal error") +} + +func (errorEncoding) Unmarshal(_ []byte) (uint64, error) { + return 0, nil +} + +func TestMarshalRequestWithSpanContext_MarshalError(t *testing.T) { + ctx := context.Background() + _, err := marshalRequestWithSpanContext(ctx, errorEncoding{}, uint64(123)) + require.Error(t, err) + assert.Contains(t, err.Error(), "marshal error") +} From 65c9e491e914bd076ccef74a13f7a90e5f800ee4 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Thu, 8 May 2025 22:02:43 -0400 Subject: [PATCH 09/15] add persistentqueue benchmark test OTEL-2540 --- .../persistent_queue_benchmark_test.go | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go new file mode 100644 index 00000000000..b99864d2a3c --- /dev/null +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go @@ -0,0 +1,158 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queuebatch + +import ( + "context" + "encoding/json" + "testing" + + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pipeline" +) + +type requestTypeKeyType struct{} + +var requestTypeKey = requestTypeKeyType{} + +const ( + originalRequestValue = "original_request" + contextRequestValue = "context_request" + contextAndKeyValueRequestValue = "context_and_key_value_request" +) + +type largeRequest struct { + ID int + Meta MetaInfo + Body BodyLevel1 +} + +type MetaInfo struct { + Source string + Tags []string +} + +type BodyLevel1 struct { + Level2 BodyLevel2 + Extra string +} + +type BodyLevel2 struct { + Level3 BodyLevel3 + Values []int +} + +type BodyLevel3 struct { + Payload []byte + Note string +} + +type largeRequestEncoding struct{} + +func (largeRequestEncoding) Marshal(val largeRequest) ([]byte, error) { + return json.Marshal(val) +} + +func (largeRequestEncoding) Unmarshal(buf []byte) (largeRequest, error) { + var req largeRequest + if err := json.Unmarshal(buf, &req); err != nil { + return largeRequest{}, err + } + return req, nil +} + +func BenchmarkPersistentQueue_LargeRequests(b *testing.B) { + const ( + numRequests = 3000 + dataSize = 20 * 1024 // 20 KB + ) + // Prepare large requests + requests := make([]largeRequest, numRequests) + for i := range requests { + requests[i] = largeRequest{ + ID: i, + Meta: MetaInfo{ + Source: "benchmark", + Tags: []string{"tag1", "tag2", "tag3"}, + }, + Body: BodyLevel1{ + Level2: BodyLevel2{ + Level3: BodyLevel3{ + Payload: make([]byte, dataSize), + Note: "deep payload", + }, + Values: []int{i, i + 1, i + 2}, + }, + Extra: "extra info", + }, + } + for j := range requests[i].Body.Level2.Level3.Payload { + requests[i].Body.Level2.Level3.Payload[j] = byte((i + j) % 256) + } + } + + // Use a persistent queue with large capacity + pq := newPersistentQueue[largeRequest](persistentQueueSettings[largeRequest]{ + sizer: request.RequestsSizer[largeRequest]{}, + capacity: numRequests, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: largeRequestEncoding{}, + id: component.NewID(exportertest.NopType), + telemetry: componenttest.NewNopTelemetrySettings(), + }).(*persistentQueue[largeRequest]) + + ext := storagetest.NewMockStorageExtension(nil) + client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) + if err != nil { + b.Fatalf("failed to get storage client: %v", err) + } + + contextValues := []string{originalRequestValue, contextRequestValue} // , contextAndKeyValueRequestValue} + for _, value := range contextValues { + contextType := context.WithValue(context.Background(), requestTypeKey, value) + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, + SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + TraceFlags: trace.TraceFlags(0x1), + TraceState: trace.TraceState{}, + Remote: false, + }) + sharedContext := trace.ContextWithSpanContext(contextType, sc) + pq.initClient(contextType, client) + + b.ResetTimer() + b.ReportAllocs() + + // Offer all requests + for i := 0; i < numRequests; i++ { + err := pq.Offer(sharedContext, requests[i]) + if err != nil { + b.Fatalf("Offer failed at %d: %v", i, err) + } + } + + // Read and OnDone all requests + for i := 0; i < numRequests; i++ { + _, req, done, ok := pq.Read(sharedContext) + if !ok { + b.Fatalf("Read failed at %d", i) + } + if req.ID != i { + b.Fatalf("Request ID mismatch at %d: got %d", i, req.ID) + } + done.OnDone(nil) + } + + if pq.Size() != 0 { + b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) + } + } +} From b82031e164b132c7788abd0bcbde0115a60b9e6c Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 12:14:48 -0400 Subject: [PATCH 10/15] switch marshal from JSON to byte-based --- .../internal/queuebatch/persistent_queue.go | 59 ++++++++++++------- .../queuebatch/persistent_queue_test.go | 39 ++++++------ 2 files changed, 59 insertions(+), 39 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index 34f5fac6e60..3d193b3f1dd 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "strconv" "sync" @@ -243,11 +244,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { return pq.putInternal(ctx, req) } -type marshaledRequestWithSpanContext struct { - RequestBytes []byte `json:"request"` - SpanContextJSON json.RawMessage `json:"span_context,omitempty"` -} - type spanContextConfigWrapper struct { TraceID string SpanID string @@ -289,31 +285,42 @@ func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex return &sc, nil } -// unmarshalRequestWithSpanContext unmarshals a marshaledRequestWithSpanContext from bytes, returning the request -// and a context with the restored SpanContext (if present). +// unmarshalRequestWithSpanContext unmarshals a binary envelope, returning the request and a context with the restored SpanContext (if present). func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) (T, context.Context, error) { var req T restoredContext := context.Background() - var envelope marshaledRequestWithSpanContext - if err := json.Unmarshal(value, &envelope); err != nil { - return req, restoredContext, err + if len(value) < 8 { + return req, restoredContext, errors.New("envelope too short") + } + reqLen := binary.LittleEndian.Uint32(value[:4]) + if len(value) < int(4+reqLen+4) { + return req, restoredContext, errors.New("envelope too short for request") } - request, err := encoding.Unmarshal(envelope.RequestBytes) + reqBytes := value[4 : 4+reqLen] + scLen := binary.LittleEndian.Uint32(value[4+reqLen : 8+reqLen]) + if len(value) < int(8+reqLen+scLen) { + return req, restoredContext, errors.New("envelope too short for span context") + } + scBytes := value[8+reqLen : 8+reqLen+scLen] + // Unmarshal request + r, err := encoding.Unmarshal(reqBytes) if err != nil { return req, restoredContext, err } - if len(envelope.SpanContextJSON) > 0 { + req = r + // Unmarshal span context if present + if scLen > 0 { var wrapper spanContextConfigWrapper - if err := json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil { + if err := json.Unmarshal(scBytes, &wrapper); err == nil { if sc, err := spanContextFromWrapper(wrapper); err == nil && sc != nil { restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) } } } - return request, restoredContext, nil + return req, restoredContext, nil } -// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a marshaledRequestWithSpanContext envelope as bytes. +// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a binary envelope as bytes. func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding[T], req T) ([]byte, error) { reqBuf, err := encoding.Marshal(req) if err != nil { @@ -326,14 +333,22 @@ func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding if err != nil { return nil, err } - } else { - scJSON = nil // Will be omitted due to omitempty - } - envelope := marshaledRequestWithSpanContext{ - RequestBytes: reqBuf, - SpanContextJSON: scJSON, } - return json.Marshal(envelope) + if len(reqBuf) > int(math.MaxInt32) { + return nil, fmt.Errorf("request too large to encode: %d bytes", len(reqBuf)) + } + if len(scJSON) > int(math.MaxInt32) { + return nil, fmt.Errorf("span context too large to encode: %d bytes", len(scJSON)) + } + // Compose binary envelope: [4 bytes reqLen][req][4 bytes scLen][scJSON] + buf := make([]byte, 0, 8+len(reqBuf)+len(scJSON)) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(reqBuf))) + buf = append(buf, reqBuf...) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(scJSON))) + buf = append(buf, scJSON...) + return buf, nil } // putInternal is the internal version that requires caller to hold the mutex lock. diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 42ac8b38682..da4982cfe83 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -6,7 +6,6 @@ package queuebatch import ( "context" "encoding/binary" - "encoding/json" "errors" "fmt" "math" @@ -1441,28 +1440,34 @@ func TestMarshalUnmarshalRequestWithSpanContext(t *testing.T) { }) t.Run("corrupted span_context field", func(t *testing.T) { - // Manually create a bad envelope - envelope := marshaledRequestWithSpanContext{ - RequestBytes: []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, - SpanContextJSON: []byte(`{"TraceID":123}`), // invalid TraceID - } - data, err := json.Marshal(envelope) - require.NoError(t, err) - _, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) + // Manually create a bad envelope using the new binary format + requestBytes := []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + badSpanContext := []byte(`{"TraceID":123}`) // invalid TraceID + buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext)) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes))) + buf = append(buf, requestBytes...) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext))) + buf = append(buf, badSpanContext...) + _, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf) require.NoError(t, err) // Should not panic, should return background context restoredSC := trace.SpanContextFromContext(gotCtx) assert.False(t, restoredSC.IsValid()) }) - t.Run("valid JSON, invalid RequestBytes returns error", func(t *testing.T) { - envelope := marshaledRequestWithSpanContext{ - RequestBytes: []byte{0x01, 0x02}, // too short for uint64Encoding.Unmarshal - SpanContextJSON: nil, - } - data, err := json.Marshal(envelope) - require.NoError(t, err) - _, _, err = unmarshalRequestWithSpanContext(uint64Encoding{}, data) + t.Run("valid binary, invalid RequestBytes returns error", func(t *testing.T) { + requestBytes := []byte{0x01, 0x02} // too short for uint64Encoding.Unmarshal + badSpanContext := []byte{} + buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext)) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes))) + buf = append(buf, requestBytes...) + //nolint:gosec // G115: integer overflow conversion int -> uint32 + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext))) + buf = append(buf, badSpanContext...) + _, _, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf) require.Error(t, err) }) } From c49367e2fcc894d17323d89d5ff9d8e1e5e1a37c Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 14:54:56 -0400 Subject: [PATCH 11/15] switch approach to make multiple storage operations OTEL-2540 --- .../internal/queuebatch/persistent_queue.go | 202 ++++++++++-------- .../persistent_queue_benchmark_test.go | 158 -------------- .../queuebatch/persistent_queue_test.go | 119 +---------- 3 files changed, 117 insertions(+), 362 deletions(-) delete mode 100644 exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index 3d193b3f1dd..4298a1b1b4e 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -10,7 +10,6 @@ import ( "encoding/json" "errors" "fmt" - "math" "strconv" "sync" @@ -252,16 +251,31 @@ type spanContextConfigWrapper struct { Remote bool } -func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContext, error) { - traceID, err := trace.TraceIDFromHex(wrapper.TraceID) +type spanContext trace.SpanContext + +// func (sc *spanContext) MarshalJSON() ([]byte, error) { +// return json.Marshal(sc) +// } +func (sc *spanContext) UnmarshalJSON(data []byte) error { + var scc spanContextConfigWrapper + err := json.Unmarshal(data, &scc) if err != nil { - return nil, err + fmt.Println("Error unmarshaling JSON into spanContextConfigWrapper:", err) + return err } - spanID, err := trace.SpanIDFromHex(wrapper.SpanID) + fmt.Println("Unmarshaled spanContextConfigWrapper:", scc) + + scfw, err := spanContextFromWrapper(scc) if err != nil { - return nil, err + fmt.Println("Error converting spanContextConfigWrapper to spanContext:", err) + return err } - decoded, err := hex.DecodeString(wrapper.TraceFlags) + *sc = *scfw + return nil +} + +func traceFlagsFromHex(hexStr string) (*trace.TraceFlags, error) { + decoded, err := hex.DecodeString(hexStr) if err != nil { return nil, err } @@ -269,86 +283,48 @@ func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex return nil, errors.New(errInvalidTraceFlagsLength) } traceFlags := trace.TraceFlags(decoded[0]) + return &traceFlags, nil +} + +func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*spanContext, error) { + traceID, err := trace.TraceIDFromHex(wrapper.TraceID) + if err != nil { + return nil, err + } + spanID, err := trace.SpanIDFromHex(wrapper.SpanID) + if err != nil { + return nil, err + } + traceFlags, err := traceFlagsFromHex(wrapper.TraceFlags) + if err != nil { + return nil, err + } traceState, err := trace.ParseTraceState(wrapper.TraceState) if err != nil { return nil, err } - sc := trace.NewSpanContext(trace.SpanContextConfig{ + sc := spanContext(trace.NewSpanContext(trace.SpanContextConfig{ TraceID: traceID, SpanID: spanID, - TraceFlags: traceFlags, + TraceFlags: *traceFlags, TraceState: traceState, Remote: wrapper.Remote, - }) + })) return &sc, nil } -// unmarshalRequestWithSpanContext unmarshals a binary envelope, returning the request and a context with the restored SpanContext (if present). -func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) (T, context.Context, error) { - var req T - restoredContext := context.Background() - if len(value) < 8 { - return req, restoredContext, errors.New("envelope too short") - } - reqLen := binary.LittleEndian.Uint32(value[:4]) - if len(value) < int(4+reqLen+4) { - return req, restoredContext, errors.New("envelope too short for request") - } - reqBytes := value[4 : 4+reqLen] - scLen := binary.LittleEndian.Uint32(value[4+reqLen : 8+reqLen]) - if len(value) < int(8+reqLen+scLen) { - return req, restoredContext, errors.New("envelope too short for span context") +func getAndMarshalSpanContext(ctx context.Context) ([]byte, error) { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + return nil, nil } - scBytes := value[8+reqLen : 8+reqLen+scLen] - // Unmarshal request - r, err := encoding.Unmarshal(reqBytes) - if err != nil { - return req, restoredContext, err - } - req = r - // Unmarshal span context if present - if scLen > 0 { - var wrapper spanContextConfigWrapper - if err := json.Unmarshal(scBytes, &wrapper); err == nil { - if sc, err := spanContextFromWrapper(wrapper); err == nil && sc != nil { - restoredContext = trace.ContextWithSpanContext(restoredContext, *sc) - } - } - } - return req, restoredContext, nil -} - -// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a binary envelope as bytes. -func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding[T], req T) ([]byte, error) { - reqBuf, err := encoding.Marshal(req) + scJSON, err := json.Marshal(sc) if err != nil { return nil, err } - sc := trace.SpanContextFromContext(ctx) - var scJSON []byte - if sc.IsValid() { - scJSON, err = json.Marshal(sc) - if err != nil { - return nil, err - } - } - if len(reqBuf) > int(math.MaxInt32) { - return nil, fmt.Errorf("request too large to encode: %d bytes", len(reqBuf)) - } - if len(scJSON) > int(math.MaxInt32) { - return nil, fmt.Errorf("span context too large to encode: %d bytes", len(scJSON)) - } - // Compose binary envelope: [4 bytes reqLen][req][4 bytes scLen][scJSON] - buf := make([]byte, 0, 8+len(reqBuf)+len(scJSON)) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(reqBuf))) - buf = append(buf, reqBuf...) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(scJSON))) - buf = append(buf, scJSON...) - return buf, nil + return scJSON, nil } // putInternal is the internal version that requires caller to hold the mutex lock. @@ -362,15 +338,23 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { return err } } - - reqBuf, err := marshalRequestWithSpanContext(ctx, pq.set.encoding, req) + var reqBuf []byte + var err error + var ops []*storage.Operation + var contextBuf []byte + reqBuf, err = pq.set.encoding.Marshal(req) + if err != nil { + return err + } + contextBuf, err = getAndMarshalSpanContext(ctx) if err != nil { return err } // Carry out a transaction where we both add the item and update the write index - ops := []*storage.Operation{ - storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)), - storage.SetOperation(getItemKey(pq.writeIndex), reqBuf), + ops = append(ops, storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1))) + ops = append(ops, storage.SetOperation(getItemKey(pq.writeIndex), reqBuf)) + if contextBuf != nil { + ops = append(ops, storage.SetOperation(getContextKey(pq.writeIndex), contextBuf)) } if err = pq.client.Batch(ctx, ops...); err != nil { return err @@ -431,15 +415,28 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool, pq.readIndex++ pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index) getOp := storage.GetOperation(getItemKey(index)) - err := pq.client.Batch(ctx, - storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex)), - storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)), - getOp) + var ops []*storage.Operation + ctxOp := storage.GetOperation(getContextKey(index)) + ops = append(ops, storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex))) + ops = append(ops, storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems))) + ops = append(ops, getOp) + ops = append(ops, ctxOp) var request T restoredContext := context.Background() + err := pq.client.Batch(ctx, ops...) if err == nil { - request, restoredContext, err = unmarshalRequestWithSpanContext(pq.set.encoding, getOp.Value) + request, err = pq.set.encoding.Unmarshal(getOp.Value) + if err != nil { + return 0, request, false, restoredContext + } + var sc spanContext + if ctxOp.Value != nil { + err = json.Unmarshal(ctxOp.Value, &sc) + if err == nil && trace.SpanContext(sc).IsValid() { + restoredContext = trace.ContextWithSpanContext(restoredContext, trace.SpanContext(sc)) + } + } } if err != nil { @@ -523,12 +520,15 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems))) - retrieveBatch := make([]*storage.Operation, len(dispatchedItems)) - cleanupBatch := make([]*storage.Operation, len(dispatchedItems)) + retrieveBatch := make([]*storage.Operation, len(dispatchedItems)*2) + cleanupBatch := make([]*storage.Operation, len(dispatchedItems)*2) for i, it := range dispatchedItems { - key := getItemKey(it) - retrieveBatch[i] = storage.GetOperation(key) - cleanupBatch[i] = storage.DeleteOperation(key) + reqKey := getItemKey(it) + ctxKey := getContextKey(it) + retrieveBatch[i*2] = storage.GetOperation(reqKey) + retrieveBatch[i*2+1] = storage.GetOperation(ctxKey) + cleanupBatch[i*2] = storage.DeleteOperation(reqKey) + cleanupBatch[i*2+1] = storage.DeleteOperation(ctxKey) } retrieveErr := pq.client.Batch(ctx, retrieveBatch...) cleanupErr := pq.client.Batch(ctx, cleanupBatch...) @@ -543,18 +543,30 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co } errCount := 0 - for _, op := range retrieveBatch { + for idx := 0; idx < len(retrieveBatch); idx += 2 { + op := retrieveBatch[idx] if op.Value == nil { pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, _, err := unmarshalRequestWithSpanContext(pq.set.encoding, op.Value) + restoredContext := ctx + req, err := pq.set.encoding.Unmarshal(op.Value) + if err == nil && idx+1 < len(retrieveBatch) { + nextOp := retrieveBatch[idx+1] + if nextOp.Value != nil { + var sc *spanContext + err = json.Unmarshal(nextOp.Value, &sc) + if err == nil && sc != nil { + restoredContext = trace.ContextWithSpanContext(restoredContext, trace.SpanContext(*sc)) + } + } + } // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) continue } - if pq.putInternal(ctx, req) != nil { + if pq.putInternal(restoredContext, req) != nil { errCount++ } } @@ -579,9 +591,9 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u } } - setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)) - deleteOp := storage.DeleteOperation(getItemKey(index)) - if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil { + setOps := []*storage.Operation{storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems))} + deleteOps := []*storage.Operation{storage.DeleteOperation(getItemKey(index)), storage.DeleteOperation(getContextKey(index))} + if err := pq.client.Batch(ctx, append(setOps, deleteOps...)...); err != nil { // got an error, try to gracefully handle it pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err)) @@ -590,12 +602,12 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u return nil } - if err := pq.client.Batch(ctx, deleteOp); err != nil { + if err := pq.client.Batch(ctx, deleteOps...); err != nil { // Return an error here, as this indicates an issue with the underlying storage medium return fmt.Errorf("failed deleting item from queue, got error from storage: %w", err) } - if err := pq.client.Batch(ctx, setOp); err != nil { + if err := pq.client.Batch(ctx, setOps...); err != nil { // even if this fails, we still have the right dispatched items in memory // at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup return fmt.Errorf("failed updating currently dispatched items, but deleted item successfully: %w", err) @@ -622,6 +634,10 @@ func getItemKey(index uint64) string { return strconv.FormatUint(index, 10) } +func getContextKey(index uint64) string { + return strconv.FormatUint(index, 10) + "_context" +} + func itemIndexToBytes(value uint64) []byte { return binary.LittleEndian.AppendUint64([]byte{}, value) } diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go deleted file mode 100644 index b99864d2a3c..00000000000 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_benchmark_test.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package queuebatch - -import ( - "context" - "encoding/json" - "testing" - - "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pipeline" -) - -type requestTypeKeyType struct{} - -var requestTypeKey = requestTypeKeyType{} - -const ( - originalRequestValue = "original_request" - contextRequestValue = "context_request" - contextAndKeyValueRequestValue = "context_and_key_value_request" -) - -type largeRequest struct { - ID int - Meta MetaInfo - Body BodyLevel1 -} - -type MetaInfo struct { - Source string - Tags []string -} - -type BodyLevel1 struct { - Level2 BodyLevel2 - Extra string -} - -type BodyLevel2 struct { - Level3 BodyLevel3 - Values []int -} - -type BodyLevel3 struct { - Payload []byte - Note string -} - -type largeRequestEncoding struct{} - -func (largeRequestEncoding) Marshal(val largeRequest) ([]byte, error) { - return json.Marshal(val) -} - -func (largeRequestEncoding) Unmarshal(buf []byte) (largeRequest, error) { - var req largeRequest - if err := json.Unmarshal(buf, &req); err != nil { - return largeRequest{}, err - } - return req, nil -} - -func BenchmarkPersistentQueue_LargeRequests(b *testing.B) { - const ( - numRequests = 3000 - dataSize = 20 * 1024 // 20 KB - ) - // Prepare large requests - requests := make([]largeRequest, numRequests) - for i := range requests { - requests[i] = largeRequest{ - ID: i, - Meta: MetaInfo{ - Source: "benchmark", - Tags: []string{"tag1", "tag2", "tag3"}, - }, - Body: BodyLevel1{ - Level2: BodyLevel2{ - Level3: BodyLevel3{ - Payload: make([]byte, dataSize), - Note: "deep payload", - }, - Values: []int{i, i + 1, i + 2}, - }, - Extra: "extra info", - }, - } - for j := range requests[i].Body.Level2.Level3.Payload { - requests[i].Body.Level2.Level3.Payload[j] = byte((i + j) % 256) - } - } - - // Use a persistent queue with large capacity - pq := newPersistentQueue[largeRequest](persistentQueueSettings[largeRequest]{ - sizer: request.RequestsSizer[largeRequest]{}, - capacity: numRequests, - signal: pipeline.SignalTraces, - storageID: component.ID{}, - encoding: largeRequestEncoding{}, - id: component.NewID(exportertest.NopType), - telemetry: componenttest.NewNopTelemetrySettings(), - }).(*persistentQueue[largeRequest]) - - ext := storagetest.NewMockStorageExtension(nil) - client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) - if err != nil { - b.Fatalf("failed to get storage client: %v", err) - } - - contextValues := []string{originalRequestValue, contextRequestValue} // , contextAndKeyValueRequestValue} - for _, value := range contextValues { - contextType := context.WithValue(context.Background(), requestTypeKey, value) - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, - SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, - TraceFlags: trace.TraceFlags(0x1), - TraceState: trace.TraceState{}, - Remote: false, - }) - sharedContext := trace.ContextWithSpanContext(contextType, sc) - pq.initClient(contextType, client) - - b.ResetTimer() - b.ReportAllocs() - - // Offer all requests - for i := 0; i < numRequests; i++ { - err := pq.Offer(sharedContext, requests[i]) - if err != nil { - b.Fatalf("Offer failed at %d: %v", i, err) - } - } - - // Read and OnDone all requests - for i := 0; i < numRequests; i++ { - _, req, done, ok := pq.Read(sharedContext) - if !ok { - b.Fatalf("Read failed at %d", i) - } - if req.ID != i { - b.Fatalf("Request ID mismatch at %d: got %d", i, req.ID) - } - done.OnDone(nil) - } - - if pq.Size() != 0 { - b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) - } - } -} diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index da4982cfe83..f438eacd61f 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -914,7 +914,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { } func TestPersistentQueue_StorageFull(t *testing.T) { - marshaled, err := marshalRequestWithSpanContext(context.Background(), uint64Encoding{}, uint64(50)) + marshaled, err := uint64Encoding{}.Marshal(uint64(50)) require.NoError(t, err) maxSizeInBytes := len(marshaled) * 5 // arbitrary small number @@ -1318,12 +1318,13 @@ func TestSpanContextFromWrapper(t *testing.T) { } else { assert.NotNil(t, scc) if tc.expectValid { - assert.True(t, scc.IsValid()) - assert.Equal(t, tc.expectTraceID, scc.TraceID().String()) - assert.Equal(t, tc.expectSpanID, scc.SpanID().String()) - assert.Equal(t, tc.expectFlags, scc.TraceFlags().String()) - assert.Equal(t, tc.expectState, scc.TraceState().String()) - assert.Equal(t, tc.expectRemote, scc.IsRemote()) + sccObject := trace.SpanContext(*scc) + assert.True(t, sccObject.IsValid()) + assert.Equal(t, tc.expectTraceID, sccObject.TraceID().String()) + assert.Equal(t, tc.expectSpanID, sccObject.SpanID().String()) + assert.Equal(t, tc.expectFlags, sccObject.TraceFlags().String()) + assert.Equal(t, tc.expectState, sccObject.TraceState().String()) + assert.Equal(t, tc.expectRemote, sccObject.IsRemote()) } } }) @@ -1384,107 +1385,3 @@ func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) { restoredSC2 := trace.SpanContextFromContext(restoredCtx2) assert.False(t, restoredSC2.IsValid()) } - -func TestMarshalUnmarshalRequestWithSpanContext(t *testing.T) { - t.Run("valid SpanContext round-trip", func(t *testing.T) { - traceID, _ := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") - spanID, _ := trace.SpanIDFromHex("0102030405060708") - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: 0x01, - TraceState: trace.TraceState{}, - Remote: true, - }) - ctxWithSC := trace.ContextWithSpanContext(context.Background(), sc) - request := uint64(42) - data, err := marshalRequestWithSpanContext(ctxWithSC, uint64Encoding{}, request) - require.NoError(t, err) - gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) - require.NoError(t, err) - assert.Equal(t, request, gotReq) - restoredSC := trace.SpanContextFromContext(gotCtx) - assert.True(t, restoredSC.IsValid()) - assert.Equal(t, sc.TraceID(), restoredSC.TraceID()) - assert.Equal(t, sc.SpanID(), restoredSC.SpanID()) - assert.Equal(t, sc.TraceFlags(), restoredSC.TraceFlags()) - assert.Equal(t, sc.TraceState().String(), restoredSC.TraceState().String()) - assert.Equal(t, sc.IsRemote(), restoredSC.IsRemote()) - }) - - t.Run("invalid SpanContext is omitted", func(t *testing.T) { - // An invalid SpanContext (zero value) - ctxWithInvalidSC := trace.ContextWithSpanContext(context.Background(), trace.SpanContext{}) - request := uint64(99) - data, err := marshalRequestWithSpanContext(ctxWithInvalidSC, uint64Encoding{}, request) - require.NoError(t, err) - // Should not contain span_context field - assert.NotContains(t, string(data), "span_context") - gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) - require.NoError(t, err) - assert.Equal(t, request, gotReq) - restoredSC := trace.SpanContextFromContext(gotCtx) - assert.False(t, restoredSC.IsValid()) - }) - - t.Run("no SpanContext in context is omitted", func(t *testing.T) { - request := uint64(123) - data, err := marshalRequestWithSpanContext(context.Background(), uint64Encoding{}, request) - require.NoError(t, err) - assert.NotContains(t, string(data), "span_context") - gotReq, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, data) - require.NoError(t, err) - assert.Equal(t, request, gotReq) - restoredSC := trace.SpanContextFromContext(gotCtx) - assert.False(t, restoredSC.IsValid()) - }) - - t.Run("corrupted span_context field", func(t *testing.T) { - // Manually create a bad envelope using the new binary format - requestBytes := []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - badSpanContext := []byte(`{"TraceID":123}`) // invalid TraceID - buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext)) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes))) - buf = append(buf, requestBytes...) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext))) - buf = append(buf, badSpanContext...) - _, gotCtx, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf) - require.NoError(t, err) - // Should not panic, should return background context - restoredSC := trace.SpanContextFromContext(gotCtx) - assert.False(t, restoredSC.IsValid()) - }) - - t.Run("valid binary, invalid RequestBytes returns error", func(t *testing.T) { - requestBytes := []byte{0x01, 0x02} // too short for uint64Encoding.Unmarshal - badSpanContext := []byte{} - buf := make([]byte, 0, 8+len(requestBytes)+len(badSpanContext)) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(requestBytes))) - buf = append(buf, requestBytes...) - //nolint:gosec // G115: integer overflow conversion int -> uint32 - buf = binary.LittleEndian.AppendUint32(buf, uint32(len(badSpanContext))) - buf = append(buf, badSpanContext...) - _, _, err := unmarshalRequestWithSpanContext(uint64Encoding{}, buf) - require.Error(t, err) - }) -} - -type errorEncoding struct{} - -func (errorEncoding) Marshal(_ uint64) ([]byte, error) { - return nil, errors.New("marshal error") -} - -func (errorEncoding) Unmarshal(_ []byte) (uint64, error) { - return 0, nil -} - -func TestMarshalRequestWithSpanContext_MarshalError(t *testing.T) { - ctx := context.Background() - _, err := marshalRequestWithSpanContext(ctx, errorEncoding{}, uint64(123)) - require.Error(t, err) - assert.Contains(t, err.Error(), "marshal error") -} From 597c167b9cc464eaa4ebf19b765a15818d888652 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 15:08:09 -0400 Subject: [PATCH 12/15] add new benchmark with filebacked storage extension OTEL-2540 --- cmd/otelcorecol/go.mod | 1 + cmd/otelcorecol/go.sum | 2 + exporter/debugexporter/go.sum | 2 + .../queuebatch/persistent_queue_test.go | 113 ++++++++++++++++++ .../exporterhelper/xexporterhelper/go.sum | 2 + exporter/exportertest/go.sum | 2 + exporter/go.mod | 1 + exporter/go.sum | 2 + exporter/otlpexporter/go.sum | 2 + exporter/otlphttpexporter/go.sum | 2 + internal/e2e/go.sum | 2 + 11 files changed, 131 insertions(+) diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 19de3e2151a..0c2afa5648f 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -68,6 +68,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index 3e69b85ebe9..06f2f3c2c8d 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -87,6 +87,8 @@ github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mL github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/exporter/debugexporter/go.sum b/exporter/debugexporter/go.sum index 77492d1a74b..d810285183b 100644 --- a/exporter/debugexporter/go.sum +++ b/exporter/debugexporter/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index f438eacd61f..719e7daa8ed 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + contribstoragetest "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" @@ -29,6 +30,8 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) @@ -1385,3 +1388,113 @@ func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) { restoredSC2 := trace.SpanContextFromContext(restoredCtx2) assert.False(t, restoredSC2.IsValid()) } + +// ptraceTracesEncoding implements Encoding for ptrace.Traces using ProtoMarshaler/ProtoUnmarshaler. +type ptraceTracesEncoding struct { + marshaler *ptrace.ProtoMarshaler + unmarshaler *ptrace.ProtoUnmarshaler +} + +func (e ptraceTracesEncoding) Marshal(val ptrace.Traces) ([]byte, error) { + return e.marshaler.MarshalTraces(val) +} + +func (e ptraceTracesEncoding) Unmarshal(buf []byte) (ptrace.Traces, error) { + return e.unmarshaler.UnmarshalTraces(buf) +} + +func BenchmarkPersistentQueue_PtraceTraces(b *testing.B) { + const ( + numRequests = 1000 // Fewer, as Traces can be large + spansPerRequest = 100 + payloadSize = 20 * 1024 // 20 KB per span attribute + ) + // Prepare large Traces requests + requests := make([]ptrace.Traces, numRequests) + for i := range requests { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("source", "benchmark") + ils := rs.ScopeSpans().AppendEmpty() + ils.Scope().SetName("benchscope") + for j := 0; j < spansPerRequest; j++ { + sp := ils.Spans().AppendEmpty() + sp.SetName("span-bench") + sp.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + sp.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + now := time.Now() + sp.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + sp.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + payload := make([]byte, payloadSize) + for k := range payload { + payload[k] = byte((i + j + k) % 256) + } + sp.Attributes().PutEmptyBytes("payload").FromRaw(payload) + } + requests[i] = traces + } + + // Use a persistent queue with large capacity + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ + sizer: request.RequestsSizer[ptrace.Traces]{}, + capacity: int64(numRequests), + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: ptraceTracesEncoding{marshaler: &ptrace.ProtoMarshaler{}, unmarshaler: &ptrace.ProtoUnmarshaler{}}, + id: component.NewID(exportertest.NopType), + telemetry: componenttest.NewNopTelemetrySettings(), + }).(*persistentQueue[ptrace.Traces]) + + // Set up real file-backed storage extension for the benchmark + storageDir := b.TempDir() + ext := contribstoragetest.NewFileBackedStorageExtension(storageDir, storageDir) + defer func() { + if err := ext.Shutdown(context.Background()); err != nil { + b.Fatalf("failed to shutdown storage extension: %v", err) + } + }() + + client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) + if err != nil { + b.Fatalf("failed to get storage client: %v", err) + } + + b.Run("BenchmarkWithPTraceEncoding", func(b *testing.B) { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, + SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + TraceFlags: trace.TraceFlags(0x1), + TraceState: trace.TraceState{}, + Remote: false, + }) + sharedContext := trace.ContextWithSpanContext(context.Background(), sc) + pq.initClient(sharedContext, client) + + b.ResetTimer() + b.ReportAllocs() + + // Offer all requests + for i := 0; i < numRequests; i++ { + err := pq.Offer(sharedContext, requests[i]) + if err != nil { + b.Fatalf("Offer failed at %d: %v", i, err) + } + } + + // Read and OnDone all requests + for i := 0; i < numRequests; i++ { + _, req, done, ok := pq.Read(sharedContext) + if !ok { + b.Fatalf("Read failed at %d", i) + } + if req.SpanCount() != spansPerRequest { + b.Fatalf("SpanCount mismatch at %d: got %d", i, req.SpanCount()) + } + done.OnDone(nil) + } + + if pq.Size() != 0 { + b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) + } + }) +} diff --git a/exporter/exporterhelper/xexporterhelper/go.sum b/exporter/exporterhelper/xexporterhelper/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/exporterhelper/xexporterhelper/go.sum +++ b/exporter/exporterhelper/xexporterhelper/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/exportertest/go.sum b/exporter/exportertest/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/exportertest/go.sum +++ b/exporter/exportertest/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/go.mod b/exporter/go.mod index 3170108a386..1e01eb59cc9 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -4,6 +4,7 @@ go 1.23.0 require ( github.com/cenkalti/backoff/v5 v5.0.2 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/client v1.31.0 go.opentelemetry.io/collector/component v1.31.0 diff --git a/exporter/go.sum b/exporter/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/go.sum +++ b/exporter/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 0e1ccd4bc48..37de6ce6980 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -63,6 +63,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index e70b55a2007..271822c3c38 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -63,6 +63,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/e2e/go.sum b/internal/e2e/go.sum index 9868c547d64..a2003fc83ac 100644 --- a/internal/e2e/go.sum +++ b/internal/e2e/go.sum @@ -84,6 +84,8 @@ github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mL github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= From 70604f03dac953ff0de9d526036a12c8ac12ad12 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 15:28:08 -0400 Subject: [PATCH 13/15] remove inadvertent debug code --- .../exporterhelper/internal/queuebatch/persistent_queue.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go index 4298a1b1b4e..dd3e23784a4 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go @@ -260,14 +260,10 @@ func (sc *spanContext) UnmarshalJSON(data []byte) error { var scc spanContextConfigWrapper err := json.Unmarshal(data, &scc) if err != nil { - fmt.Println("Error unmarshaling JSON into spanContextConfigWrapper:", err) return err } - fmt.Println("Unmarshaled spanContextConfigWrapper:", scc) - scfw, err := spanContextFromWrapper(scc) if err != nil { - fmt.Println("Error converting spanContextConfigWrapper to spanContext:", err) return err } *sc = *scfw From b998fc8f2132dfbb61263ca44c39f944d19c77f1 Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 15:29:49 -0400 Subject: [PATCH 14/15] fix new benchark --- .../queuebatch/persistent_queue_test.go | 101 +++++++----------- 1 file changed, 39 insertions(+), 62 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 719e7daa8ed..9b126e442d4 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -1405,39 +1405,34 @@ func (e ptraceTracesEncoding) Unmarshal(buf []byte) (ptrace.Traces, error) { func BenchmarkPersistentQueue_PtraceTraces(b *testing.B) { const ( - numRequests = 1000 // Fewer, as Traces can be large spansPerRequest = 100 payloadSize = 20 * 1024 // 20 KB per span attribute ) // Prepare large Traces requests - requests := make([]ptrace.Traces, numRequests) - for i := range requests { - traces := ptrace.NewTraces() - rs := traces.ResourceSpans().AppendEmpty() - rs.Resource().Attributes().PutStr("source", "benchmark") - ils := rs.ScopeSpans().AppendEmpty() - ils.Scope().SetName("benchscope") - for j := 0; j < spansPerRequest; j++ { - sp := ils.Spans().AppendEmpty() - sp.SetName("span-bench") - sp.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) - sp.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) - now := time.Now() - sp.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) - sp.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) - payload := make([]byte, payloadSize) - for k := range payload { - payload[k] = byte((i + j + k) % 256) - } - sp.Attributes().PutEmptyBytes("payload").FromRaw(payload) + req := ptrace.NewTraces() + rs := req.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("source", "benchmark") + ils := rs.ScopeSpans().AppendEmpty() + ils.Scope().SetName("benchscope") + for j := 0; j < spansPerRequest; j++ { + sp := ils.Spans().AppendEmpty() + sp.SetName("span-bench") + sp.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + sp.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + now := time.Now() + sp.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + sp.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + payload := make([]byte, payloadSize) + for k := range payload { + payload[k] = byte((j + k) % 256) } - requests[i] = traces + sp.Attributes().PutEmptyBytes("payload").FromRaw(payload) } // Use a persistent queue with large capacity pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ sizer: request.RequestsSizer[ptrace.Traces]{}, - capacity: int64(numRequests), + capacity: int64(b.N), signal: pipeline.SignalTraces, storageID: component.ID{}, encoding: ptraceTracesEncoding{marshaler: &ptrace.ProtoMarshaler{}, unmarshaler: &ptrace.ProtoUnmarshaler{}}, @@ -1454,47 +1449,29 @@ func BenchmarkPersistentQueue_PtraceTraces(b *testing.B) { } }() - client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) - if err != nil { - b.Fatalf("failed to get storage client: %v", err) - } - - b.Run("BenchmarkWithPTraceEncoding", func(b *testing.B) { - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, - SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, - TraceFlags: trace.TraceFlags(0x1), - TraceState: trace.TraceState{}, - Remote: false, - }) - sharedContext := trace.ContextWithSpanContext(context.Background(), sc) - pq.initClient(sharedContext, client) + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, + SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + TraceFlags: trace.TraceFlags(0x1), + TraceState: trace.TraceState{}, + Remote: false, + }) + sharedContext := trace.ContextWithSpanContext(context.Background(), sc) + require.NoError(b, pq.Start(sharedContext, hosttest.NewHost(map[component.ID]component.Component{{}: ext}))) - b.ResetTimer() - b.ReportAllocs() + b.ResetTimer() + b.ReportAllocs() - // Offer all requests - for i := 0; i < numRequests; i++ { - err := pq.Offer(sharedContext, requests[i]) - if err != nil { - b.Fatalf("Offer failed at %d: %v", i, err) - } - } + for i := 0; i < b.N; i++ { + require.NoError(b, pq.Offer(context.Background(), req)) + } - // Read and OnDone all requests - for i := 0; i < numRequests; i++ { - _, req, done, ok := pq.Read(sharedContext) - if !ok { - b.Fatalf("Read failed at %d", i) - } - if req.SpanCount() != spansPerRequest { - b.Fatalf("SpanCount mismatch at %d: got %d", i, req.SpanCount()) - } - done.OnDone(nil) - } + for i := 0; i < b.N; i++ { + require.True(b, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) + } + require.NoError(b, ext.Shutdown(context.Background())) - if pq.Size() != 0 { - b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) - } - }) + if pq.Size() != 0 { + b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) + } } From 5c95f231b0a9b99c31a7d02950fb576e4409477e Mon Sep 17 00:00:00 2001 From: jackgopack4 Date: Fri, 9 May 2025 15:56:25 -0400 Subject: [PATCH 15/15] make gotidy --- cmd/otelcorecol/go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 0c2afa5648f..19de3e2151a 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -68,7 +68,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect