@@ -10,6 +10,7 @@ import (
1010 "encoding/json"
1111 "errors"
1212 "fmt"
13+ "math"
1314 "strconv"
1415 "sync"
1516
@@ -243,11 +244,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
243244 return pq .putInternal (ctx , req )
244245}
245246
246- type marshaledRequestWithSpanContext struct {
247- RequestBytes []byte `json:"request"`
248- SpanContextJSON json.RawMessage `json:"span_context,omitempty"`
249- }
250-
251247type spanContextConfigWrapper struct {
252248 TraceID string
253249 SpanID string
@@ -289,31 +285,42 @@ func spanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex
289285 return & sc , nil
290286}
291287
292- // unmarshalRequestWithSpanContext unmarshals a marshaledRequestWithSpanContext from bytes, returning the request
293- // and a context with the restored SpanContext (if present).
288+ // unmarshalRequestWithSpanContext unmarshals a binary envelope, returning the request and a context with the restored SpanContext (if present).
294289func unmarshalRequestWithSpanContext [T any ](encoding Encoding [T ], value []byte ) (T , context.Context , error ) {
295290 var req T
296291 restoredContext := context .Background ()
297- var envelope marshaledRequestWithSpanContext
298- if err := json .Unmarshal (value , & envelope ); err != nil {
299- return req , restoredContext , err
292+ if len (value ) < 8 {
293+ return req , restoredContext , errors .New ("envelope too short" )
294+ }
295+ reqLen := binary .LittleEndian .Uint32 (value [:4 ])
296+ if len (value ) < int (4 + reqLen + 4 ) {
297+ return req , restoredContext , errors .New ("envelope too short for request" )
300298 }
301- request , err := encoding .Unmarshal (envelope .RequestBytes )
299+ reqBytes := value [4 : 4 + reqLen ]
300+ scLen := binary .LittleEndian .Uint32 (value [4 + reqLen : 8 + reqLen ])
301+ if len (value ) < int (8 + reqLen + scLen ) {
302+ return req , restoredContext , errors .New ("envelope too short for span context" )
303+ }
304+ scBytes := value [8 + reqLen : 8 + reqLen + scLen ]
305+ // Unmarshal request
306+ r , err := encoding .Unmarshal (reqBytes )
302307 if err != nil {
303308 return req , restoredContext , err
304309 }
305- if len (envelope .SpanContextJSON ) > 0 {
310+ req = r
311+ // Unmarshal span context if present
312+ if scLen > 0 {
306313 var wrapper spanContextConfigWrapper
307- if err := json .Unmarshal (envelope . SpanContextJSON , & wrapper ); err == nil {
314+ if err := json .Unmarshal (scBytes , & wrapper ); err == nil {
308315 if sc , err := spanContextFromWrapper (wrapper ); err == nil && sc != nil {
309316 restoredContext = trace .ContextWithSpanContext (restoredContext , * sc )
310317 }
311318 }
312319 }
313- return request , restoredContext , nil
320+ return req , restoredContext , nil
314321}
315322
316- // marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a marshaledRequestWithSpanContext envelope as bytes.
323+ // marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a binary envelope as bytes.
317324func marshalRequestWithSpanContext [T any ](ctx context.Context , encoding Encoding [T ], req T ) ([]byte , error ) {
318325 reqBuf , err := encoding .Marshal (req )
319326 if err != nil {
@@ -326,14 +333,22 @@ func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding
326333 if err != nil {
327334 return nil , err
328335 }
329- } else {
330- scJSON = nil // Will be omitted due to omitempty
331- }
332- envelope := marshaledRequestWithSpanContext {
333- RequestBytes : reqBuf ,
334- SpanContextJSON : scJSON ,
335336 }
336- return json .Marshal (envelope )
337+ if len (reqBuf ) > int (math .MaxInt32 ) {
338+ return nil , fmt .Errorf ("request too large to encode: %d bytes" , len (reqBuf ))
339+ }
340+ if len (scJSON ) > int (math .MaxInt32 ) {
341+ return nil , fmt .Errorf ("span context too large to encode: %d bytes" , len (scJSON ))
342+ }
343+ // Compose binary envelope: [4 bytes reqLen][req][4 bytes scLen][scJSON]
344+ buf := make ([]byte , 0 , 8 + len (reqBuf )+ len (scJSON ))
345+ //nolint:gosec // G115: integer overflow conversion int -> uint32
346+ buf = binary .LittleEndian .AppendUint32 (buf , uint32 (len (reqBuf )))
347+ buf = append (buf , reqBuf ... )
348+ //nolint:gosec // G115: integer overflow conversion int -> uint32
349+ buf = binary .LittleEndian .AppendUint32 (buf , uint32 (len (scJSON )))
350+ buf = append (buf , scJSON ... )
351+ return buf , nil
337352}
338353
339354// putInternal is the internal version that requires caller to hold the mutex lock.
0 commit comments