4 changes: 3 additions & 1 deletion exporter/exporterhelper/README.md
@@ -17,7 +17,7 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- - `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false`
+ - `queue_size_batches` (default = 5000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false`
Users should calculate this as `num_seconds * requests_per_second / requests_per_batch` (see the worked sketch below) where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second
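
For example, a quick back-of-the-envelope sketch in Go (illustrative numbers only, not recommendations):

```go
package main

import "fmt"

func main() {
	// Illustrative numbers: survive a 120-second backend outage with
	// traffic arriving at 50 requests/s, batched 10 requests per batch.
	numSeconds := 120
	requestsPerSecond := 50
	requestsPerBatch := 10

	queueSizeBatches := numSeconds * requestsPerSecond / requestsPerBatch
	fmt.Println(queueSizeBatches) // 600
}
```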
@@ -38,6 +38,8 @@ With this build tag set, additional configuration options can be enabled:
- `sending_queue`
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
- `queue_size_bytes` (default = 2^63 - 1): Maximum number of bytes available for batches in the storage before dropping;
it is a separate limit from `queue_size_batches`; both limits are checked when adding to the queue.

The maximum number of batches stored on disk can be controlled using the `sending_queue.queue_size_batches` parameter
(which, similarly to in-memory buffering, defaults to 5000 batches).
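
Conceptually, admission with the two limits works like the simplified sketch below (a model for illustration only, not the collector's actual code; the real logic lives in `persistent_storage.go` further down):

```go
package main

import (
	"errors"
	"fmt"
)

var errMaxCapacityReached = errors.New("max capacity reached")

// tryEnqueue models the dual-limit check: a batch is admitted only when
// neither the batch-count limit nor the byte limit would be exceeded.
func tryEnqueue(curBatches, capBatches, curBytes, capBytes, reqBytes uint64) error {
	if curBatches >= capBatches {
		return errMaxCapacityReached // too many batches
	}
	if curBytes+reqBytes > capBytes {
		return errMaxCapacityReached // too many bytes
	}
	return nil
}

func main() {
	// A 5000-batch / 10 MiB queue with only ~100 KiB of byte headroom left
	// rejects a 200 KiB batch even though it holds far fewer than 5000 batches.
	curBytes := uint64(10*1024*1024 - 100*1024)
	err := tryEnqueue(1200, 5000, curBytes, 10*1024*1024, 200*1024)
	fmt.Println(err) // max capacity reached
}
```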
9 changes: 7 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -37,11 +37,11 @@ type persistentQueue struct {
}

// NewPersistentQueue creates a new queue backed by file storage; name parameter must be a unique value that identifies the queue
-func NewPersistentQueue(ctx context.Context, name string, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
+func NewPersistentQueue(ctx context.Context, name string, capacityBatches int, capacityBytes int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
return &persistentQueue{
logger: logger,
stopChan: make(chan struct{}),
-storage: newPersistentContiguousStorage(ctx, name, uint64(capacity), logger, client, unmarshaler),
+storage: newPersistentContiguousStorage(ctx, name, uint64(capacityBatches), uint64(capacityBytes), logger, client, unmarshaler),
}
}

@@ -89,3 +89,8 @@ func (pq *persistentQueue) Stop() {
func (pq *persistentQueue) Size() int {
return int(pq.storage.size())
}

// SizeBytes returns the current sum of sizes of elements in the queue in bytes, excluding the item already in the storage channel (if any)
func (pq *persistentQueue) SizeBytes() int {
return int(pq.storage.sizeBytes())
}
49 changes: 43 additions & 6 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -34,26 +34,26 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

-func createTestQueue(extension storage.Extension, capacity int) *persistentQueue {
+func createTestQueue(extension storage.Extension, capacityBatches int, capacityBytes int) *persistentQueue {
logger := zap.NewNop()

client, err := extension.GetClient(context.Background(), component.KindReceiver, config.ComponentID{}, "")
if err != nil {
panic(err)
}

-wq := NewPersistentQueue(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
+wq := NewPersistentQueue(context.Background(), "foo", capacityBatches, capacityBytes, logger, client, newFakeTracesRequestUnmarshalerFunc())
return wq.(*persistentQueue)
}

-func TestPersistentQueue_Capacity(t *testing.T) {
+func TestPersistentQueue_CapacityBatches(t *testing.T) {
path := t.TempDir()

for i := 0; i < 100; i++ {
ext := createStorageExtension(path)
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

-wq := createTestQueue(ext, 5)
+wq := createTestQueue(ext, 5, 100_000)
require.Equal(t, 0, wq.Size())

traces := newTraces(1, 10)
@@ -79,13 +79,50 @@
}
}

func TestPersistentQueue_CapacityBytes(t *testing.T) {
path := t.TempDir()

for i := 0; i < 100; i++ {
ext := createStorageExtension(path)
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

traces := newTraces(1, 10)
req := newFakeTracesRequest(traces)

requestBytes, err := req.Marshal()
require.NoError(t, err)
requestSize := len(requestBytes)

wq := createTestQueue(ext, 1000, 5*requestSize)
require.Equal(t, 0, wq.SizeBytes())

for i := 0; i < 10; i++ {
result := wq.Produce(req)
if i < 6 {
require.True(t, result)
} else {
require.False(t, result)
}

// Let's make sure the loop picks the first element into the channel,
// so the capacity can be used in full. Once dispatched, that element's
// bytes no longer count against the limit, which is why six Produce
// calls succeed in total.
if i == 0 {
require.Eventually(t, func() bool {
return wq.SizeBytes() == 0
}, 5*time.Second, 10*time.Millisecond)
}
}
require.Equal(t, 5*requestSize, wq.SizeBytes())
}
}

func TestPersistentQueue_Close(t *testing.T) {
path := t.TempDir()

ext := createStorageExtension(path)
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

-wq := createTestQueue(ext, 1001)
+wq := createTestQueue(ext, 1001, 1001*10000)
traces := newTraces(1, 10)
req := newFakeTracesRequest(traces)

@@ -141,7 +178,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newFakeTracesRequest(traces)

ext := createStorageExtension(path)
-tq := createTestQueue(ext, 5000)
+tq := createTestQueue(ext, 5000, 5000*10000)

defer tq.Stop()
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
94 changes: 73 additions & 21 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -37,6 +37,8 @@ type persistentStorage interface {
get() <-chan PersistentRequest
// size returns the current size of the persistent storage with items waiting for processing
size() uint64
// sizeBytes returns the sum of sizes of items waiting for processing in bytes
sizeBytes() uint64
// stop gracefully stops the storage
stop()
}
@@ -71,10 +73,11 @@ type persistentContiguousStorage struct {
client storage.Client
unmarshaler RequestUnmarshaler

-putChan chan struct{}
-stopChan chan struct{}
-stopOnce sync.Once
-capacity uint64
+putChan chan struct{}
+stopChan chan struct{}
+stopOnce sync.Once
+capacityBatches uint64
+capacityBytes uint64

reqChan chan PersistentRequest

@@ -84,6 +87,7 @@ type persistentContiguousStorage struct {
currentlyDispatchedItems []itemIndex

itemsCount *atomic.Uint64
bytesCount *atomic.Uint64
}

type itemIndex uint64
@@ -97,28 +101,31 @@ const (
readIndexKey = "ri"
writeIndexKey = "wi"
currentlyDispatchedItemsKey = "di"
bytesSizeKey = "bs"
)

var (
errMaxCapacityReached = errors.New("max capacity reached")
errValueNotSet = errors.New("value not set")
errKeyNotPresentInBatch = errors.New("key was not present in get batchStruct")
errKeyNotPresentInBatch = errors.New("key was not present in batchStruct")
)

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
// The queue needs to be initialized separately using initPersistentContiguousStorage.
-func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
+func newPersistentContiguousStorage(ctx context.Context, queueName string, capacityBatches uint64, capacityBytes uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
-logger: logger,
-client: client,
-queueName: queueName,
-unmarshaler: unmarshaler,
-capacity: capacity,
-putChan: make(chan struct{}, capacity),
-reqChan: make(chan PersistentRequest),
-stopChan: make(chan struct{}),
-itemsCount: atomic.NewUint64(0),
+logger: logger,
+client: client,
+queueName: queueName,
+unmarshaler: unmarshaler,
+capacityBatches: capacityBatches,
+capacityBytes: capacityBytes,
+putChan: make(chan struct{}, capacityBatches),
+reqChan: make(chan PersistentRequest),
+stopChan: make(chan struct{}),
+itemsCount: atomic.NewUint64(0),
+bytesCount: atomic.NewUint64(0),
}

initPersistentContiguousStorage(ctx, pcs)
@@ -142,7 +149,9 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac
func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContiguousStorage) {
var writeIndex itemIndex
var readIndex itemIndex
-batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey).execute(ctx)
+var bytesSize uint64

+batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey, bytesSizeKey).execute(ctx)

if err == nil {
readIndex, err = batch.getItemIndexResult(readIndexKey)
@@ -152,6 +161,10 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu
writeIndex, err = batch.getItemIndexResult(writeIndexKey)
}

if err == nil {
bytesSize, err = batch.getUint64Result(bytesSizeKey)
}

if err != nil {
if errors.Is(err, errValueNotSet) {
pcs.logger.Info("Initializing new persistent queue", zap.String(zapQueueNameKey, pcs.queueName))
@@ -162,12 +175,15 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu
}
pcs.readIndex = 0
pcs.writeIndex = 0
bytesSize = 0
} else {
pcs.readIndex = readIndex
pcs.writeIndex = writeIndex
// bytesSize is already set
}

pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
pcs.bytesCount.Store(bytesSize)
}

func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []PersistentRequest) {
@@ -217,6 +233,10 @@ func (pcs *persistentContiguousStorage) size() uint64 {
return pcs.itemsCount.Load()
}

func (pcs *persistentContiguousStorage) sizeBytes() uint64 {
return pcs.bytesCount.Load()
}

func (pcs *persistentContiguousStorage) stop() {
pcs.logger.Debug("Stopping persistentContiguousStorage", zap.String(zapQueueNameKey, pcs.queueName))
pcs.stopOnce.Do(func() {
@@ -234,17 +254,31 @@ func (pcs *persistentContiguousStorage) put(req PersistentRequest) error {
pcs.mu.Lock()
defer pcs.mu.Unlock()

-if pcs.size() >= pcs.capacity {
-pcs.logger.Warn("Maximum queue capacity reached", zap.String(zapQueueNameKey, pcs.queueName))
+if pcs.size() >= pcs.capacityBatches {
+pcs.logger.Warn("Maximum queue capacity reached: too many batches", zap.String(zapQueueNameKey, pcs.queueName))
return errMaxCapacityReached
}

oldBytesCount := pcs.sizeBytes()
itemKey := pcs.itemKey(pcs.writeIndex)
batch := newBatch(pcs).setRequest(itemKey, req)
batchSize, err := batch.getSizeByKey(itemKey)

if err != nil {
return err
}

if oldBytesCount+batchSize > pcs.capacityBytes {
pcs.logger.Warn("Maximum queue capacity reached: too many bytes", zap.String(zapQueueNameKey, pcs.queueName))
return errMaxCapacityReached
}

pcs.writeIndex++
pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
newSize := pcs.bytesCount.Add(batchSize)

ctx := context.Background()
-_, err := newBatch(pcs).setItemIndex(writeIndexKey, pcs.writeIndex).setRequest(itemKey, req).execute(ctx)
+_, err = batch.setItemIndex(writeIndexKey, pcs.writeIndex).setUint64(bytesSizeKey, newSize).execute(ctx)

// Inform the loop that there's some data to process
pcs.putChan <- struct{}{}
@@ -267,9 +301,16 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis
pcs.itemDispatchingStart(ctx, index)

var req PersistentRequest
-batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx)
+var bytesSize uint64
+itemKey := pcs.itemKey(index)
+batch, err := newBatch(pcs).get(itemKey).execute(ctx)
if err == nil {
-req, err = batch.getRequestResult(pcs.itemKey(index))
+req, err = batch.getRequestResult(itemKey)
+if err == nil {
+bytesSize, err = batch.getSizeByKey(itemKey)
+pcs.bytesCount.Sub(bytesSize)
+pcs.updateBytesSize(ctx)
+}
}

if err != nil || req == nil {
@@ -405,6 +446,17 @@ func (pcs *persistentContiguousStorage) updateReadIndex(ctx context.Context) {
}
}

func (pcs *persistentContiguousStorage) updateBytesSize(ctx context.Context) {
_, err := newBatch(pcs).
setUint64(bytesSizeKey, pcs.sizeBytes()).
execute(ctx)

if err != nil {
pcs.logger.Debug("Failed updating bytes size",
zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
}
}

func (pcs *persistentContiguousStorage) itemKey(index itemIndex) string {
return strconv.FormatUint(uint64(index), 10)
}
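
Seen in isolation, the byte accounting above follows a simple pattern: `put()` adds the marshalled request size to an atomic counter and `getNextItem()` subtracts it again. A minimal standalone sketch of that pattern, using go.uber.org/atomic as the queue itself does:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	// bytesCount mirrors how persistentContiguousStorage tracks queue bytes:
	// enqueueing adds the marshalled request size, dequeueing subtracts it.
	bytesCount := atomic.NewUint64(0)

	bytesCount.Add(1024) // put: a 1 KiB request enters the queue
	bytesCount.Add(2048) // put: a 2 KiB request enters the queue
	bytesCount.Sub(1024) // getNextItem: the first request is dispatched

	fmt.Println(bytesCount.Load()) // 2048
}
```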
21 changes: 21 additions & 0 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
@@ -59,6 +59,7 @@ func (bof *batchStruct) execute(ctx context.Context) (*batchStruct, error) {
}

// set adds a Set operation to the batch
// The size of the marshalled value (in bytes) can later be retrieved with getSizeByKey.
func (bof *batchStruct) set(key string, value interface{}, marshal func(interface{}) ([]byte, error)) *batchStruct {
valueBytes, err := marshal(value)
if err != nil {
@@ -149,6 +150,13 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)
return itemIndexArrIf.([]itemIndex), nil
}

func (bof *batchStruct) getUint64Result(key string) (uint64, error) {
// itemIndex type is declared as uint64,
// so we can use the same function to retrieve values of type uint64.
res, err := bof.getItemIndexResult(key)
return uint64(res), err
}

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value PersistentRequest) *batchStruct {
return bof.set(key, value, requestToBytes)
@@ -164,6 +172,10 @@ func (bof *batchStruct) setItemIndexArray(key string, value []itemIndex) *batchS
return bof.set(key, value, itemIndexArrayToBytes)
}

func (bof *batchStruct) setUint64(key string, value uint64) *batchStruct {
return bof.set(key, itemIndex(value), itemIndexToBytes)
}

func itemIndexToBytes(val interface{}) ([]byte, error) {
var buf bytes.Buffer
err := binary.Write(&buf, binary.LittleEndian, val)
@@ -227,3 +239,12 @@ func requestToBytes(req interface{}) ([]byte, error) {
func (bof *batchStruct) bytesToRequest(b []byte) (interface{}, error) {
return bof.pcs.unmarshaler(b)
}

func (bof *batchStruct) getSizeByKey(key string) (uint64, error) {
for i := 0; i < len(bof.operations); i++ {
if bof.operations[i].Key == key {
return uint64(len(bof.operations[i].Value)), nil
}
}
return 0, errKeyNotPresentInBatch
}
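
`setUint64` and `getUint64Result` piggyback on the `itemIndex` codec, which (as `itemIndexToBytes` above shows) is a fixed-width little-endian encoding via `encoding/binary`. A standalone round-trip sketch, assuming the read side mirrors the write with `binary.Read`:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode a uint64 the way itemIndexToBytes does: fixed-width
	// little-endian via encoding/binary.
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.LittleEndian, uint64(5000)); err != nil {
		panic(err)
	}

	// Decode it back; getUint64Result relies on the same representation
	// when it reuses the itemIndex reader for plain uint64 values.
	var out uint64
	if err := binary.Read(bytes.NewReader(buf.Bytes()), binary.LittleEndian, &out); err != nil {
		panic(err)
	}
	fmt.Println(out) // 5000
}
```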