Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package exporterhelper
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -365,38 +364,39 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
}

func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
produceCounter := &atomic.Uint32{}

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown

rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered

be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
mockReq := newErrorRequest()
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockReq),
newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

// wraps original queue so we can count operations
be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{
Queue: be.queueSender.(*queueSender).queue,
produceCounter: produceCounter,
var extensions = map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
}
be.queueSender.(*queueSender).requeuingEnabled = true
host := &mockHost{ext: extensions}

require.NoError(t, be.Start(context.Background(), &mockHost{}))
require.NoError(t, be.Start(context.Background(), host))

// Invoke queuedRetrySender so the producer will put the item for consumer to poll
require.NoError(t, be.send(context.Background(), newErrorRequest()))
require.NoError(t, be.send(context.Background(), mockReq))

// first wait for the item to be produced to the queue initially
// first wait for the item to be consumed from the queue
assert.Eventually(t, func() bool {
return produceCounter.Load() == uint32(1)
return be.queueSender.(*queueSender).queue.Size() == 0
}, time.Second, 1*time.Millisecond)

// shuts down and ensure the item is produced in the queue again
require.NoError(t, be.Shutdown(context.Background()))
assert.Eventually(t, func() bool {
return produceCounter.Load() == uint32(2)
return be.queueSender.(*queueSender).queue.Size() == 1
}, time.Second, 1*time.Millisecond)
}

Expand Down
13 changes: 1 addition & 12 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/testdata"
)

func mockRequestUnmarshaler(mr *mockRequest) RequestUnmarshaler {
func mockRequestUnmarshaler(mr Request) RequestUnmarshaler {
return func(bytes []byte) (Request, error) {
return mr, nil
}
Expand Down Expand Up @@ -405,13 +404,3 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met
}
return true
}

type producerConsumerQueueWithCounter struct {
internal.Queue[Request]
produceCounter *atomic.Uint32
}

func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item Request) error {
pcq.produceCounter.Add(1)
return pcq.Queue.Offer(ctx, item)
}