Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `WithDefaultAttributes` to `go.opentelemetry.io/otel/metric/x` to support setting default attributes on instruments. (#8135)
- Add `Settable` to `go.opentelemetry.io/otel/metric/x` to allow reusing attribute options. (#8178)
- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8194)
- Add `LazyFilteredSet` type in `go.opentelemetry.io/otel/attribute` to support efficient, lazy evaluation of filtered attributes and their hash. (#8230)
- Add `OfferLazy` method to `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` to support lazy evaluation of dropped attributes using `LazyFilteredSet`. (#8230)

### Changed

Expand All @@ -50,6 +52,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `go.opentelemetry.io/otel/sdk/log` now unwraps error chains created with `fmt.Errorf` when deriving the `error.type` attribute from errors on log records. (#8133)
- `Set.MarshalLog` method in `go.opentelemetry.io/otel/attribute` now uses `Value.String` formatting following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8169)
- Optimize `go.opentelemetry.io/otel/sdk/metric` to return a drop reservoir and short-circuit `Offer` calls to the exemplar reservoir when `exemplar.AlwaysOffFilter` is configured. (#8211) (#8267)
- Improve the performance of `go.opentelemetry.io/otel/sdk/metric` when an `attribute.Filter` is applied to measurements by a `View` or by `x.WithDefaultAttributes`. (#8230)

Comment thread
dashpole marked this conversation as resolved.
### Deprecated

Expand Down
197 changes: 197 additions & 0 deletions attribute/lazy_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute // import "go.opentelemetry.io/otel/attribute"

import (
"math/bits"

"go.opentelemetry.io/otel/attribute/internal/xxhash"
)

// LazyFilteredSet represents an attribute Set with a filter applied lazily.
// It is designed for performance-sensitive paths where filtering results
// should only be computed if needed.
//
// The filter itself is run when the value is created by NewLazyFilteredSet.
// For sets of at most 64 attributes only the filter outcome and the resulting
// Distinct are stored; the filtered Set and the dropped attributes are built
// on demand by Filtered and Dropped.
type LazyFilteredSet struct {
	// orig is the unfiltered set supplied at construction.
	orig Set
	// mask has bit i set when the i-th attribute of orig, in iteration order,
	// was accepted by the filter. It is only populated for sets of at most 64
	// attributes.
	mask uint64
	// distinct identifies the filtered attributes. It is computed once, at
	// construction.
	distinct Distinct
	// fallback is the eagerly built filtered set used when orig holds more
	// than 64 attributes. It is nil otherwise.
	fallback *Set
	// dropped holds the attributes rejected by the filter. It is only
	// populated together with fallback.
	dropped []KeyValue
}

// NewLazyFilteredSet creates a new LazyFilteredSet.
// It evaluates the filter exactly once for each attribute in the set.
func NewLazyFilteredSet(set Set, filter Filter) LazyFilteredSet {
if filter == nil {
return LazyFilteredSet{orig: set, distinct: set.Equivalent()}
}
if set.Len() == 0 {
return LazyFilteredSet{orig: set, distinct: Distinct{hash: emptySet.hash}}
}

if set.Len() > 64 {
var kvs []KeyValue
var dropped []KeyValue
iter := set.Iter()
for iter.Next() {
kv := iter.Attribute()
if filter(kv) {
kvs = append(kvs, kv)
} else {
dropped = append(dropped, kv)
}
}
Comment thread
dashpole marked this conversation as resolved.
filtered := newSet(kvs)
return LazyFilteredSet{orig: set, distinct: filtered.Equivalent(), fallback: &filtered, dropped: dropped}
}
Comment thread
dashpole marked this conversation as resolved.

h := xxhash.New()
var mask uint64
iter := set.Iter()
i := 0
hasAttributes := false
for iter.Next() {
kv := iter.Attribute()
if filter(kv) {
h = hashKV(h, kv)
mask |= 1 << i
hasAttributes = true
}
i++
}

var distinct Distinct
if !hasAttributes {
distinct = Distinct{hash: emptySet.hash}
} else {
distinct = Distinct{hash: h.Sum64()}
}

return LazyFilteredSet{orig: set, mask: mask, distinct: distinct}
}

// Distinct returns the Distinct computed for the filtered attributes when s
// was constructed. The call only reads the stored value; it does not run the
// filter or build the filtered Set.
func (s LazyFilteredSet) Distinct() Distinct {
	return s.distinct
}

// Filtered materializes and returns the filtered attribute set.
//
// The filter is not run again: the result is derived from the state recorded
// at construction. Each call that reaches the mask path builds a new Set.
func (s LazyFilteredSet) Filtered() Set {
	switch {
	case s.fallback != nil:
		// The filtered set was already built at construction.
		return *s.fallback
	case s.distinct == s.orig.Equivalent():
		// The filtered identity equals the original's, so reuse the original.
		return s.orig
	case s.mask == 0:
		// No attribute was accepted.
		return emptySet
	}

	kept := make([]KeyValue, 0, bits.OnesCount64(s.mask))

	// Fixed-size array representations are indexed directly; anything else
	// goes through the generic iterator (presumably the slower path — confirm
	// against Set.Iter before relying on it).
	//
	// case [1]KeyValue is unreachable because any set of size 1 will either be
	// accepted in full (short-circuiting at s.distinct == s.orig.Equivalent())
	// or rejected in full (short-circuiting at s.mask == 0).
	switch arr := s.orig.data.(type) {
	case [2]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [3]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [4]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [5]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [6]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [7]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [8]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [9]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	case [10]KeyValue:
		kept = appendMaskedKVs(kept, arr[:], s.mask)
	default:
		bit := uint64(1)
		for it := s.orig.Iter(); it.Next(); bit <<= 1 {
			if s.mask&bit != 0 {
				kept = append(kept, it.Attribute())
			}
		}
	}

	// The hash was already computed at construction, so only the data
	// representation has to be built here.
	out := Set{
		hash: s.distinct.hash,
		data: computeDataFixed(kept),
	}
	if out.data == nil {
		out.data = computeDataReflect(kept)
	}
	return out
}

// appendMaskedKVs appends to dst every src[i] whose bit i is set in mask,
// preserving the order of src, and returns the extended slice.
func appendMaskedKVs(dst, src []KeyValue, mask uint64) []KeyValue {
	for i := range src {
		if mask&(1<<i) != 0 {
			dst = append(dst, src[i])
		}
	}
	return dst
}

// Dropped materializes and returns the attributes that were filtered out.
//
// The filter is not run again: the result is derived from the state recorded
// at construction. It returns nil when the filtered identity equals the
// original set's.
func (s LazyFilteredSet) Dropped() []KeyValue {
	switch {
	case s.fallback != nil:
		// The dropped attributes were already collected at construction.
		return s.dropped
	case s.distinct == s.orig.Equivalent():
		return nil
	case s.mask == 0:
		// No attribute was accepted, so every attribute was dropped.
		return s.orig.ToSlice()
	}

	rejected := make([]KeyValue, 0, s.orig.Len()-bits.OnesCount64(s.mask))

	// bit walks through the mask in step with the iterator; a clear bit marks
	// an attribute the filter rejected.
	bit := uint64(1)
	for it := s.orig.Iter(); it.Next(); bit <<= 1 {
		if s.mask&bit != 0 {
			continue
		}
		rejected = append(rejected, it.Attribute())
	}
	return rejected
}
154 changes: 154 additions & 0 deletions attribute/lazy_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
)

// TestLazyFilteredSet checks Distinct, Filtered and Dropped for the basic
// cases: no filter, a filter that keeps everything, a filter that keeps
// nothing, and an empty input set.
func TestLazyFilteredSet(t *testing.T) {
	k0 := attribute.String("k0", "v0")
	k1 := attribute.String("k1", "v1")
	k2 := attribute.String("k2", "v2")
	full := attribute.NewSet(k0, k1, k2)
	none := attribute.NewSet()

	acceptAll := func(attribute.KeyValue) bool { return true }
	rejectAll := func(attribute.KeyValue) bool { return false }

	for _, tc := range []struct {
		name        string
		in          attribute.Set
		filter      attribute.Filter
		want        attribute.Set
		wantDropped []attribute.KeyValue
	}{
		{name: "NilFilter", in: full, filter: nil, want: full},
		{name: "FilterAll", in: full, filter: acceptAll, want: full},
		{
			name:        "FilterNone",
			in:          full,
			filter:      rejectAll,
			want:        none,
			wantDropped: []attribute.KeyValue{k0, k1, k2},
		},
		{name: "EmptySet", in: none, filter: acceptAll, want: none},
	} {
		t.Run(tc.name, func(t *testing.T) {
			ls := attribute.NewLazyFilteredSet(tc.in, tc.filter)

			assert.Equal(t, tc.want.Equivalent(), ls.Distinct())
			assert.Equal(t, tc.want, ls.Filtered())

			if len(tc.wantDropped) == 0 {
				assert.Empty(t, ls.Dropped())
				return
			}
			assert.ElementsMatch(t, tc.wantDropped, ls.Dropped())
		})
	}
}

func TestLazyFilteredSetVariousSizes(t *testing.T) {
testCases := []struct {
name string
size int
filter func(attribute.KeyValue) bool
}{
// Sizes 1-10 use default filter (accept all) to guarantee full coverage of switch cases
Comment thread
dashpole marked this conversation as resolved.
Outdated
{name: "Size1", size: 1},
{name: "Size2", size: 2},
{name: "Size3", size: 3},
{name: "Size4", size: 4},
{name: "Size5", size: 5},
{name: "Size6", size: 6},
{name: "Size7", size: 7},
{name: "Size8", size: 8},
{name: "Size9", size: 9},
{name: "Size10", size: 10},
Comment thread
dashpole marked this conversation as resolved.
Outdated
// Specific boundary tests with a realistic filter
{
name: "SmallSetFiltered",
size: 3,
filter: func(kv attribute.KeyValue) bool { return kv.Value.AsInt64()%2 == 0 },
},
{
name: "MediumSetFiltered",
size: 25,
filter: func(kv attribute.KeyValue) bool { return kv.Value.AsInt64()%2 == 0 },
},
{
name: "LargeSetFiltered",
size: 70,
filter: func(kv attribute.KeyValue) bool { return kv.Value.AsInt64()%2 == 0 },
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
var kvs []attribute.KeyValue
for i := range tt.size {
kvs = append(kvs, attribute.Int(fmt.Sprintf("k%d", i), i))
}
s := attribute.NewSet(kvs...)

fltr := tt.filter
if fltr == nil {
fltr = func(kv attribute.KeyValue) bool { return kv.Value.AsInt64()%2 == 0 }
}

ls := attribute.NewLazyFilteredSet(s, fltr)

filtered, dropped := s.Filter(fltr)

assert.Equal(t, filtered.Equivalent(), ls.Distinct())
assert.Equal(t, filtered, ls.Filtered())
assert.ElementsMatch(t, dropped, ls.Dropped())
})
}
}

// TestLazyFilteredSetInconsistentFilter verifies that the filter is consulted
// only while the LazyFilteredSet is constructed. The filter used here changes
// its answer after the first pass, so any later re-evaluation would both bump
// the call counter and change the materialized results.
func TestLazyFilteredSetInconsistentFilter(t *testing.T) {
	for _, tc := range []struct {
		name string
		size int
	}{
		{name: "SmallSet", size: 1},
		{name: "LargeSet", size: 70},
	} {
		t.Run(tc.name, func(t *testing.T) {
			attrs := make([]attribute.KeyValue, 0, tc.size)
			for n := range tc.size {
				attrs = append(attrs, attribute.Int(fmt.Sprintf("k%d", n), n))
			}
			set := attribute.NewSet(attrs...)

			calls := 0
			// Accepts during the first tc.size calls (one full pass over the
			// set) and rejects on every call after that.
			firstPassOnly := func(attribute.KeyValue) bool {
				calls++
				return calls <= tc.size
			}

			lazy := attribute.NewLazyFilteredSet(set, firstPassOnly)

			gotFiltered := lazy.Filtered()
			gotDropped := lazy.Dropped()

			assert.Equal(t, tc.size, calls, "filter should be called exactly once per attribute")

			assert.Equal(t, set, gotFiltered)
			assert.Empty(t, gotDropped)

			// Materializing a second time must not consult the filter either.
			lazy.Filtered()
			lazy.Dropped()
			assert.Equal(t, tc.size, calls, "filter should NOT be called again on materialization")
		})
	}
}
18 changes: 18 additions & 0 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,24 @@ func BenchmarkEndToEndCounterAdd(b *testing.B) {
)
},
},
{
name: "FilteredWithExemplars",
provider: func() metric.MeterProvider {
view := NewView(
Instrument{
Name: "test.counter",
},
// Filter out one attribute from each call.
Stream{AttributeFilter: attribute.NewDenyKeysFilter("a")},
)
return NewMeterProvider(
WithView(view),
WithReader(NewManualReader()),
// Offer an Exemplar on each call.
WithExemplarFilter(exemplar.AlwaysOnFilter),
)
},
},
} {
b.Run(mp.name, func(b *testing.B) {
for _, attrsLen := range []int{1, 5, 10} {
Expand Down
Loading
Loading