Skip to content

Commit 1863395

Browse files
committed
feat: added size based eviction to batcher + reflectutil adds sizeof method
1 parent 5bb2161 commit 1863395

File tree

6 files changed

+279
-1
lines changed

6 files changed

+279
-1
lines changed

batcher/batcher.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
package batcher
22

33
import (
4+
"sync/atomic"
45
"time"
6+
7+
"github.com/DmitriyVTitov/size"
58
)
69

710
// FlushCallback is the callback function that will be called when the batcher is full or the flush interval is reached.
// It receives a slice of T.
811
type FlushCallback[T any] func([]T)
912

1013
// Batcher is a batcher for any type of data
1114
type Batcher[T any] struct {
12-
maxCapacity int
15+
maxCapacity int
16+
maxSize int32
17+
18+
currentSize atomic.Int32
19+
1320
flushInterval *time.Duration
1421
flushCallback FlushCallback[T]
1522

@@ -29,6 +36,13 @@ func WithMaxCapacity[T any](maxCapacity int) BatcherOption[T] {
2936
}
3037
}
3138

39+
// WithMaxSize sets the max size of the batcher
40+
func WithMaxSize[T any](maxSize int32) BatcherOption[T] {
41+
return func(b *Batcher[T]) {
42+
b.maxSize = maxSize
43+
}
44+
}
45+
3246
// WithFlushInterval sets the optional flush interval of the batcher
3347
func WithFlushInterval[T any](flushInterval time.Duration) BatcherOption[T] {
3448
return func(b *Batcher[T]) {
@@ -53,6 +67,9 @@ func New[T any](opts ...BatcherOption[T]) *Batcher[T] {
5367
for _, opt := range opts {
5468
opt(batcher)
5569
}
70+
if batcher.maxSize > 0 {
71+
batcher.currentSize = atomic.Int32{}
72+
}
5673
batcher.incomingData = make(chan T, batcher.maxCapacity)
5774
if batcher.flushCallback == nil {
5875
panic("batcher: flush callback is required")
@@ -66,11 +83,22 @@ func New[T any](opts ...BatcherOption[T]) *Batcher[T] {
6683
// Append appends data to the batcher
6784
func (b *Batcher[T]) Append(d ...T) {
6885
for _, item := range d {
86+
sizeofItem := size.Of(item)
87+
currentSize := b.currentSize.Load()
88+
89+
if b.maxSize > 0 && currentSize+int32(sizeofItem) > int32(b.maxSize) {
90+
b.full <- true
91+
b.incomingData <- item
92+
b.currentSize.Add(int32(sizeofItem))
93+
continue
94+
}
95+
6996
if !b.put(item) {
7097
// will wait until space available
7198
b.full <- true
7299
b.incomingData <- item
73100
}
101+
b.currentSize.Add(int32(sizeofItem))
74102
}
75103
}
76104

@@ -148,6 +176,7 @@ func (b *Batcher[T]) doCallback() {
148176
for item := range b.incomingData {
149177
items[k] = item
150178
k++
179+
b.currentSize.Add(-int32(size.Of(item)))
151180
if k >= n {
152181
break
153182
}

batcher/batcher_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package batcher
22

33
import (
4+
"crypto/rand"
45
"testing"
56
"time"
67

@@ -74,3 +75,45 @@ func TestBatcherWithInterval(t *testing.T) {
7475
require.Equal(t, wanted, got)
7576
require.True(t, minWantedBatches <= gotBatches)
7677
}
78+
79+
// exampleBatcherStruct is the payload type used by TestBatcherWithSizeLimit;
// it wraps a single byte slice.
type exampleBatcherStruct struct {
80+
Value []byte
81+
}
82+
83+
func TestBatcherWithSizeLimit(t *testing.T) {
84+
var (
85+
batchSize = 100
86+
maxSize = 1000
87+
wanted = 10
88+
gotBatches int
89+
)
90+
var failedIteration bool
91+
92+
callback := func(ta []exampleBatcherStruct) {
93+
gotBatches++
94+
95+
if len(ta) != 5 {
96+
failedIteration = true
97+
}
98+
}
99+
bat := New[exampleBatcherStruct](
100+
WithMaxCapacity[exampleBatcherStruct](batchSize),
101+
WithMaxSize[exampleBatcherStruct](int32(maxSize)),
102+
WithFlushCallback[exampleBatcherStruct](callback),
103+
)
104+
105+
bat.Run()
106+
107+
for i := 0; i < wanted; i++ {
108+
randData := make([]byte, 200)
109+
_, _ = rand.Read(randData)
110+
bat.Append(exampleBatcherStruct{Value: randData})
111+
}
112+
113+
bat.Stop()
114+
115+
bat.WaitDone()
116+
117+
require.Equal(t, 2, gotBatches)
118+
require.False(t, failedIteration)
119+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/projectdiscovery/utils
33
go 1.21
44

55
require (
6+
github.com/DmitriyVTitov/size v1.5.0
67
github.com/Masterminds/semver/v3 v3.2.1
78
github.com/Mzack9999/gcache v0.0.0-20230410081825-519e28eab057
89
github.com/andybalholm/brotli v1.0.6

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
aead.dev/minisign v0.2.0 h1:kAWrq/hBRu4AARY6AlciO83xhNnW9UaC8YipS2uhLPk=
22
aead.dev/minisign v0.2.0/go.mod h1:zdq6LdSd9TbuSxchxwhpA9zEb9YXcVGoE8JakuiGaIQ=
33
cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
4+
github.com/DmitriyVTitov/size v1.5.0 h1:/PzqxYrOyOUX1BXj6J9OuVRVGe+66VL4D9FlUaW515g=
5+
github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0=
46
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
57
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
68
github.com/Mzack9999/gcache v0.0.0-20230410081825-519e28eab057 h1:KFac3SiGbId8ub47e7kd2PLZeACxc1LkiiNoDOFRClE=
@@ -71,6 +73,8 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
7173
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
7274
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
7375
github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
76+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
77+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
7478
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
7579
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
7680
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

reflect/reflectutil.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,138 @@ func setUnexportedField(field reflect.Value, value interface{}) {
9494
Elem().
9595
Set(reflect.ValueOf(value))
9696
}
97+
98+
// SizeOf returns the size of 'v' in bytes.
// If there is an error during calculation, SizeOf returns -1.
//
// A pointer passed as 'v' is dereferenced once, so the result is the size of
// the pointed-to value (the pointer word itself is not counted).
//
// Implementation is adapted from https://github.com/DmitriyVTitov/size/blob/v1.5.0/size.go#L14 which
// in turn is inspired from binary.Size of stdlib
func SizeOf(v interface{}) int {
	// Cache with every visited pointer so we don't count two pointers
	// to the same memory twice.
	cache := make(map[uintptr]bool)
	return sizeOf(reflect.Indirect(reflect.ValueOf(v)), cache)
}

// sizeOf returns the number of bytes the actual data represented by v occupies in memory.
// If there is an error, sizeOf returns -1.
//
// cache holds the addresses of backing storage already counted during this
// traversal. Storage reached a second time is not counted again, but the
// header (slice header, string header, map or pointer word) of the value that
// refers to it is always counted, because every holder has its own header.
func sizeOf(v reflect.Value, cache map[uintptr]bool) int {
	switch v.Kind() {

	case reflect.Array:
		// An array has no spare capacity: its size is the sum of its elements.
		sum := 0
		for i, n := 0, v.Len(); i < n; i++ {
			s := sizeOf(v.Index(i), cache)
			if s < 0 {
				return -1
			}
			sum += s
		}
		return sum

	case reflect.Slice:
		header := int(v.Type().Size())
		data := v.Pointer()
		// Backing array already counted (this also covers repeated nil
		// slices, which all report address 0): count only this header.
		if cache[data] {
			return header
		}
		cache[data] = true

		sum := header
		for i, n := 0, v.Len(); i < n; i++ {
			s := sizeOf(v.Index(i), cache)
			if s < 0 {
				return -1
			}
			sum += s
		}

		// Allocated but unused capacity is counted at the shallow element size.
		return sum + (v.Cap()-v.Len())*int(v.Type().Elem().Size())

	case reflect.Struct:
		// Start from the struct's shallow size (fields plus padding) and
		// replace every field's shallow size with its deep size.
		total := int(v.Type().Size())
		for i, n := 0, v.NumField(); i < n; i++ {
			field := v.Field(i)
			s := sizeOf(field, cache)
			if s < 0 {
				return -1
			}
			total += s - int(field.Type().Size())
		}
		return total

	case reflect.String:
		s := v.String()
		hdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
		header := int(v.Type().Size())
		// String bytes already counted: count only this header.
		if cache[hdr.Data] {
			return header
		}
		cache[hdr.Data] = true
		return len(s) + header

	case reflect.Ptr:
		ptrSize := int(v.Type().Size())
		// A nil pointer, or one already followed (which would otherwise
		// recurse forever on cycles), contributes only the pointer word.
		if v.IsNil() || cache[v.Pointer()] {
			return ptrSize
		}
		cache[v.Pointer()] = true
		s := sizeOf(v.Elem(), cache)
		if s < 0 {
			return -1
		}
		return s + ptrSize

	case reflect.Bool,
		reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
		reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Int, reflect.Uint,
		reflect.Chan,
		reflect.Uintptr,
		reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128,
		reflect.Func:
		return int(v.Type().Size())

	case reflect.Map:
		header := int(v.Type().Size())
		// Map already counted (or a repeated nil map): count only this header.
		if cache[v.Pointer()] {
			return header
		}
		cache[v.Pointer()] = true

		sum := header
		keys := v.MapKeys()
		for i := range keys {
			// calculate size of key and value separately
			sv := sizeOf(v.MapIndex(keys[i]), cache)
			if sv < 0 {
				return -1
			}
			sum += sv
			sk := sizeOf(keys[i], cache)
			if sk < 0 {
				return -1
			}
			sum += sk
		}
		// Include overhead due to unused map buckets. 10.79 comes
		// from https://golang.org/src/runtime/map.go.
		return sum + int(float64(len(keys))*10.79)

	case reflect.Interface:
		header := int(v.Type().Size())
		// A nil interface holds no value; only its header occupies memory.
		if v.IsNil() {
			return header
		}
		s := sizeOf(v.Elem(), cache)
		// Propagate the error instead of folding -1 into the header size.
		if s < 0 {
			return -1
		}
		return s + header

	}

	return -1
}

reflect/reflectutil_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,69 @@ func TestUnexportedField(t *testing.T) {
4949
value := GetUnexportedField(testStruct, "unexported")
5050
require.Equal(t, value, "test")
5151
}
52+
53+
// Test taken from https://github.com/DmitriyVTitov/size/blob/v1.5.0/size_test.go
54+
func TestSizeOf(t *testing.T) {
55+
tests := []struct {
56+
name string
57+
v interface{}
58+
want int
59+
}{
60+
{
61+
name: "Array",
62+
v: [3]int32{1, 2, 3}, // 3 * 4 = 12
63+
want: 12,
64+
},
65+
{
66+
name: "Slice",
67+
v: make([]int64, 2, 5), // 5 * 8 + 24 = 64
68+
want: 64,
69+
},
70+
{
71+
name: "String",
72+
v: "ABCdef", // 6 + 16 = 22
73+
want: 22,
74+
},
75+
{
76+
name: "Map",
77+
// (8 + 3 + 16) + (8 + 4 + 16) = 55
78+
// 55 + 8 + 10.79 * 2 = 84
79+
v: map[int64]string{0: "ABC", 1: "DEFG"},
80+
want: 84,
81+
},
82+
{
83+
name: "Struct",
84+
v: struct {
85+
slice []int64
86+
array [2]bool
87+
structure struct {
88+
i int8
89+
s string
90+
}
91+
}{
92+
slice: []int64{12345, 67890}, // 2 * 8 + 24 = 40
93+
array: [2]bool{true, false}, // 2 * 1 = 2
94+
structure: struct {
95+
i int8
96+
s string
97+
}{
98+
i: 5, // 1
99+
s: "abc", // 3 * 1 + 16 = 19
100+
}, // 20 + 7 (padding) = 27
101+
}, // 40 + 2 + 27 = 69 + 6 (padding) = 75
102+
want: 75,
103+
},
104+
{
105+
name: "Pointer",
106+
v: new(int64), // 8
107+
want: 8,
108+
},
109+
}
110+
for _, tt := range tests {
111+
t.Run(tt.name, func(t *testing.T) {
112+
if got := SizeOf(tt.v); got != tt.want {
113+
t.Errorf("Of() = %v, want %v", got, tt.want)
114+
}
115+
})
116+
}
117+
}

0 commit comments

Comments
 (0)