Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/persistent-metadata-data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Refactor persistent queue metadata storage so that the queue size is always recorded.

# One or more tracking issues or pull requests related to the change
issues: [12890]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
187 changes: 127 additions & 60 deletions exporter/exporterhelper/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
zapErrorCount = "errorCount"
zapNumberOfItems = "numberOfItems"

readIndexKey = "ri"
writeIndexKey = "wi"
currentlyDispatchedItemsKey = "di"

// queueMetadataKey is the new single key for all queue metadata.
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
//nolint:unused
queueMetadataKey = "qmv0"
legacyReadIndexKey = "ri"
legacyWriteIndexKey = "wi"
legacyCurrentlyDispatchedItemsKey = "di"

// metadataKey is the new single key for all queue metadata.
metadataKey = "qmv0"
)

var (
Expand Down Expand Up @@ -154,14 +152,53 @@
pq.client = client
// Start with a reference 1 which is the reference we use for the producer goroutines and initialization.
pq.refClient = 1
pq.initPersistentContiguousStorage(ctx)
// Make sure the leftover requests are handled
pq.retrieveAndEnqueueNotDispatchedReqs(ctx)

// Try to load from new consolidated metadata first
err := pq.loadQueueMetadata(ctx)
switch {
case err == nil:
pq.enqueueNotDispatchedReqs(ctx, pq.metadata.CurrentlyDispatchedItems)
pq.metadata.CurrentlyDispatchedItems = nil
case !errors.Is(err, errValueNotSet):
pq.logger.Error("Failed getting metadata, starting with new ones", zap.Error(err))
pq.metadata = PersistentMetadata{}
default:
pq.logger.Info("New queue metadata key not found, attempting to load legacy format.")
pq.loadLegacyMetadata(ctx)
}
}

// loadQueueMetadata restores the queue state from the consolidated metadata
// entry stored under metadataKey. It returns errValueNotSet when the key is
// absent or holds an empty value, so the caller can fall back to the legacy
// per-field storage format.
func (pq *persistentQueue[T]) loadQueueMetadata(ctx context.Context) error {
	raw, err := pq.client.Get(ctx, metadataKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return errValueNotSet
	}

	if err := pq.metadata.Unmarshal(raw); err != nil {
		return err
	}

	pq.logger.Info("Loaded queue metadata",
		zap.Uint64("readIndex", pq.metadata.ReadIndex),
		zap.Uint64("writeIndex", pq.metadata.WriteIndex),
		zap.Int64("itemsSize", pq.metadata.ItemsSize),
		zap.Int64("bytesSize", pq.metadata.BytesSize),
		zap.Int("dispatchedItems", len(pq.metadata.CurrentlyDispatchedItems)))
	return nil
}

func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Context) {
riOp := storage.GetOperation(readIndexKey)
wiOp := storage.GetOperation(writeIndexKey)
// TODO: Remove legacy format support after 6 months (target: December 2025)
func (pq *persistentQueue[T]) loadLegacyMetadata(ctx context.Context) {
// Fallback to legacy individual keys for backward compatibility
riOp := storage.GetOperation(legacyReadIndexKey)
wiOp := storage.GetOperation(legacyWriteIndexKey)

err := pq.client.Batch(ctx, riOp, wiOp)
if err == nil {
Expand All @@ -181,6 +218,29 @@
pq.metadata.ReadIndex = 0
pq.metadata.WriteIndex = 0
}

pq.retrieveAndEnqueueNotDispatchedReqs(ctx)

// Save to a new format and clean up legacy keys
metadataBytes, err := pq.metadata.Marshal()
if err != nil {
pq.logger.Error("Failed to marshal metadata", zap.Error(err))
return
}

Check warning on line 229 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L227-L229

Added lines #L227 - L229 were not covered by tests

if err = pq.client.Set(ctx, metadataKey, metadataBytes); err != nil {
pq.logger.Error("Failed to persist current metadata to storage", zap.Error(err))
return
}

Check warning on line 234 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L232-L234

Added lines #L232 - L234 were not covered by tests

if err = pq.client.Batch(ctx,
storage.DeleteOperation(legacyReadIndexKey),
storage.DeleteOperation(legacyWriteIndexKey),
storage.DeleteOperation(legacyCurrentlyDispatchedItemsKey)); err != nil {
pq.logger.Warn("Failed to cleanup legacy metadata keys", zap.Error(err))

Check warning on line 240 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L240

Added line #L240 was not covered by tests
} else {
pq.logger.Info("Successfully migrated to consolidated metadata format")
}
}

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -213,11 +273,7 @@
func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
pq.mu.Lock()
defer pq.mu.Unlock()
return pq.putInternal(ctx, req)
}

// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
size := pq.activeSizer.Sizeof(req)
for pq.internalSize()+size > pq.capacity {
if !pq.blockOnOverflow {
Expand All @@ -228,23 +284,36 @@
}
}

reqBuf, err := pq.encoding.Marshal(ctx, req)
pq.metadata.ItemsSize += pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize += pq.bytesSizer.Sizeof(req)

return pq.putInternal(ctx, req)
}

// putInternal adds the request to the storage without updating items/bytes sizes.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
pq.metadata.WriteIndex++

metadataBuf, err := pq.metadata.Marshal()
if err != nil {
return err
}

reqBuf, err := pq.encoding.Marshal(ctx, req)
if err != nil {
return err
}

Check warning on line 305 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L304-L305

Added lines #L304 - L305 were not covered by tests
// Carry out a transaction where we both add the item and update the write index
ops := []*storage.Operation{
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.metadata.WriteIndex+1)),
storage.SetOperation(getItemKey(pq.metadata.WriteIndex), reqBuf),
storage.SetOperation(metadataKey, metadataBuf),
storage.SetOperation(getItemKey(pq.metadata.WriteIndex-1), reqBuf),
}
if err = pq.client.Batch(ctx, ops...); err != nil {
// At this moment, metadata may be updated in the storage, so we cannot just revert changes to the
// metadata, rely on the sizes being fixed on complete draining.
return err
}

pq.metadata.ItemsSize += pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize += pq.bytesSizer.Sizeof(req)
pq.metadata.WriteIndex++
pq.hasMoreElements.Signal()

return nil
Expand Down Expand Up @@ -291,14 +360,16 @@
// Increase here, so even if errors happen below, it always iterates
pq.metadata.ReadIndex++
pq.metadata.CurrentlyDispatchedItems = append(pq.metadata.CurrentlyDispatchedItems, index)
getOp := storage.GetOperation(getItemKey(index))
err := pq.client.Batch(ctx,
storage.SetOperation(readIndexKey, itemIndexToBytes(pq.metadata.ReadIndex)),
storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems)),
getOp)

var req T
restoredCtx := context.Background()
metadataBytes, err := pq.metadata.Marshal()
if err != nil {
return 0, req, restoredCtx, false
}

Check warning on line 369 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L368-L369

Added lines #L368 - L369 were not covered by tests

getOp := storage.GetOperation(getItemKey(index))
err = pq.client.Batch(ctx, storage.SetOperation(metadataKey, metadataBytes), getOp)
if err == nil {
restoredCtx, req, err = pq.encoding.Unmarshal(getOp.Value)
}
Expand Down Expand Up @@ -338,9 +409,6 @@
return
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}
pq.metadata.BytesSize -= bytesSize
if pq.metadata.BytesSize < 0 {
pq.metadata.BytesSize = 0
Expand All @@ -350,10 +418,8 @@
pq.metadata.ItemsSize = 0
}

// Ensure the used size are in sync when queue is drained.
if pq.requestSize() == 0 {
pq.metadata.BytesSize = 0
pq.metadata.ItemsSize = 0
if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))

Check warning on line 422 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L422

Added line #L422 was not covered by tests
}

// More space available after data are removed from the storage.
Expand All @@ -368,7 +434,7 @@
pq.mu.Lock()
defer pq.mu.Unlock()
pq.logger.Debug("Checking if there are items left for dispatch by consumers")
itemKeysBuf, err := pq.client.Get(ctx, currentlyDispatchedItemsKey)
itemKeysBuf, err := pq.client.Get(ctx, legacyCurrentlyDispatchedItemsKey)
if err == nil {
dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
}
Expand All @@ -377,6 +443,10 @@
return
}

pq.enqueueNotDispatchedReqs(ctx, dispatchedItems)
}

func (pq *persistentQueue[T]) enqueueNotDispatchedReqs(ctx context.Context, dispatchedItems []uint64) {
if len(dispatchedItems) == 0 {
pq.logger.Debug("No items left for dispatch by consumers")
return
Expand Down Expand Up @@ -440,23 +510,35 @@
}
}

setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems))
// Ensure the used size are in sync when queue is drained.
if pq.requestSize() == 0 {
pq.metadata.BytesSize = 0
pq.metadata.ItemsSize = 0
}

metadataBytes, err := pq.metadata.Marshal()
if err != nil {
return err
}

Check warning on line 522 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L521-L522

Added lines #L521 - L522 were not covered by tests

setOp := storage.SetOperation(metadataKey, metadataBytes)
deleteOp := storage.DeleteOperation(getItemKey(index))
if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil {
// got an error, try to gracefully handle it
pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
zap.Error(err))
} else {
err = pq.client.Batch(ctx, setOp, deleteOp)
if err == nil {
// Everything ok, exit
return nil
}

if err := pq.client.Batch(ctx, deleteOp); err != nil {
// got an error, try to gracefully handle it
pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
zap.Error(err))

if err = pq.client.Batch(ctx, deleteOp); err != nil {
// Return an error here, as this indicates an issue with the underlying storage medium
return fmt.Errorf("failed deleting item from queue, got error from storage: %w", err)
}

if err := pq.client.Batch(ctx, setOp); err != nil {
if err = pq.client.Batch(ctx, setOp); err != nil {
// even if this fails, we still have the right dispatched items in memory
// at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup
return fmt.Errorf("failed updating currently dispatched items, but deleted item successfully: %w", err)
Expand All @@ -483,10 +565,6 @@
return strconv.FormatUint(index, 10)
}

// itemIndexToBytes encodes a queue item index as 8 little-endian bytes.
func itemIndexToBytes(value uint64) []byte {
	buf := make([]byte, 0, 8)
	return binary.LittleEndian.AppendUint64(buf, value)
}

func bytesToItemIndex(buf []byte) (uint64, error) {
if buf == nil {
return uint64(0), errValueNotSet
Expand All @@ -498,17 +576,6 @@
return binary.LittleEndian.Uint64(buf), nil
}

// itemIndexArrayToBytes serializes a slice of item indexes as a little-endian
// uint32 element count followed by one little-endian uint64 per element.
func itemIndexArrayToBytes(arr []uint64) []byte {
	out := make([]byte, 0, 4+8*len(arr))
	//nolint:gosec
	out = binary.LittleEndian.AppendUint32(out, uint32(len(arr)))
	for _, idx := range arr {
		out = binary.LittleEndian.AppendUint64(out, idx)
	}
	return out
}

func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
if len(buf) == 0 {
return nil, nil
Expand Down
Loading
Loading