Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
40.1.1
------
- Optimize an internal code path; no user-facing changes.

40.1.0
------
- Add support for configuration of receiver's buffer size - `receive-buffer-size`
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/jessevdk/go-flags v1.5.0
github.com/json-iterator/go v1.1.12
Expand Down
43 changes: 36 additions & 7 deletions pkg/statsd/handler_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package statsd

import (
"context"
"slices"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -36,7 +37,7 @@ func NewTagHandlerFromViper(v *viper.Viper, handler gostatsd.PipelineHandler, ta
// NewTagHandler initialises a new handler which adds unique tags, and sends metrics/events to the next handler based
// on filter rules.
func NewTagHandler(handler gostatsd.PipelineHandler, tags gostatsd.Tags, filters []Filter) *TagHandler {
tags = uniqueTags(tags, gostatsd.Tags{}) // de-dupe tags
tags = uniqueTagsSimple(tags, gostatsd.Tags{}) // de-dupe tags
return &TagHandler{
handler: handler,
tags: tags,
Expand Down Expand Up @@ -147,7 +148,7 @@ func (th *TagHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.Metric
// Returns true if the metric should be processed further, or false to drop it.
func (th *TagHandler) uniqueFilterAndAddTags(mName string, mHostname *gostatsd.Source, mTags *gostatsd.Tags) bool {
if len(th.filters) == 0 {
*mTags = uniqueTags(*mTags, th.tags)
*mTags = uniqueTagsSimple(*mTags, th.tags)
return true
}

Expand Down Expand Up @@ -193,7 +194,7 @@ func (th *TagHandler) uniqueFilterAndAddTags(mName string, mHostname *gostatsd.S

// DispatchEvent adds the unique tags from the TagHandler to the event and passes it to the next stage in the pipeline
func (th *TagHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) {
e.Tags = uniqueTags(e.Tags, th.tags)
e.Tags = uniqueTagsSimple(e.Tags, th.tags)
th.handler.DispatchEvent(ctx, e)
}

Expand All @@ -202,12 +203,40 @@ func (th *TagHandler) WaitForEvents() {
th.handler.WaitForEvents()
}

// uniqueTags returns the set of t1 | t2. It may modify the contents of t1 and t2.
func uniqueTags(t1 gostatsd.Tags, t2 gostatsd.Tags) gostatsd.Tags {
return uniqueTagsWithSeen(map[string]struct{}{}, t1, t2)
// uniqueTagsSimple returns the set of t1 | t2. It may modify the contents of t1. It will not modify the contents
// of t2.
func uniqueTagsSimple(t1 gostatsd.Tags, t2 gostatsd.Tags) gostatsd.Tags {
// This originally tracked seen tags in a map, however as the number of tags is relatively small, it's actually
// faster to do a linear scan than to put things in a map, even if the map is pre-allocated. The break-even
// point is approximately 20 unique items.
//
// Benchmarking against the https://github.com/golang/go/wiki/SliceTricks style of filtering a slice shows
// this is slightly faster, at the expense of breaking "nearly sorted" ordering. Benchmarking with a
// `.SortedString()` on the output shows that this is still better.

last := len(t1)
for idx := 1; idx < last; { // start at 1 because we know the first item will be unique.
if slices.Contains(t1[:idx-1], t1[idx]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should here be t1[:idx] to reach the last element? since the loop condition is idx < len(t1)?

(see my other comment for a failed test case)

// Delete the current item by copying the last item in to this slot, and "shrinking" the slice.
last--
t1[idx] = t1[last]
} else {
idx++
}
}
t1 = t1[:last]

for _, tag := range t2 {
if !slices.Contains(t1, tag) {
t1 = append(t1, tag)
}
}

return t1
}

// uniqueTags returns the set of (t1 | t2) - seen. It may modify the contents of t1, t2, and seen.
// uniqueTagsWithSeen returns the set of (t1 | t2) - seen. It may modify the contents of t1 and seen. It will not
// modify the contents of t2.
func uniqueTagsWithSeen(seen map[string]struct{}, t1 gostatsd.Tags, t2 gostatsd.Tags) gostatsd.Tags {
last := len(t1)
for idx := 0; idx < last; {
Expand Down
71 changes: 71 additions & 0 deletions pkg/statsd/handler_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package statsd
import (
"bytes"
"context"
"slices"
"sort"
"strconv"
"strings"
"testing"

"github.com/google/uuid"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -15,6 +18,74 @@ import (
. "github.com/atlassian/gostatsd/internal/fixtures"
)

func benchmarkUnique(b *testing.B, t1 gostatsd.Tags, t2 gostatsd.Tags) {
// Sanity check.
u1 := uniqueTagsSimple(slices.Clone(t1), slices.Clone(t2))
u2 := uniqueTagsWithSeen(map[string]struct{}{}, slices.Clone(t1), slices.Clone(t2))
u3 := uniqueTagsWithSeen(make(map[string]struct{}, len(t1)), slices.Clone(t1), slices.Clone(t2))
require.Equal(b, u1.SortedString(), u2.SortedString())
require.Equal(b, u1.SortedString(), u3.SortedString())

runBenchmark := func(name string, f func(t1, t2 gostatsd.Tags) gostatsd.Tags) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_ = f(slices.Clone(t1), slices.Clone(t2))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we don't need to clone t2, but I had the code written...

// This can be used to measure the cost of sorting.
//_ = f(slices.Clone(t1), slices.Clone(t2)).SortedString()
}
})
}

runBenchmark("original", func(t1, t2 gostatsd.Tags) gostatsd.Tags {
return uniqueTagsWithSeen(map[string]struct{}{}, t1, t2)
})

runBenchmark("prealloc", func(t1, t2 gostatsd.Tags) gostatsd.Tags {
return uniqueTagsWithSeen(make(map[string]struct{}, len(t1)), t1, t2)
})

runBenchmark("array-search", func(t1, t2 gostatsd.Tags) gostatsd.Tags {
return uniqueTagsSimple(t1, t2)
})
}

func BenchmarkUniqueTagsPractical(b *testing.B) {
// Generate 5 tags. These are the tags emitted at the call-site, and are dynamic.
dynamicTags := gostatsd.Tags{
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
}

// Generate 1 overlapping tag, and 3 unique tags. These are the tags added by TagHandler, and are static.
staticTags := gostatsd.Tags{
dynamicTags[0],
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
uuid.New().String() + ":" + uuid.New().String(),
}
benchmarkUnique(b, dynamicTags, staticTags)
}

func BenchmarkUniqueTagsWithSeen(b *testing.B) {
for tagCount := range 30 {
b.Run(strconv.Itoa(tagCount)+"-tags", func(b *testing.B) {
originalTags := gostatsd.Tags{}
for i := 0; i < tagCount; i++ {
originalTags = append(originalTags, uuid.New().String()+":"+uuid.New().String())
}
originalTags2 := slices.Clone(originalTags)
for i := tagCount / 2; i < tagCount; i++ {
originalTags2[i] = uuid.New().String() + ":" + uuid.New().String()
}
benchmarkUnique(b, originalTags, originalTags2)
})
}
}

func TestTagStripMergesCounters(t *testing.T) {
t.Parallel()
tch := &capturingHandler{}
Expand Down
Loading