From c1598a9eeaf813c7310b647f7dc6c59953a5928a Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Mon, 25 Jul 2022 17:20:52 +0200 Subject: [PATCH 1/2] [exporterhelper] Allow to specify max queue size in bytes --- exporter/exporterhelper/README.md | 2 + .../internal/persistent_queue.go | 9 +- .../internal/persistent_queue_test.go | 49 ++++++++-- .../internal/persistent_storage.go | 94 ++++++++++++++----- .../internal/persistent_storage_batch.go | 21 +++++ .../internal/persistent_storage_batch_test.go | 16 +++- .../internal/persistent_storage_test.go | 11 ++- exporter/exporterhelper/obsreport.go | 7 ++ .../queued_retry_experimental.go | 25 ++++- 9 files changed, 193 insertions(+), 41 deletions(-) diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 51bd4383096..6258b0f87a1 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -38,6 +38,8 @@ With this build tag set, additional configuration option can be enabled: - `sending_queue` - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension (note, `enable_unstable` build tag needs to be enabled first, see below for more details) + - `queue_size_bytes` (default = 2^63 - 1): Maximum number of bytes available for batches in the storage before dropping; + It is a separate limit from `queue_size`, both limits are being checked when adding to the queue. The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index 3b5012bb071..1169eca6ef9 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -37,11 +37,11 @@ type persistentQueue struct { } // NewPersistentQueue creates a new queue backed by file storage; name parameter must be a unique value that identifies the queue -func NewPersistentQueue(ctx context.Context, name string, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, name string, capacityBatches int, capacityBytes int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { return &persistentQueue{ logger: logger, stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, name, uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, name, uint64(capacityBatches), uint64(capacityBytes), logger, client, unmarshaler), } } @@ -89,3 +89,8 @@ func (pq *persistentQueue) Stop() { func (pq *persistentQueue) Size() int { return int(pq.storage.size()) } + +// SizeBytes returns the current sum of sizes of elements in the queue in bytes, excluding the item already in the storage channel (if any) +func (pq *persistentQueue) SizeBytes() int { + return int(pq.storage.sizeBytes()) +} diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 417d0fa1ef9..f354095cf0a 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -34,7 +34,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -func createTestQueue(extension storage.Extension, capacity int) *persistentQueue { +func createTestQueue(extension storage.Extension, capacityBatches int, capacityBytes int) *persistentQueue { logger := zap.NewNop() client, err := extension.GetClient(context.Background(), component.KindReceiver, config.ComponentID{}, "") @@ -42,18 +42,18 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), "foo", capacityBatches, capacityBytes, logger, client, newFakeTracesRequestUnmarshalerFunc()) return wq.(*persistentQueue) } -func TestPersistentQueue_Capacity(t *testing.T) { +func TestPersistentQueue_CapacityBatches(t *testing.T) { path := t.TempDir() for i := 0; i < 100; i++ { ext := createStorageExtension(path) t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) - wq := createTestQueue(ext, 5) + wq := createTestQueue(ext, 5, 100_000) require.Equal(t, 0, wq.Size()) traces := newTraces(1, 10) @@ -79,13 +79,50 @@ func TestPersistentQueue_Capacity(t *testing.T) { } } +func TestPersistentQueue_CapacityBytes(t *testing.T) { + path := t.TempDir() + + for i := 0; i < 100; i++ { + ext := createStorageExtension(path) + t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + + traces := newTraces(1, 10) + req := newFakeTracesRequest(traces) + + requestBytes, err := req.Marshal() + require.NoError(t, err) + requestSize := len(requestBytes) + + wq := createTestQueue(ext, 1000, 5*requestSize) + require.Equal(t, 0, wq.SizeBytes()) + + for i := 0; i < 10; i++ { + result := wq.Produce(req) + if i < 6 { + require.True(t, result) + } else { + require.False(t, result) + } + + // Let's make sure the loop picks the first element into the channel, + // so the capacity could be used in full + if i == 0 { + require.Eventually(t, func() bool { + return wq.SizeBytes() == 0 + }, 5*time.Second, 10*time.Millisecond) + } + } + require.Equal(t, 5*requestSize, wq.SizeBytes()) + } +} + func TestPersistentQueue_Close(t *testing.T) { path := t.TempDir() ext := createStorageExtension(path) t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) - wq := createTestQueue(ext, 1001) + wq := createTestQueue(ext, 1001, 1001*10000) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) @@ -141,7 +178,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newFakeTracesRequest(traces) ext := createStorageExtension(path) - tq := createTestQueue(ext, 5000) + tq := createTestQueue(ext, 5000, 5000*10000) defer tq.Stop() t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index a8e9e715a5d..e9b49ebac43 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -37,6 +37,8 @@ type persistentStorage interface { get() <-chan PersistentRequest // size returns the current size of the persistent storage with items waiting for processing size() uint64 + // sizeBytes returns the sum of sizes of items waiting for processing in bytes + sizeBytes() uint64 // stop gracefully stops the storage stop() } @@ -71,10 +73,11 @@ type persistentContiguousStorage struct { client storage.Client unmarshaler RequestUnmarshaler - putChan chan struct{} - stopChan chan struct{} - stopOnce sync.Once - capacity uint64 + putChan chan struct{} + stopChan chan struct{} + stopOnce sync.Once + capacityBatches uint64 + capacityBytes uint64 reqChan chan PersistentRequest @@ -84,6 +87,7 @@ type persistentContiguousStorage struct { currentlyDispatchedItems []itemIndex itemsCount *atomic.Uint64 + bytesCount *atomic.Uint64 } type itemIndex uint64 @@ -97,28 +101,31 @@ const ( readIndexKey = "ri" writeIndexKey = "wi" currentlyDispatchedItemsKey = "di" + bytesSizeKey = "bs" ) var ( errMaxCapacityReached = errors.New("max capacity reached") errValueNotSet = errors.New("value not set") - errKeyNotPresentInBatch = errors.New("key was not present in get batchStruct") + errKeyNotPresentInBatch = errors.New("key was not present in batchStruct") ) // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. // The queue needs to be initialized separately using initPersistentContiguousStorage. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, capacityBatches uint64, capacityBytes uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, - queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), - reqChan: make(chan PersistentRequest), - stopChan: make(chan struct{}), - itemsCount: atomic.NewUint64(0), + logger: logger, + client: client, + queueName: queueName, + unmarshaler: unmarshaler, + capacityBatches: capacityBatches, + capacityBytes: capacityBytes, + putChan: make(chan struct{}, capacityBatches), + reqChan: make(chan PersistentRequest), + stopChan: make(chan struct{}), + itemsCount: atomic.NewUint64(0), + bytesCount: atomic.NewUint64(0), } initPersistentContiguousStorage(ctx, pcs) @@ -142,7 +149,9 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContiguousStorage) { var writeIndex itemIndex var readIndex itemIndex - batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey).execute(ctx) + var bytesSize uint64 + + batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey, bytesSizeKey).execute(ctx) if err == nil { readIndex, err = batch.getItemIndexResult(readIndexKey) @@ -152,6 +161,10 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu writeIndex, err = batch.getItemIndexResult(writeIndexKey) } + if err == nil { + bytesSize, err = batch.getUint64Result(bytesSizeKey) + } + if err != nil { if errors.Is(err, errValueNotSet) { pcs.logger.Info("Initializing new persistent queue", zap.String(zapQueueNameKey, pcs.queueName)) @@ -162,12 +175,15 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu } pcs.readIndex = 0 pcs.writeIndex = 0 + bytesSize = 0 } else { pcs.readIndex = readIndex pcs.writeIndex = writeIndex + // bytesSize is already set } pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex)) + pcs.bytesCount.Store(bytesSize) } func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []PersistentRequest) { @@ -217,6 +233,10 @@ func (pcs *persistentContiguousStorage) size() uint64 { return pcs.itemsCount.Load() } +func (pcs *persistentContiguousStorage) sizeBytes() uint64 { + return pcs.bytesCount.Load() +} + func (pcs *persistentContiguousStorage) stop() { pcs.logger.Debug("Stopping persistentContiguousStorage", zap.String(zapQueueNameKey, pcs.queueName)) pcs.stopOnce.Do(func() { @@ -234,17 +254,31 @@ func (pcs *persistentContiguousStorage) put(req PersistentRequest) error { pcs.mu.Lock() defer pcs.mu.Unlock() - if pcs.size() >= pcs.capacity { - pcs.logger.Warn("Maximum queue capacity reached", zap.String(zapQueueNameKey, pcs.queueName)) + if pcs.size() >= pcs.capacityBatches { + pcs.logger.Warn("Maximum queue capacity reached: too many batches", zap.String(zapQueueNameKey, pcs.queueName)) return errMaxCapacityReached } + oldBytesCount := pcs.sizeBytes() itemKey := pcs.itemKey(pcs.writeIndex) + batch := newBatch(pcs).setRequest(itemKey, req) + batchSize, err := batch.getSizeByKey(itemKey) + + if err != nil { + return err + } + + if oldBytesCount+batchSize > pcs.capacityBytes { + pcs.logger.Warn("Maximum queue capacity reached: too many bytes", zap.String(zapQueueNameKey, pcs.queueName)) + return errMaxCapacityReached + } + pcs.writeIndex++ pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex)) + newSize := pcs.bytesCount.Add(batchSize) ctx := context.Background() - _, err := newBatch(pcs).setItemIndex(writeIndexKey, pcs.writeIndex).setRequest(itemKey, req).execute(ctx) + _, err = batch.setItemIndex(writeIndexKey, pcs.writeIndex).setUint64(bytesSizeKey, newSize).execute(ctx) // Inform the loop that there's some data to process pcs.putChan <- struct{}{} @@ -267,9 +301,16 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis pcs.itemDispatchingStart(ctx, index) var req PersistentRequest - batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx) + var bytesSize uint64 + itemKey := pcs.itemKey(index) + batch, err := newBatch(pcs).get(itemKey).execute(ctx) if err == nil { - req, err = batch.getRequestResult(pcs.itemKey(index)) + req, err = batch.getRequestResult(itemKey) + if err == nil { + bytesSize, err = batch.getSizeByKey(itemKey) + pcs.bytesCount.Sub(bytesSize) + pcs.updateBytesSize(ctx) + } } if err != nil || req == nil { @@ -405,6 +446,17 @@ func (pcs *persistentContiguousStorage) updateReadIndex(ctx context.Context) { } } +func (pcs *persistentContiguousStorage) updateBytesSize(ctx context.Context) { + _, err := newBatch(pcs). + setUint64(bytesSizeKey, pcs.sizeBytes()). + execute(ctx) + + if err != nil { + pcs.logger.Debug("Failed updating bytes size", + zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err)) + } +} + func (pcs *persistentContiguousStorage) itemKey(index itemIndex) string { return strconv.FormatUint(uint64(index), 10) } diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 3dfcb5f2f16..67a3f61c4b4 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -59,6 +59,7 @@ func (bof *batchStruct) execute(ctx context.Context) (*batchStruct, error) { } // set adds a Set operation to the batch +// Returns the batch and the size of given value after marshalling (in bytes). func (bof *batchStruct) set(key string, value interface{}, marshal func(interface{}) ([]byte, error)) *batchStruct { valueBytes, err := marshal(value) if err != nil { @@ -149,6 +150,13 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) return itemIndexArrIf.([]itemIndex), nil } +func (bof *batchStruct) getUint64Result(key string) (uint64, error) { + // itemIndex type is declared as uint64, + // so we can use the same function to retrieve values of type uint64. + res, err := bof.getItemIndexResult(key) + return uint64(res), err +} + // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value PersistentRequest) *batchStruct { return bof.set(key, value, requestToBytes) @@ -164,6 +172,10 @@ func (bof *batchStruct) setItemIndexArray(key string, value []itemIndex) *batchS return bof.set(key, value, itemIndexArrayToBytes) } +func (bof *batchStruct) setUint64(key string, value uint64) *batchStruct { + return bof.set(key, itemIndex(value), itemIndexToBytes) +} + func itemIndexToBytes(val interface{}) ([]byte, error) { var buf bytes.Buffer err := binary.Write(&buf, binary.LittleEndian, val) @@ -227,3 +239,12 @@ func requestToBytes(req interface{}) ([]byte, error) { func (bof *batchStruct) bytesToRequest(b []byte) (interface{}, error) { return bof.pcs.unmarshaler(b) } + +func (bof *batchStruct) getSizeByKey(key string) (uint64, error) { + for i := 0; i < len(bof.operations); i++ { + if bof.operations[i].Key == key { + return uint64(len(bof.operations[i].Value)), nil + } + } + return 0, errKeyNotPresentInBatch +} diff --git a/exporter/exporterhelper/internal/persistent_storage_batch_test.go b/exporter/exporterhelper/internal/persistent_storage_batch_test.go index 9a7aef221b8..5ebfb4d2b12 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch_test.go @@ -33,16 +33,18 @@ func TestPersistentStorageBatch_Operations(t *testing.T) { itemIndexValue := itemIndex(123) itemIndexArrayValue := []itemIndex{itemIndex(1), itemIndex(2)} + uint64Value := uint64(42) _, err := newBatch(ps). setItemIndex("index", itemIndexValue). setItemIndexArray("arr", itemIndexArrayValue). + setUint64("num", uint64Value). execute(context.Background()) require.NoError(t, err) batch, err := newBatch(ps). - get("index", "arr"). + get("index", "arr", "num"). execute(context.Background()) require.NoError(t, err) @@ -54,18 +56,26 @@ func TestPersistentStorageBatch_Operations(t *testing.T) { require.NoError(t, err) require.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue) - _, err = newBatch(ps).delete("index", "arr").execute(context.Background()) + retrievedUint64Value, err := batch.getUint64Result("num") + require.NoError(t, err) + require.Equal(t, uint64Value, retrievedUint64Value) + + _, err = newBatch(ps).delete("index", "arr", "num").execute(context.Background()) require.NoError(t, err) batch, err = newBatch(ps). - get("index", "arr"). + get("index", "arr", "num"). execute(context.Background()) require.NoError(t, err) _, err = batch.getItemIndexResult("index") require.Error(t, err, errValueNotSet) + _, err = batch.getUint64Result("num") + require.Error(t, err, errValueNotSet) + retrievedItemIndexArrayValue, err = batch.getItemIndexArrayResult("arr") require.NoError(t, err) require.Nil(t, retrievedItemIndexArrayValue) + } diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index a08231fda65..d0322989fe6 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math" "reflect" "sync" "testing" @@ -48,13 +49,13 @@ func createTestClient(extension storage.Extension) storage.Client { return client } -func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) +func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacityBatches uint64, capacityBytes uint64) *persistentContiguousStorage { + return newPersistentContiguousStorage(context.Background(), "foo", capacityBatches, capacityBytes, logger, client, newFakeTracesRequestUnmarshalerFunc()) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { logger := zap.NewNop() - return createTestPersistentStorageWithLoggingAndCapacity(client, logger, 1000) + return createTestPersistentStorageWithLoggingAndCapacity(client, logger, 1000, math.MaxUint64) } type fakeTracesRequest struct { @@ -317,7 +318,7 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // No more items ext := createStorageExtension(path) - wq := createTestQueue(ext, 5000) + wq := createTestQueue(ext, 5000, 5000*10000) require.Equal(t, 0, wq.Size()) require.NoError(t, ext.Shutdown(context.Background())) } @@ -364,7 +365,7 @@ func BenchmarkPersistentStorage_TraceSpans(b *testing.B) { path := bb.TempDir() ext := createStorageExtension(path) client := createTestClient(ext) - ps := createTestPersistentStorageWithLoggingAndCapacity(client, zap.NewNop(), 10000000) + ps := createTestPersistentStorageWithLoggingAndCapacity(client, zap.NewNop(), 10_000_000, 10_000_000*2_000_000) traces := newTraces(c.numTraces, c.numSpansPerTrace) req := newFakeTracesRequest(traces) diff --git a/exporter/exporterhelper/obsreport.go b/exporter/exporterhelper/obsreport.go index 011c3fc99c2..9a885135902 100644 --- a/exporter/exporterhelper/obsreport.go +++ b/exporter/exporterhelper/obsreport.go @@ -41,6 +41,7 @@ type instruments struct { registry *metric.Registry queueSize *metric.Int64DerivedGauge queueCapacity *metric.Int64DerivedGauge + queueCapacityBytes *metric.Int64DerivedGauge failedToEnqueueTraceSpans *metric.Int64Cumulative failedToEnqueueMetricPoints *metric.Int64Cumulative failedToEnqueueLogRecords *metric.Int64Cumulative @@ -62,6 +63,12 @@ func newInstruments(registry *metric.Registry) *instruments { metric.WithLabelKeys(obsmetrics.ExporterKey), metric.WithUnit(metricdata.UnitDimensionless)) + insts.queueCapacityBytes, _ = registry.AddInt64DerivedGauge( + obsmetrics.ExporterKey+"/queue_capacity_bytes", + metric.WithDescription("Fixed capacity of the retry queue (in bytes)"), + metric.WithLabelKeys(obsmetrics.ExporterKey), + metric.WithUnit(metricdata.UnitDimensionless)) + insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative( obsmetrics.ExporterKey+"/enqueue_failed_spans", metric.WithDescription("Number of spans failed to be added to the sending queue."), diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go index a808e212c71..584dfc972e6 100644 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ b/exporter/exporterhelper/queued_retry_experimental.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math" "go.opencensus.io/metric/metricdata" "go.opentelemetry.io/otel/attribute" @@ -44,6 +45,8 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` + // QueueSizeBytes is the maximum number of bytes allowed in queue at a given time. + QueueSizeBytes int `mapstructure:"queue_size_bytes"` // PersistentStorageEnabled describes whether persistence via a file storage extension is enabled PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"` } @@ -57,7 +60,9 @@ func NewDefaultQueueSettings() QueueSettings { // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. - QueueSize: 5000, + QueueSize: 5000, + // By default, the limit should be big enough to ignore it. + QueueSizeBytes: math.MaxInt, PersistentStorageEnabled: false, } } @@ -69,7 +74,11 @@ func (qCfg *QueueSettings) Validate() error { } if qCfg.QueueSize <= 0 { - return errors.New("queue size must be positive") + return errors.New("queue size in batches must be positive") + } + + if qCfg.QueueSizeBytes <= 0 { + return errors.New("queue size in bytes must be positive") } return nil @@ -164,7 +173,7 @@ func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, hos return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName(), qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName(), qrs.cfg.QueueSize, qrs.cfg.QueueSizeBytes, qrs.logger, *storageClient, qrs.requestUnmarshaler) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -219,11 +228,19 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er if err != nil { return fmt.Errorf("failed to create retry queue size metric: %w", err) } + err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { return int64(qrs.cfg.QueueSize) }, metricdata.NewLabelValue(qrs.fullName())) if err != nil { - return fmt.Errorf("failed to create retry queue capacity metric: %w", err) + return fmt.Errorf("failed to create retry queue capacity in batches metric: %w", err) + } + + err = globalInstruments.queueCapacityBytes.UpsertEntry(func() int64 { + return int64(qrs.cfg.QueueSizeBytes) + }, metricdata.NewLabelValue(qrs.fullName())) + if err != nil { + return fmt.Errorf("failed to create retry queue capacity in bytes metric: %w", err) } } From 3e67a59ae6e286e1219271190dbbef9e206b0680 Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Wed, 27 Jul 2022 15:24:42 +0200 Subject: [PATCH 2/2] [exporterhelper] Deprecate queue_size configuration option The capacity of the queue can now be expressed in bytes. Because of that, parameter queue_size has been renamed to queue_size_batches. --- exporter/exporterhelper/README.md | 4 +-- .../queued_retry_experimental.go | 29 ++++++++++++++--- .../exporterhelper/queued_retry_inmemory.go | 32 +++++++++++++++---- exporter/exporterhelper/queued_retry_test.go | 6 ++-- 4 files changed, 55 insertions(+), 16 deletions(-) diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 6258b0f87a1..c20065ef413 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -17,7 +17,7 @@ The following configuration options can be modified: - `sending_queue` - `enabled` (default = true) - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false` - - `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false` + - `queue_size_batches` (default = 5000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false` User should calculate this as `num_seconds * requests_per_second / requests_per_batch` where: - `num_seconds` is the number of seconds to buffer in case of a backend outage - `requests_per_second` is the average number of requests per seconds @@ -39,7 +39,7 @@ With this build tag set, additional configuration option can be enabled: - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension (note, `enable_unstable` build tag needs to be enabled first, see below for more details) - `queue_size_bytes` (default = 2^63 - 1): Maximum number of bytes available for batches in the storage before dropping; - It is a separate limit from `queue_size`, both limits are being checked when adding to the queue. + It is a separate limit from `queue_size_batches`, both limits are being checked when adding to the queue. The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go index 584dfc972e6..fe4beb74d1f 100644 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ b/exporter/exporterhelper/queued_retry_experimental.go @@ -43,24 +43,33 @@ type QueueSettings struct { Enabled bool `mapstructure:"enabled"` // NumConsumers is the number of consumers from the queue. NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of batches allowed in queue at a given time. + // QueueSize is a deprecated parameter that used to serve the same role as QueueSizeBatches. QueueSize int `mapstructure:"queue_size"` + // QueueSizeBatches is the maximum number of batches allowed in queue at a given time. + QueueSizeBatches int `mapstructure:"queue_size_batches"` // QueueSizeBytes is the maximum number of bytes allowed in queue at a given time. QueueSizeBytes int `mapstructure:"queue_size_bytes"` // PersistentStorageEnabled describes whether persistence via a file storage extension is enabled PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"` } +const ( + queueSizeDisabledValue = 0 +) + // NewDefaultQueueSettings returns the default settings for QueueSettings. func NewDefaultQueueSettings() QueueSettings { return QueueSettings{ Enabled: true, NumConsumers: 10, + // QueueSize is a deprecated parameter now. + // If the user configurates it, use it instead of QueueSizeBatches and print a warning. + QueueSize: queueSizeDisabledValue, // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. - QueueSize: 5000, + QueueSizeBatches: 5000, // By default, the limit should be big enough to ignore it. QueueSizeBytes: math.MaxInt, PersistentStorageEnabled: false, @@ -73,7 +82,7 @@ func (qCfg *QueueSettings) Validate() error { return nil } - if qCfg.QueueSize <= 0 { + if qCfg.QueueSizeBatches <= 0 || (qCfg.QueueSize <= 0 && qCfg.QueueSize != queueSizeDisabledValue) { return errors.New("queue size in batches must be positive") } @@ -173,7 +182,13 @@ func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, hos return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName(), qrs.cfg.QueueSize, qrs.cfg.QueueSizeBytes, qrs.logger, *storageClient, qrs.requestUnmarshaler) + actualSizeBatches := qrs.cfg.QueueSizeBatches + if qrs.cfg.QueueSize != queueSizeDisabledValue { + actualSizeBatches = qrs.cfg.QueueSize + qrs.logger.Warn("Configuration option queue_size is deprecated, use queue_size_batches instead") + } + + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName(), actualSizeBatches, qrs.cfg.QueueSizeBytes, qrs.logger, *storageClient, qrs.requestUnmarshaler) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -229,8 +244,12 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } + actualSizeBatches := qrs.cfg.QueueSizeBatches + if qrs.cfg.QueueSize != queueSizeDisabledValue { + actualSizeBatches = qrs.cfg.QueueSize + } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(actualSizeBatches) }, metricdata.NewLabelValue(qrs.fullName())) if err != nil { return fmt.Errorf("failed to create retry queue capacity in batches metric: %w", err) diff --git a/exporter/exporterhelper/queued_retry_inmemory.go b/exporter/exporterhelper/queued_retry_inmemory.go index 7b76221a78f..e474ab66f65 100644 --- a/exporter/exporterhelper/queued_retry_inmemory.go +++ b/exporter/exporterhelper/queued_retry_inmemory.go @@ -41,20 +41,29 @@ type QueueSettings struct { Enabled bool `mapstructure:"enabled"` // NumConsumers is the number of consumers from the queue. NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of batches allowed in queue at a given time. + // QueueSize is a deprecated parameter that used to serve the same role as QueueSizeBatches. QueueSize int `mapstructure:"queue_size"` + // QueueSizeBatches is the maximum number of batches allowed in queue at a given time. + QueueSizeBatches int `mapstructure:"queue_size_batches"` } +const ( + queueSizeDisabledValue = 0 +) + // NewDefaultQueueSettings returns the default settings for QueueSettings. func NewDefaultQueueSettings() QueueSettings { return QueueSettings{ Enabled: true, NumConsumers: 10, + // QueueSize is a deprecated parameter now. + // If the user configurates it, use it instead of QueueSizeBatches and print a warning. + QueueSize: queueSizeDisabledValue, // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. - QueueSize: 5000, + QueueSizeBatches: 5000, } } @@ -64,7 +73,7 @@ func (qCfg *QueueSettings) Validate() error { return nil } - if qCfg.QueueSize <= 0 { + if qCfg.QueueSizeBatches <= 0 || (qCfg.QueueSize <= 0 && qCfg.QueueSize != queueSizeDisabledValue) { return errors.New("queue size must be positive") } @@ -85,6 +94,12 @@ func newQueuedRetrySender(id config.ComponentID, _ config.DataType, qCfg QueueSe retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) + actualSizeBatches := qCfg.QueueSizeBatches + if qCfg.QueueSize != queueSizeDisabledValue { + actualSizeBatches = qCfg.QueueSize + logger.Warn("Configuration option queue_size is deprecated, use queue_size_batches instead") + } + return &queuedRetrySender{ fullName: id.String(), cfg: qCfg, @@ -96,7 +111,7 @@ func newQueuedRetrySender(id config.ComponentID, _ config.DataType, qCfg QueueSe logger: sampledLogger, onTemporaryFailure: onTemporaryFailure, }, - queue: internal.NewBoundedMemoryQueue(qCfg.QueueSize, func(item interface{}) {}), + queue: internal.NewBoundedMemoryQueue(actualSizeBatches, func(item interface{}) {}), retryStopCh: retryStopCh, traceAttributes: []attribute.KeyValue{traceAttr}, logger: sampledLogger, @@ -128,11 +143,16 @@ func (qrs *queuedRetrySender) start(context.Context, component.Host) error { if err != nil { return fmt.Errorf("failed to create retry queue size metric: %w", err) } + + actualSizeBatches := qrs.cfg.QueueSizeBatches + if qrs.cfg.QueueSize != queueSizeDisabledValue { + actualSizeBatches = qrs.cfg.QueueSize + } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(actualSizeBatches) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { - return fmt.Errorf("failed to create retry queue capacity metric: %w", err) + return fmt.Errorf("failed to create retry queue capacity in batches metric: %w", err) } } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index b7cebb39c8b..b5298918676 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -258,7 +258,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 - qCfg.QueueSize = 1 + qCfg.QueueSizeBatches = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) @@ -285,7 +285,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() - qCfg.QueueSize = 0 + qCfg.QueueSizeBatches = 0 rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) @@ -372,7 +372,7 @@ func TestQueueSettings_Validate(t *testing.T) { qCfg := NewDefaultQueueSettings() assert.NoError(t, qCfg.Validate()) - qCfg.QueueSize = 0 + qCfg.QueueSizeBatches = 0 assert.EqualError(t, qCfg.Validate(), "queue size must be positive") // Confirm Validate doesn't return error with invalid config when feature is disabled