@@ -25,14 +25,12 @@ const (
2525 zapErrorCount = "errorCount"
2626 zapNumberOfItems = "numberOfItems"
2727
28- readIndexKey = "ri"
29- writeIndexKey = "wi"
30- currentlyDispatchedItemsKey = "di"
31-
32- // queueMetadataKey is the new single key for all queue metadata.
33- // TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
34- //nolint:unused
35- queueMetadataKey = "qmv0"
28+ legacyReadIndexKey = "ri"
29+ legacyWriteIndexKey = "wi"
30+ legacyCurrentlyDispatchedItemsKey = "di"
31+
32+ // metadataKey is the new single key for all queue metadata.
33+ metadataKey = "qmv0"
3634)
3735
3836var (
@@ -154,14 +152,53 @@ func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Cli
154152 pq .client = client
155153 // Start with a reference 1 which is the reference we use for the producer goroutines and initialization.
156154 pq .refClient = 1
157- pq .initPersistentContiguousStorage (ctx )
158- // Make sure the leftover requests are handled
159- pq .retrieveAndEnqueueNotDispatchedReqs (ctx )
155+
156+ // Try to load from new consolidated metadata first
157+ err := pq .loadQueueMetadata (ctx )
158+ switch {
159+ case err == nil :
160+ pq .enqueueNotDispatchedReqs (ctx , pq .metadata .CurrentlyDispatchedItems )
161+ pq .metadata .CurrentlyDispatchedItems = nil
162+ case ! errors .Is (err , errValueNotSet ):
163+ pq .logger .Error ("Failed getting metadata, starting with new ones" , zap .Error (err ))
164+ pq .metadata = PersistentMetadata {}
165+ default :
166+ pq .logger .Info ("New queue metadata key not found, attempting to load legacy format." )
167+ pq .loadLegacyMetadata (ctx )
168+ }
169+ }
170+
171+ // loadQueueMetadata loads queue metadata from the consolidated key
172+ func (pq * persistentQueue [T ]) loadQueueMetadata (ctx context.Context ) error {
173+ buf , err := pq .client .Get (ctx , metadataKey )
174+ if err != nil {
175+ return err
176+ }
177+
178+ if len (buf ) == 0 {
179+ return errValueNotSet
180+ }
181+
182+ metadata := & pq .metadata
183+ if err = metadata .Unmarshal (buf ); err != nil {
184+ return err
185+ }
186+
187+ pq .logger .Info ("Loaded queue metadata" ,
188+ zap .Uint64 ("readIndex" , pq .metadata .ReadIndex ),
189+ zap .Uint64 ("writeIndex" , pq .metadata .WriteIndex ),
190+ zap .Int64 ("itemsSize" , pq .metadata .ItemsSize ),
191+ zap .Int64 ("bytesSize" , pq .metadata .BytesSize ),
192+ zap .Int ("dispatchedItems" , len (pq .metadata .CurrentlyDispatchedItems )))
193+
194+ return nil
160195}
161196
162- func (pq * persistentQueue [T ]) initPersistentContiguousStorage (ctx context.Context ) {
163- riOp := storage .GetOperation (readIndexKey )
164- wiOp := storage .GetOperation (writeIndexKey )
197+ // TODO: Remove legacy format support after 6 months (target: December 2025)
198+ func (pq * persistentQueue [T ]) loadLegacyMetadata (ctx context.Context ) {
199+ // Fallback to legacy individual keys for backward compatibility
200+ riOp := storage .GetOperation (legacyReadIndexKey )
201+ wiOp := storage .GetOperation (legacyWriteIndexKey )
165202
166203 err := pq .client .Batch (ctx , riOp , wiOp )
167204 if err == nil {
@@ -181,6 +218,29 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
181218 pq .metadata .ReadIndex = 0
182219 pq .metadata .WriteIndex = 0
183220 }
221+
222+ pq .retrieveAndEnqueueNotDispatchedReqs (ctx )
223+
224+ // Save to a new format and clean up legacy keys
225+ metadataBytes , err := pq .metadata .Marshal ()
226+ if err != nil {
227+ pq .logger .Error ("Failed to marshal metadata" , zap .Error (err ))
228+ return
229+ }
230+
231+ if err = pq .client .Set (ctx , metadataKey , metadataBytes ); err != nil {
232+ pq .logger .Error ("Failed to persist current metadata to storage" , zap .Error (err ))
233+ return
234+ }
235+
236+ if err = pq .client .Batch (ctx ,
237+ storage .DeleteOperation (legacyReadIndexKey ),
238+ storage .DeleteOperation (legacyWriteIndexKey ),
239+ storage .DeleteOperation (legacyCurrentlyDispatchedItemsKey )); err != nil {
240+ pq .logger .Warn ("Failed to cleanup legacy metadata keys" , zap .Error (err ))
241+ } else {
242+ pq .logger .Info ("Successfully migrated to consolidated metadata format" )
243+ }
184244}
185245
186246func (pq * persistentQueue [T ]) Shutdown (ctx context.Context ) error {
@@ -213,11 +273,7 @@ func (pq *persistentQueue[T]) unrefClient(ctx context.Context) error {
213273func (pq * persistentQueue [T ]) Offer (ctx context.Context , req T ) error {
214274 pq .mu .Lock ()
215275 defer pq .mu .Unlock ()
216- return pq .putInternal (ctx , req )
217- }
218276
219- // putInternal is the internal version that requires caller to hold the mutex lock.
220- func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
221277 size := pq .activeSizer .Sizeof (req )
222278 for pq .internalSize ()+ size > pq .capacity {
223279 if ! pq .blockOnOverflow {
@@ -228,23 +284,36 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
228284 }
229285 }
230286
231- reqBuf , err := pq .encoding .Marshal (ctx , req )
287+ pq .metadata .ItemsSize += pq .itemsSizer .Sizeof (req )
288+ pq .metadata .BytesSize += pq .bytesSizer .Sizeof (req )
289+
290+ return pq .putInternal (ctx , req )
291+ }
292+
293+ // putInternal adds the request to the storage without updating items/bytes sizes.
294+ func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
295+ pq .metadata .WriteIndex ++
296+
297+ metadataBuf , err := pq .metadata .Marshal ()
232298 if err != nil {
233299 return err
234300 }
235301
302+ reqBuf , err := pq .encoding .Marshal (ctx , req )
303+ if err != nil {
304+ return err
305+ }
236306 // Carry out a transaction where we both add the item and update the write index
237307 ops := []* storage.Operation {
238- storage .SetOperation (writeIndexKey , itemIndexToBytes ( pq . metadata . WriteIndex + 1 ) ),
239- storage .SetOperation (getItemKey (pq .metadata .WriteIndex ), reqBuf ),
308+ storage .SetOperation (metadataKey , metadataBuf ),
309+ storage .SetOperation (getItemKey (pq .metadata .WriteIndex - 1 ), reqBuf ),
240310 }
241311 if err = pq .client .Batch (ctx , ops ... ); err != nil {
312+ // At this moment, metadata may be updated in the storage, so we cannot just revert changes to the
313+ // metadata, rely on the sizes being fixed on complete draining.
242314 return err
243315 }
244316
245- pq .metadata .ItemsSize += pq .itemsSizer .Sizeof (req )
246- pq .metadata .BytesSize += pq .bytesSizer .Sizeof (req )
247- pq .metadata .WriteIndex ++
248317 pq .hasMoreElements .Signal ()
249318
250319 return nil
@@ -291,14 +360,16 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, conte
291360 // Increase here, so even if errors happen below, it always iterates
292361 pq .metadata .ReadIndex ++
293362 pq .metadata .CurrentlyDispatchedItems = append (pq .metadata .CurrentlyDispatchedItems , index )
294- getOp := storage .GetOperation (getItemKey (index ))
295- err := pq .client .Batch (ctx ,
296- storage .SetOperation (readIndexKey , itemIndexToBytes (pq .metadata .ReadIndex )),
297- storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems )),
298- getOp )
299363
300364 var req T
301365 restoredCtx := context .Background ()
366+ metadataBytes , err := pq .metadata .Marshal ()
367+ if err != nil {
368+ return 0 , req , restoredCtx , false
369+ }
370+
371+ getOp := storage .GetOperation (getItemKey (index ))
372+ err = pq .client .Batch (ctx , storage .SetOperation (metadataKey , metadataBytes ), getOp )
302373 if err == nil {
303374 restoredCtx , req , err = pq .encoding .Unmarshal (getOp .Value )
304375 }
@@ -338,9 +409,6 @@ func (pq *persistentQueue[T]) onDone(index uint64, itemsSize int64, bytesSize in
338409 return
339410 }
340411
341- if err := pq .itemDispatchingFinish (context .Background (), index ); err != nil {
342- pq .logger .Error ("Error deleting item from queue" , zap .Error (err ))
343- }
344412 pq .metadata .BytesSize -= bytesSize
345413 if pq .metadata .BytesSize < 0 {
346414 pq .metadata .BytesSize = 0
@@ -350,10 +418,8 @@ func (pq *persistentQueue[T]) onDone(index uint64, itemsSize int64, bytesSize in
350418 pq .metadata .ItemsSize = 0
351419 }
352420
353- // Ensure the used size are in sync when queue is drained.
354- if pq .requestSize () == 0 {
355- pq .metadata .BytesSize = 0
356- pq .metadata .ItemsSize = 0
421+ if err := pq .itemDispatchingFinish (context .Background (), index ); err != nil {
422+ pq .logger .Error ("Error deleting item from queue" , zap .Error (err ))
357423 }
358424
359425 // More space available after data are removed from the storage.
@@ -368,7 +434,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
368434 pq .mu .Lock ()
369435 defer pq .mu .Unlock ()
370436 pq .logger .Debug ("Checking if there are items left for dispatch by consumers" )
371- itemKeysBuf , err := pq .client .Get (ctx , currentlyDispatchedItemsKey )
437+ itemKeysBuf , err := pq .client .Get (ctx , legacyCurrentlyDispatchedItemsKey )
372438 if err == nil {
373439 dispatchedItems , err = bytesToItemIndexArray (itemKeysBuf )
374440 }
@@ -377,6 +443,10 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
377443 return
378444 }
379445
446+ pq .enqueueNotDispatchedReqs (ctx , dispatchedItems )
447+ }
448+
449+ func (pq * persistentQueue [T ]) enqueueNotDispatchedReqs (ctx context.Context , dispatchedItems []uint64 ) {
380450 if len (dispatchedItems ) == 0 {
381451 pq .logger .Debug ("No items left for dispatch by consumers" )
382452 return
@@ -440,23 +510,35 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u
440510 }
441511 }
442512
443- setOp := storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems ))
513+ // Ensure the used size are in sync when queue is drained.
514+ if pq .requestSize () == 0 {
515+ pq .metadata .BytesSize = 0
516+ pq .metadata .ItemsSize = 0
517+ }
518+
519+ metadataBytes , err := pq .metadata .Marshal ()
520+ if err != nil {
521+ return err
522+ }
523+
524+ setOp := storage .SetOperation (metadataKey , metadataBytes )
444525 deleteOp := storage .DeleteOperation (getItemKey (index ))
445- if err := pq .client .Batch (ctx , setOp , deleteOp ); err != nil {
446- // got an error, try to gracefully handle it
447- pq .logger .Warn ("Failed updating currently dispatched items, trying to delete the item first" ,
448- zap .Error (err ))
449- } else {
526+ err = pq .client .Batch (ctx , setOp , deleteOp )
527+ if err == nil {
450528 // Everything ok, exit
451529 return nil
452530 }
453531
454- if err := pq .client .Batch (ctx , deleteOp ); err != nil {
532+ // got an error, try to gracefully handle it
533+ pq .logger .Warn ("Failed updating currently dispatched items, trying to delete the item first" ,
534+ zap .Error (err ))
535+
536+ if err = pq .client .Batch (ctx , deleteOp ); err != nil {
455537 // Return an error here, as this indicates an issue with the underlying storage medium
456538 return fmt .Errorf ("failed deleting item from queue, got error from storage: %w" , err )
457539 }
458540
459- if err := pq .client .Batch (ctx , setOp ); err != nil {
541+ if err = pq .client .Batch (ctx , setOp ); err != nil {
460542 // even if this fails, we still have the right dispatched items in memory
461543 // at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup
462544 return fmt .Errorf ("failed updating currently dispatched items, but deleted item successfully: %w" , err )
// getItemKey renders a queue item index as its decimal string form,
// which serves as the storage key for that item's serialized payload.
func getItemKey(index uint64) string {
	return strconv.FormatUint(index, 10)
}
485567
486- func itemIndexToBytes (value uint64 ) []byte {
487- return binary .LittleEndian .AppendUint64 ([]byte {}, value )
488- }
489-
490568func bytesToItemIndex (buf []byte ) (uint64 , error ) {
491569 if buf == nil {
492570 return uint64 (0 ), errValueNotSet
@@ -498,17 +576,6 @@ func bytesToItemIndex(buf []byte) (uint64, error) {
498576 return binary .LittleEndian .Uint64 (buf ), nil
499577}
500578
501- func itemIndexArrayToBytes (arr []uint64 ) []byte {
502- size := len (arr )
503- buf := make ([]byte , 0 , 4 + size * 8 )
504- //nolint:gosec
505- buf = binary .LittleEndian .AppendUint32 (buf , uint32 (size ))
506- for _ , item := range arr {
507- buf = binary .LittleEndian .AppendUint64 (buf , item )
508- }
509- return buf
510- }
511-
512579func bytesToItemIndexArray (buf []byte ) ([]uint64 , error ) {
513580 if len (buf ) == 0 {
514581 return nil , nil
0 commit comments