diff --git a/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml b/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml new file mode 100644 index 00000000000..ead18f6f15a --- /dev/null +++ b/.chloggen/jackgopack4-add-spancontext-persistentqueue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'exporter/exporterhelper' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: 'Propagate SpanContext with requests in the persistent queue' + +# One or more tracking issues or pull requests related to the change +issues: [11740, 12212, 12934] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This change will allow internal telemetry spans to be processed when using persistent queue/storage. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/cmd/mdatagen/internal/loader_test.go b/cmd/mdatagen/internal/loader_test.go index 08dda39e6ef..a50d7e2487a 100644 --- a/cmd/mdatagen/internal/loader_test.go +++ b/cmd/mdatagen/internal/loader_test.go @@ -371,6 +371,22 @@ func TestLoadMetadata(t *testing.T) { }, }, }, + { + name: "testdata/empty_test_config.yaml", + want: Metadata{ + Type: "test", + GeneratedPackageName: "metadata", + ScopeName: "go.opentelemetry.io/collector/cmd/mdatagen/internal", + ShortFolderName: "testdata", + Tests: Tests{Host: "componenttest.NewNopHost()"}, + Status: &Status{ + Class: "receiver", + Stability: map[component.StabilityLevel][]string{ + component.StabilityLevelBeta: {"logs"}, + }, + }, + }, + }, { name: "testdata/invalid_type_rattr.yaml", want: Metadata{}, diff --git a/cmd/mdatagen/internal/testdata/empty_test_config.yaml b/cmd/mdatagen/internal/testdata/empty_test_config.yaml new file mode 100644 index 00000000000..5b13e2dff33 --- /dev/null +++ b/cmd/mdatagen/internal/testdata/empty_test_config.yaml @@ -0,0 +1,9 @@ +type: test + +status: + class: receiver + stability: + beta: [logs] + +tests: + config: diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index 3e69b85ebe9..06f2f3c2c8d 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -87,6 +87,8 @@ github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mL github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= 
+github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/confmap/confmap.go b/confmap/confmap.go index 9b95e4cf1bc..3040778dcd9 100644 --- a/confmap/confmap.go +++ b/confmap/confmap.go @@ -19,6 +19,7 @@ import ( "github.com/knadh/koanf/v2" encoder "go.opentelemetry.io/collector/confmap/internal/mapstructure" + "go.opentelemetry.io/collector/confmap/internal/third_party/composehook" ) const ( @@ -234,7 +235,7 @@ func decodeConfig(m *Conf, result any, errorUnused bool, skipTopLevelUnmarshaler TagName: MapstructureTag, WeaklyTypedInput: false, MatchName: caseSensitiveMatchName, - DecodeHook: mapstructure.ComposeDecodeHookFunc( + DecodeHook: composehook.ComposeDecodeHookFunc( useExpandValue(), expandNilStructPointersHookFunc(), mapstructure.StringToSliceHookFunc(","), @@ -306,6 +307,23 @@ func isStringyStructure(t reflect.Type) bool { return false } +// safeWrapDecodeHookFunc wraps a DecodeHookFuncValue to ensure fromVal is a valid `reflect.Value` +// object and therefore it is safe to call `reflect.Value` methods on fromVal. +// +// Use this only if the hook does not need to be called on untyped nil values. +// Typed nil values are safe to call and will be passed to the hook. 
+// See https://github.com/golang/go/issues/51649 +func safeWrapDecodeHookFunc( + f mapstructure.DecodeHookFuncValue, +) mapstructure.DecodeHookFuncValue { + return func(fromVal reflect.Value, toVal reflect.Value) (any, error) { + if !fromVal.IsValid() { + return nil, nil + } + return f(fromVal, toVal) + } +} + // When a value has been loaded from an external source via a provider, we keep both the // parsed value and the original string value. This allows us to expand the value to its // original string representation when decoding into a string field, and use the original otherwise. @@ -355,7 +373,7 @@ func useExpandValue() mapstructure.DecodeHookFuncType { // we want an unmarshaled Config to be equivalent to // Config{Thing: &SomeStruct{}} instead of Config{Thing: nil} func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { // ensure we are dealing with map to map comparison if from.Kind() == reflect.Map && to.Kind() == reflect.Map { toElem := to.Type().Elem() @@ -375,7 +393,7 @@ func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { } } return from.Interface(), nil - } + }) } // mapKeyStringToMapKeyTextUnmarshalerHookFunc returns a DecodeHookFuncType that checks that a conversion from @@ -422,7 +440,7 @@ func mapKeyStringToMapKeyTextUnmarshalerHookFunc() mapstructure.DecodeHookFuncTy // unmarshalerEmbeddedStructsHookFunc provides a mechanism for embedded structs to define their own unmarshal logic, // by implementing the Unmarshaler interface. 
func unmarshalerEmbeddedStructsHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if to.Type().Kind() != reflect.Struct { return from.Interface(), nil } @@ -455,14 +473,14 @@ func unmarshalerEmbeddedStructsHookFunc() mapstructure.DecodeHookFuncValue { } } return fromAsMap, nil - } + }) } // Provides a mechanism for individual structs to define their own unmarshal logic, // by implementing the Unmarshaler interface, unless skipTopLevelUnmarshaler is // true and the struct matches the top level object being unmarshaled. func unmarshalerHookFunc(result any, skipTopLevelUnmarshaler bool) mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if !to.CanAddr() { return from.Interface(), nil } @@ -495,14 +513,14 @@ func unmarshalerHookFunc(result any, skipTopLevelUnmarshaler bool) mapstructure. } return unmarshaler, nil - } + }) } // marshalerHookFunc returns a DecodeHookFuncValue that checks structs that aren't // the original to see if they implement the Marshaler interface. func marshalerHookFunc(orig any) mapstructure.DecodeHookFuncValue { origType := reflect.TypeOf(orig) - return func(from reflect.Value, _ reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, _ reflect.Value) (any, error) { if from.Kind() != reflect.Struct { return from.Interface(), nil } @@ -520,7 +538,7 @@ func marshalerHookFunc(orig any) mapstructure.DecodeHookFuncValue { return nil, err } return conf.ToStringMap(), nil - } + }) } // Unmarshaler interface may be implemented by types to customize their behavior when being unmarshaled from a Conf. @@ -562,7 +580,7 @@ type Marshaler interface { // 4. 
configuration have no `keys` field specified, the output should be default config // - for example, input is {}, then output is Config{ Keys: ["a", "b"]} func zeroSliceHookFunc() mapstructure.DecodeHookFuncValue { - return func(from reflect.Value, to reflect.Value) (any, error) { + return safeWrapDecodeHookFunc(func(from reflect.Value, to reflect.Value) (any, error) { if to.CanSet() && to.Kind() == reflect.Slice && from.Kind() == reflect.Slice { if from.IsNil() { // input slice is nil, set output slice to nil. @@ -574,7 +592,7 @@ func zeroSliceHookFunc() mapstructure.DecodeHookFuncValue { } return from.Interface(), nil - } + }) } type moduleFactory[T any, S any] interface { diff --git a/confmap/confmap_test.go b/confmap/confmap_test.go index 65be60fcc07..026ba8f04ad 100644 --- a/confmap/confmap_test.go +++ b/confmap/confmap_test.go @@ -99,6 +99,20 @@ func TestToStringMap(t *testing.T) { } } +type testConfigAny struct { + AnyField any `mapstructure:"any_field"` +} + +func TestNilToAnyField(t *testing.T) { + stringMap := map[string]any{ + "any_field": nil, + } + conf := NewFromStringMap(stringMap) + cfg := &testConfigAny{} + require.NoError(t, conf.Unmarshal(cfg)) + assert.Nil(t, cfg.AnyField) +} + func TestExpandNilStructPointersHookFunc(t *testing.T) { stringMap := map[string]any{ "boolean": nil, diff --git a/confmap/internal/third_party/composehook/compose_hook.go b/confmap/internal/third_party/composehook/compose_hook.go new file mode 100644 index 00000000000..f51050f66ed --- /dev/null +++ b/confmap/internal/third_party/composehook/compose_hook.go @@ -0,0 +1,103 @@ +// Copyright (c) 2013 Mitchell Hashimoto +// SPDX-License-Identifier: MIT +// This code is a modified version of https://github.com/go-viper/mapstructure + +package composehook // import "go.opentelemetry.io/collector/confmap/internal/third_party/composehook" + +import ( + "errors" + "reflect" + + "github.com/go-viper/mapstructure/v2" +) + +// typedDecodeHook takes a 
raw DecodeHookFunc (an any) and turns +// it into the proper DecodeHookFunc type, such as DecodeHookFuncType. +func typedDecodeHook(h mapstructure.DecodeHookFunc) mapstructure.DecodeHookFunc { + // Create variables here so we can reference them with the reflect pkg + var f1 mapstructure.DecodeHookFuncType + var f2 mapstructure.DecodeHookFuncKind + var f3 mapstructure.DecodeHookFuncValue + + // Fill in the variables into this interface and the rest is done + // automatically using the reflect package. + potential := []any{f3, f1, f2} + + v := reflect.ValueOf(h) + vt := v.Type() + for _, raw := range potential { + pt := reflect.ValueOf(raw).Type() + if vt.ConvertibleTo(pt) { + return v.Convert(pt).Interface() + } + } + + return nil +} + +// cachedDecodeHook takes a raw DecodeHookFunc (an any) and turns +// it into a closure to be used directly +// if the type fails to convert we return a closure always erroring to keep the previous behavior +func cachedDecodeHook(raw mapstructure.DecodeHookFunc) func(reflect.Value, reflect.Value) (any, error) { + switch f := typedDecodeHook(raw).(type) { + case mapstructure.DecodeHookFuncType: + return func(from reflect.Value, to reflect.Value) (any, error) { + // CHANGE FROM UPSTREAM: check if from is valid and return nil if not + if !from.IsValid() { + return nil, nil + } + return f(from.Type(), to.Type(), from.Interface()) + } + case mapstructure.DecodeHookFuncKind: + return func(from reflect.Value, to reflect.Value) (any, error) { + // CHANGE FROM UPSTREAM: check if from is valid and return nil if not + if !from.IsValid() { + return nil, nil + } + return f(from.Kind(), to.Kind(), from.Interface()) + } + case mapstructure.DecodeHookFuncValue: + return func(from reflect.Value, to reflect.Value) (any, error) { + return f(from, to) + } + default: + return func(reflect.Value, reflect.Value) (any, error) { + return nil, errors.New("invalid decode hook signature") + } + } +} + +// ComposeDecodeHookFunc creates a single DecodeHookFunc 
that +// automatically composes multiple DecodeHookFuncs. +// +// The composed funcs are called in order, with the result of the +// previous transformation. +// +// This is a copy of [mapstructure.ComposeDecodeHookFunc] but with +// validation added. +func ComposeDecodeHookFunc(fs ...mapstructure.DecodeHookFunc) mapstructure.DecodeHookFunc { + cached := make([]func(reflect.Value, reflect.Value) (any, error), 0, len(fs)) + for _, f := range fs { + cached = append(cached, cachedDecodeHook(f)) + } + return func(f reflect.Value, t reflect.Value) (any, error) { + var err error + + // CHANGE FROM UPSTREAM: check if f is valid before calling f.Interface() + var data any + if f.IsValid() { + data = f.Interface() + } + + newFrom := f + for _, c := range cached { + data, err = c(newFrom, t) + if err != nil { + return nil, err + } + newFrom = reflect.ValueOf(data) + } + + return data, nil + } +} diff --git a/docs/release.md b/docs/release.md index 50c5d9160ea..e6994b85e9a 100644 --- a/docs/release.md +++ b/docs/release.md @@ -178,20 +178,22 @@ When considering making a bugfix release on the `v0.N.x` release cycle, the bug - Changing the configuration to an easy to find value. 2. The bug happens in common setups. To gauge this, maintainers can consider the following: - If the bug is specific to a certain platform, and if that platform is in [Tier 1](../docs/platform-support.md#tiered-platform-support-model). - - The bug happens with the default configuration or with a commonly used one (e.g. has been reported by multiple people) + - The bug happens with the default configuration or with one that is known to be used in production. 3. The bug is sufficiently severe. For example (non-exhaustive list): - The bug makes the Collector crash reliably - The bug makes the Collector fail to start under an accepted configuration - The bug produces significant data loss - The bug makes the Collector negatively affect its environment (e.g. 
significantly affects its host machine) + - The bug makes it difficult to troubleshoot or debug Collector setups We aim to provide a release that fixes security-related issues in at most 30 days since they are publicly announced; with the current release schedule this means security issues will typically not warrant a bugfix release. An exception is critical vulnerabilities (CVSSv3 score >= 9.0), which will warrant a release within five business days. The OpenTelemetry Collector maintainers will ultimately have the responsibility to assess if a given bug or security issue fulfills all the necessary criteria and may grant exceptions in a case-by-case basis. +If the maintainers are unable to reach consensus within one working day, we will lean towards releasing a bugfix version. ### Bugfix release procedure -The following documents the procedure to release a bugfix +The release manager of a minor version is responsible for releasing any bugfix versions on this release series. The following documents the procedure to release a bugfix 1. Create a pull request against the `release/` (e.g. `release/v0.90.x`) branch to apply the fix. 2. Make sure you are on `release/`. Prepare release commits with `prepare-release` make target, e.g. `make prepare-release PREVIOUS_VERSION=0.90.0 RELEASE_CANDIDATE=0.90.1 MODSET=beta`, and create a pull request against the `release/` branch. 
diff --git a/exporter/debugexporter/go.sum b/exporter/debugexporter/go.sum index 77492d1a74b..d810285183b 100644 --- a/exporter/debugexporter/go.sum +++ b/exporter/debugexporter/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go index 9687f12c31f..0dcf45a5db2 100644 --- a/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/memory_queue_test.go @@ -198,8 +198,10 @@ func BenchmarkMemoryQueueWaitForResult(b *testing.B) { wg := sync.WaitGroup{} consumed := &atomic.Int64{} q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1000, waitForResult: true}) + require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) // Consume async new data. 
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		for {
@@ -212,7 +214,6 @@
 		}
 	}()
 
-	require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go
index a1106103f8b..dd3e23784a4 100644
--- a/exporter/exporterhelper/internal/queuebatch/persistent_queue.go
+++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue.go
@@ -6,11 +6,14 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
 import (
 	"context"
 	"encoding/binary"
+	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"strconv"
 	"sync"
 
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
@@ -29,6 +32,8 @@ const (
 	writeIndexKey               = "wi"
 	currentlyDispatchedItemsKey = "di"
 	queueSizeKey                = "si"
+
+	errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
 )
 
 var (
@@ -238,6 +243,86 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
 	return pq.putInternal(ctx, req)
 }
 
+type spanContextConfigWrapper struct {
+	TraceID    string
+	SpanID     string
+	TraceFlags string
+	TraceState string
+	Remote     bool
+}
+
+type spanContext trace.SpanContext
+
+// spanContext needs no custom MarshalJSON: encoding relies on
+// trace.SpanContext's own MarshalJSON (see getAndMarshalSpanContext); only
+// decoding is customized, via spanContextConfigWrapper, below.
+func (sc *spanContext) UnmarshalJSON(data []byte) error {
+	var scc spanContextConfigWrapper
+	err := json.Unmarshal(data, &scc)
+	if err != nil {
+		return err
+	}
+	scfw, err := spanContextFromWrapper(scc)
+	if err != nil {
+		return err
+	}
+	*sc = *scfw
+	return nil
+}
+
+func traceFlagsFromHex(hexStr string) (*trace.TraceFlags, error) {
+	decoded, err := hex.DecodeString(hexStr)
+	if err != nil {
+		return nil, err
+	}
+	if len(decoded) != 1 {
+		return nil, errors.New(errInvalidTraceFlagsLength)
+	}
+	traceFlags := 
trace.TraceFlags(decoded[0]) + return &traceFlags, nil +} + +func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*spanContext, error) { + traceID, err := trace.TraceIDFromHex(wrapper.TraceID) + if err != nil { + return nil, err + } + spanID, err := trace.SpanIDFromHex(wrapper.SpanID) + if err != nil { + return nil, err + } + traceFlags, err := traceFlagsFromHex(wrapper.TraceFlags) + if err != nil { + return nil, err + } + traceState, err := trace.ParseTraceState(wrapper.TraceState) + if err != nil { + return nil, err + } + + sc := spanContext(trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: *traceFlags, + TraceState: traceState, + Remote: wrapper.Remote, + })) + + return &sc, nil +} + +func getAndMarshalSpanContext(ctx context.Context) ([]byte, error) { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + return nil, nil + } + scJSON, err := json.Marshal(sc) + if err != nil { + return nil, err + } + return scJSON, nil +} + // putInternal is the internal version that requires caller to hold the mutex lock. 
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { reqSize := pq.set.sizer.Sizeof(req) @@ -249,16 +334,23 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { return err } } - - reqBuf, err := pq.set.encoding.Marshal(req) + var reqBuf []byte + var err error + var ops []*storage.Operation + var contextBuf []byte + reqBuf, err = pq.set.encoding.Marshal(req) + if err != nil { + return err + } + contextBuf, err = getAndMarshalSpanContext(ctx) if err != nil { return err } - // Carry out a transaction where we both add the item and update the write index - ops := []*storage.Operation{ - storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)), - storage.SetOperation(getItemKey(pq.writeIndex), reqBuf), + ops = append(ops, storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1))) + ops = append(ops, storage.SetOperation(getItemKey(pq.writeIndex), reqBuf)) + if contextBuf != nil { + ops = append(ops, storage.SetOperation(getContextKey(pq.writeIndex), contextBuf)) } if err = pq.client.Batch(ctx, ops...); err != nil { return err @@ -291,7 +383,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // Read until either a successful retrieved element or no more elements in the storage. for pq.readIndex != pq.writeIndex { - index, req, consumed := pq.getNextItem(ctx) + index, req, consumed, restoredContext := pq.getNextItem(ctx) // Ensure the used size and the channel size are in sync. 
if pq.readIndex == pq.writeIndex { pq.queueSize = 0 @@ -300,7 +392,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don if consumed { id := indexDonePool.Get().(*indexDone) id.reset(index, pq.set.sizer.Sizeof(req), pq) - return context.Background(), req, id, true + return restoredContext, req, id, true } } @@ -313,20 +405,34 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // getNextItem pulls the next available item from the persistent storage along with its index. Once processing is // finished, the index should be called with onDone to clean up the storage. If no new item is available, // returns false. -func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) { +func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool, context.Context) { index := pq.readIndex // Increase here, so even if errors happen below, it always iterates pq.readIndex++ pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index) getOp := storage.GetOperation(getItemKey(index)) - err := pq.client.Batch(ctx, - storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex)), - storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)), - getOp) + var ops []*storage.Operation + ctxOp := storage.GetOperation(getContextKey(index)) + ops = append(ops, storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex))) + ops = append(ops, storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems))) + ops = append(ops, getOp) + ops = append(ops, ctxOp) var request T + restoredContext := context.Background() + err := pq.client.Batch(ctx, ops...) 
if err == nil { request, err = pq.set.encoding.Unmarshal(getOp.Value) + if err != nil { + return 0, request, false, restoredContext + } + var sc spanContext + if ctxOp.Value != nil { + err = json.Unmarshal(ctxOp.Value, &sc) + if err == nil && trace.SpanContext(sc).IsValid() { + restoredContext = trace.ContextWithSpanContext(restoredContext, trace.SpanContext(sc)) + } + } } if err != nil { @@ -336,14 +442,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) pq.logger.Error("Error deleting item from queue", zap.Error(err)) } - return 0, request, false + return 0, request, false, restoredContext } // Increase the reference count, so the client is not closed while the request is being processed. // The client cannot be closed because we hold the lock since last we checked `stopped`. pq.refClient++ - return index, request, true + return index, request, true, restoredContext } // onDone should be called to remove the item of the given index from the queue once processing is finished. 
@@ -410,12 +516,15 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems))) - retrieveBatch := make([]*storage.Operation, len(dispatchedItems)) - cleanupBatch := make([]*storage.Operation, len(dispatchedItems)) + retrieveBatch := make([]*storage.Operation, len(dispatchedItems)*2) + cleanupBatch := make([]*storage.Operation, len(dispatchedItems)*2) for i, it := range dispatchedItems { - key := getItemKey(it) - retrieveBatch[i] = storage.GetOperation(key) - cleanupBatch[i] = storage.DeleteOperation(key) + reqKey := getItemKey(it) + ctxKey := getContextKey(it) + retrieveBatch[i*2] = storage.GetOperation(reqKey) + retrieveBatch[i*2+1] = storage.GetOperation(ctxKey) + cleanupBatch[i*2] = storage.DeleteOperation(reqKey) + cleanupBatch[i*2+1] = storage.DeleteOperation(ctxKey) } retrieveErr := pq.client.Batch(ctx, retrieveBatch...) cleanupErr := pq.client.Batch(ctx, cleanupBatch...) 
@@ -430,18 +539,30 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co } errCount := 0 - for _, op := range retrieveBatch { + for idx := 0; idx < len(retrieveBatch); idx += 2 { + op := retrieveBatch[idx] if op.Value == nil { pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } + restoredContext := ctx req, err := pq.set.encoding.Unmarshal(op.Value) + if err == nil && idx+1 < len(retrieveBatch) { + nextOp := retrieveBatch[idx+1] + if nextOp.Value != nil { + var sc *spanContext + err = json.Unmarshal(nextOp.Value, &sc) + if err == nil && sc != nil { + restoredContext = trace.ContextWithSpanContext(restoredContext, trace.SpanContext(*sc)) + } + } + } // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) continue } - if pq.putInternal(ctx, req) != nil { + if pq.putInternal(restoredContext, req) != nil { errCount++ } } @@ -466,9 +587,9 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u } } - setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)) - deleteOp := storage.DeleteOperation(getItemKey(index)) - if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil { + setOps := []*storage.Operation{storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems))} + deleteOps := []*storage.Operation{storage.DeleteOperation(getItemKey(index)), storage.DeleteOperation(getContextKey(index))} + if err := pq.client.Batch(ctx, append(setOps, deleteOps...)...); err != nil { // got an error, try to gracefully handle it pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err)) @@ -477,12 +598,12 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u return nil } - if err := 
pq.client.Batch(ctx, deleteOp); err != nil { + if err := pq.client.Batch(ctx, deleteOps...); err != nil { // Return an error here, as this indicates an issue with the underlying storage medium return fmt.Errorf("failed deleting item from queue, got error from storage: %w", err) } - if err := pq.client.Batch(ctx, setOp); err != nil { + if err := pq.client.Batch(ctx, setOps...); err != nil { // even if this fails, we still have the right dispatched items in memory // at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup return fmt.Errorf("failed updating currently dispatched items, but deleted item successfully: %w", err) @@ -509,6 +630,10 @@ func getItemKey(index uint64) string { return strconv.FormatUint(index, 10) } +func getContextKey(index uint64) string { + return strconv.FormatUint(index, 10) + "_context" +} + func itemIndexToBytes(value uint64) []byte { return binary.LittleEndian.AppendUint64([]byte{}, value) } diff --git a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go index 1d25ea8a0fa..9b126e442d4 100644 --- a/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go @@ -16,8 +16,10 @@ import ( "testing" "time" + contribstoragetest "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -28,6 +30,8 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" + "go.opentelemetry.io/collector/pdata/pcommon" + 
"go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) @@ -1205,3 +1209,269 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[uint defer pq.mu.Unlock() assert.ElementsMatch(t, compare, pq.currentlyDispatchedItems) } + +func TestSpanContextFromWrapper(t *testing.T) { + testCases := []struct { + name string + wrapper spanContextConfigWrapper + expectErr bool + errContains string + expectNil bool + expectValid bool + expectTraceID string + expectSpanID string + expectFlags string + expectState string + expectRemote bool + }{ + { + name: "invalid trace id", + wrapper: spanContextConfigWrapper{ + TraceID: "invalidtraceid", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid span id", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "invalidspanid", + TraceFlags: "01", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid trace flags hex", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "zz", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "invalid trace flags length", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "0102", + TraceState: "", + Remote: false, + }, + expectErr: true, + expectNil: true, + errContains: errInvalidTraceFlagsLength, + }, + { + name: "invalid trace state", + wrapper: spanContextConfigWrapper{ + TraceID: "0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "invalid=tracestate,=bad", + Remote: false, + }, + expectErr: true, + expectNil: true, + }, + { + name: "valid span context", + wrapper: spanContextConfigWrapper{ + TraceID: 
"0102030405060708090a0b0c0d0e0f10", + SpanID: "0102030405060708", + TraceFlags: "01", + TraceState: "vendor=value", + Remote: true, + }, + expectErr: false, + expectNil: false, + expectValid: true, + expectTraceID: "0102030405060708090a0b0c0d0e0f10", + expectSpanID: "0102030405060708", + expectFlags: "01", + expectState: "vendor=value", + expectRemote: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + scc, err := spanContextFromWrapper(tc.wrapper) + if tc.expectErr { + require.Error(t, err) + if tc.errContains != "" { + assert.Contains(t, err.Error(), tc.errContains) + } + } else { + require.NoError(t, err) + } + if tc.expectNil { + assert.Nil(t, scc) + } else { + assert.NotNil(t, scc) + if tc.expectValid { + sccObject := trace.SpanContext(*scc) + assert.True(t, sccObject.IsValid()) + assert.Equal(t, tc.expectTraceID, sccObject.TraceID().String()) + assert.Equal(t, tc.expectSpanID, sccObject.SpanID().String()) + assert.Equal(t, tc.expectFlags, sccObject.TraceFlags().String()) + assert.Equal(t, tc.expectState, sccObject.TraceState().String()) + assert.Equal(t, tc.expectRemote, sccObject.IsRemote()) + } + } + }) + } +} + +func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) { + // Setup a minimal persistent queue using uint64Encoding and uint64 + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ + sizer: request.RequestsSizer[uint64]{}, + capacity: 10, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: uint64Encoding{}, + id: component.NewID(exportertest.NopType), + telemetry: componenttest.NewNopTelemetrySettings(), + }).(*persistentQueue[uint64]) + + ext := storagetest.NewMockStorageExtension(nil) + client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) + require.NoError(t, err) + pq.initClient(context.Background(), client) + + // Create a valid SpanContext + traceID, _ := 
trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") + spanID, _ := trace.SpanIDFromHex("0102030405060708") + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: 0x01, + TraceState: trace.TraceState{}, + Remote: true, + }) + ctxWithSC := trace.ContextWithSpanContext(context.Background(), sc) + + // Offer a request with this context + req := uint64(42) + require.NoError(t, pq.Offer(ctxWithSC, req)) + + // Read the request and restored context + restoredCtx, gotReq, _, ok := pq.Read(context.Background()) + require.True(t, ok) + assert.Equal(t, req, gotReq) + restoredSC := trace.SpanContextFromContext(restoredCtx) + assert.True(t, restoredSC.IsValid()) + assert.Equal(t, sc.TraceID(), restoredSC.TraceID()) + assert.Equal(t, sc.SpanID(), restoredSC.SpanID()) + assert.Equal(t, sc.TraceFlags(), restoredSC.TraceFlags()) + assert.Equal(t, sc.TraceState().String(), restoredSC.TraceState().String()) + assert.Equal(t, sc.IsRemote(), restoredSC.IsRemote()) + + // Also test with a context with no SpanContext + req2 := uint64(99) + require.NoError(t, pq.Offer(context.Background(), req2)) + restoredCtx2, gotReq2, _, ok2 := pq.Read(context.Background()) + require.True(t, ok2) + assert.Equal(t, req2, gotReq2) + restoredSC2 := trace.SpanContextFromContext(restoredCtx2) + assert.False(t, restoredSC2.IsValid()) +} + +// ptraceTracesEncoding implements Encoding for ptrace.Traces using ProtoMarshaler/ProtoUnmarshaler. 
+type ptraceTracesEncoding struct { + marshaler *ptrace.ProtoMarshaler + unmarshaler *ptrace.ProtoUnmarshaler +} + +func (e ptraceTracesEncoding) Marshal(val ptrace.Traces) ([]byte, error) { + return e.marshaler.MarshalTraces(val) +} + +func (e ptraceTracesEncoding) Unmarshal(buf []byte) (ptrace.Traces, error) { + return e.unmarshaler.UnmarshalTraces(buf) +} + +func BenchmarkPersistentQueue_PtraceTraces(b *testing.B) { + const ( + spansPerRequest = 100 + payloadSize = 20 * 1024 // 20 KB per span attribute + ) + // Prepare large Traces requests + req := ptrace.NewTraces() + rs := req.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("source", "benchmark") + ils := rs.ScopeSpans().AppendEmpty() + ils.Scope().SetName("benchscope") + for j := 0; j < spansPerRequest; j++ { + sp := ils.Spans().AppendEmpty() + sp.SetName("span-bench") + sp.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + sp.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + now := time.Now() + sp.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + sp.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + payload := make([]byte, payloadSize) + for k := range payload { + payload[k] = byte((j + k) % 256) + } + sp.Attributes().PutEmptyBytes("payload").FromRaw(payload) + } + + // Use a persistent queue with large capacity + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ + sizer: request.RequestsSizer[ptrace.Traces]{}, + capacity: int64(b.N), + signal: pipeline.SignalTraces, + storageID: component.ID{}, + encoding: ptraceTracesEncoding{marshaler: &ptrace.ProtoMarshaler{}, unmarshaler: &ptrace.ProtoUnmarshaler{}}, + id: component.NewID(exportertest.NopType), + telemetry: componenttest.NewNopTelemetrySettings(), + }).(*persistentQueue[ptrace.Traces]) + + // Set up real file-backed storage extension for the benchmark + storageDir := b.TempDir() + ext := contribstoragetest.NewFileBackedStorageExtension(storageDir, storageDir) 
+ defer func() { + if err := ext.Shutdown(context.Background()); err != nil { + b.Fatalf("failed to shutdown storage extension: %v", err) + } + }() + + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, + SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + TraceFlags: trace.TraceFlags(0x1), + TraceState: trace.TraceState{}, + Remote: false, + }) + sharedContext := trace.ContextWithSpanContext(context.Background(), sc) + require.NoError(b, pq.Start(sharedContext, hosttest.NewHost(map[component.ID]component.Component{{}: ext}))) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + require.NoError(b, pq.Offer(context.Background(), req)) + } + + for i := 0; i < b.N; i++ { + require.True(b, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) + } + require.NoError(b, ext.Shutdown(context.Background())) + + if pq.Size() != 0 { + b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) + } +} diff --git a/exporter/exporterhelper/xexporterhelper/go.sum b/exporter/exporterhelper/xexporterhelper/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/exporterhelper/xexporterhelper/go.sum +++ b/exporter/exporterhelper/xexporterhelper/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= 
+github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/exportertest/go.sum b/exporter/exportertest/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/exportertest/go.sum +++ b/exporter/exportertest/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/go.mod b/exporter/go.mod index 3170108a386..1e01eb59cc9 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -4,6 +4,7 @@ go 1.23.0 require ( github.com/cenkalti/backoff/v5 v5.0.2 + 
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/client v1.31.0 go.opentelemetry.io/collector/component v1.31.0 diff --git a/exporter/go.sum b/exporter/go.sum index ac88a3d73cb..f33a5984051 100644 --- a/exporter/go.sum +++ b/exporter/go.sum @@ -47,6 +47,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 0e1ccd4bc48..37de6ce6980 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -63,6 +63,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= 
github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index e70b55a2007..271822c3c38 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -63,6 +63,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/e2e/go.sum b/internal/e2e/go.sum index 9868c547d64..a2003fc83ac 100644 --- a/internal/e2e/go.sum +++ b/internal/e2e/go.sum @@ -84,6 +84,8 @@ github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mL github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0 h1:cT04h6LstyuEKyonbMnz8Tz8U0+0vuPUxeNXBKmO2dc= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.125.0/go.mod h1:GeOnmgaagn8/jNZlAsn4Pr6oDdS0xzGDkh6wh1ojMv4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=