@@ -289,66 +289,82 @@ func (l *outStreamList) dequeue() *outStream {
289289}
290290
291291// controlBuffer is a way to pass information to loopy.
292- // Information is passed as specific struct types called control frames.
293- // A control frame not only represents data, messages or headers to be sent out
294- // but can also be used to instruct loopy to update its internal state.
295- // It shouldn't be confused with an HTTP2 frame, although some of the control frames
296- // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
292+ //
293+ // Information is passed as specific struct types called control frames. A
294+ // control frame not only represents data, messages or headers to be sent out
295+ // but can also be used to instruct loopy to update its internal state. It
296+ // shouldn't be confused with an HTTP2 frame, although some of the control
297+ // frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
297298type controlBuffer struct {
298- ch chan struct {}
299- done <- chan struct {}
299+ wakeupCh chan struct {} // Unblocks readers waiting for something to read.
300+ done <- chan struct {} // Closed when the transport is done.
301+
302+ // Mutex guards all the fields below, except trfChan which can be read
303+ // atomically without holding mu.
300304 mu sync.Mutex
301- consumerWaiting bool
302- list * itemList
303- err error
305+ consumerWaiting bool // True when readers are blocked waiting for new data.
306+ closed bool // True when the controlbuf is finished.
307+ list * itemList // List of queued control frames.
304308
305309 // transportResponseFrames counts the number of queued items that represent
306310 // the response of an action initiated by the peer. trfChan is created
307311 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
308312 // closed and nilled when transportResponseFrames drops below the
309313 // threshold. Both fields are protected by mu.
310314 transportResponseFrames int
311- trfChan atomic.Value // chan struct{}
315+ trfChan atomic.Pointer [ chan struct {}]
312316}
313317
314318func newControlBuffer (done <- chan struct {}) * controlBuffer {
315319 return & controlBuffer {
316- ch : make (chan struct {}, 1 ),
317- list : & itemList {},
318- done : done ,
320+ wakeupCh : make (chan struct {}, 1 ),
321+ list : & itemList {},
322+ done : done ,
319323 }
320324}
321325
322- // throttle blocks if there are too many incomingSettings/cleanupStreams in the
323- // controlbuf.
326+ // throttle blocks if there are too many frames in the control buf that
327+ // represent the response of an action initiated by the peer, like
328+ // incomingSettings cleanupStreams etc.
324329func (c * controlBuffer ) throttle () {
325- ch , _ := c .trfChan .Load ().(chan struct {})
326- if ch != nil {
330+ if ch := c .trfChan .Load (); ch != nil {
327331 select {
328- case <- ch :
332+ case <- ( * ch ) :
329333 case <- c .done :
330334 }
331335 }
332336}
333337
338+ // put adds an item to the controlbuf.
334339func (c * controlBuffer ) put (it cbItem ) error {
335340 _ , err := c .executeAndPut (nil , it )
336341 return err
337342}
338343
344+ // executeAndPut runs f, and if the return value is true, adds the given item to
345+ // the controlbuf. The item could be nil, in which case, this method simply
346+ // executes f and does not add the item to the controlbuf.
347+ //
348+ // The first return value indicates whether the item was successfully added to
349+ // the control buffer. A non-nil error, specifically ErrConnClosing, is returned
350+ // if the control buffer is already closed.
339351func (c * controlBuffer ) executeAndPut (f func () bool , it cbItem ) (bool , error ) {
340- var wakeUp bool
341352 c .mu .Lock ()
342- if c .err != nil {
343- c .mu .Unlock ()
344- return false , c .err
353+ defer c .mu .Unlock ()
354+
355+ if c .closed {
356+ return false , ErrConnClosing
345357 }
346358 if f != nil {
347359 if ! f () { // f wasn't successful
348- c .mu .Unlock ()
349360 return false , nil
350361 }
351362 }
363+ if it == nil {
364+ return true , nil
365+ }
366+
367+ var wakeUp bool
352368 if c .consumerWaiting {
353369 wakeUp = true
354370 c .consumerWaiting = false
@@ -359,77 +375,81 @@ func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
359375 if c .transportResponseFrames == maxQueuedTransportResponseFrames {
360376 // We are adding the frame that puts us over the threshold; create
361377 // a throttling channel.
362- c .trfChan .Store (make (chan struct {}))
378+ ch := make (chan struct {})
379+ c .trfChan .Store (& ch )
363380 }
364381 }
365- c .mu .Unlock ()
366382 if wakeUp {
367383 select {
368- case c .ch <- struct {}{}:
384+ case c .wakeupCh <- struct {}{}:
369385 default :
370386 }
371387 }
372388 return true , nil
373389}
374390
375- // Note argument f should never be nil.
376- func (c * controlBuffer ) execute (f func (it any ) bool , it any ) (bool , error ) {
377- c .mu .Lock ()
378- if c .err != nil {
379- c .mu .Unlock ()
380- return false , c .err
381- }
382- if ! f (it ) { // f wasn't successful
383- c .mu .Unlock ()
384- return false , nil
385- }
386- c .mu .Unlock ()
387- return true , nil
388- }
389-
391+ // get returns the next control frame from the control buffer. If block is true
392+ // **and** there are no control frames in the control buffer, the call blocks
393+ // until one of the conditions is met: there is a frame to return or the
394+ // transport is closed.
390395func (c * controlBuffer ) get (block bool ) (any , error ) {
391396 for {
392397 c .mu .Lock ()
393- if c .err != nil {
394- c .mu .Unlock ()
395- return nil , c .err
396- }
397- if ! c .list .isEmpty () {
398- h := c .list .dequeue ().(cbItem )
399- if h .isTransportResponseFrame () {
400- if c .transportResponseFrames == maxQueuedTransportResponseFrames {
401- // We are removing the frame that put us over the
402- // threshold; close and clear the throttling channel.
403- ch := c .trfChan .Load ().(chan struct {})
404- close (ch )
405- c .trfChan .Store ((chan struct {})(nil ))
406- }
407- c .transportResponseFrames --
408- }
398+ frame , err := c .getOnceLocked ()
399+ if frame != nil || err != nil || ! block {
400+ // If we read a frame or an error, we can return to the caller. The
401+ // call to getOnceLocked() returns a nil frame and a nil error if
402+ // there is nothing to read, and in that case, if the caller asked
403+ // us not to block, we can return now as well.
409404 c .mu .Unlock ()
410- return h , nil
411- }
412- if ! block {
413- c .mu .Unlock ()
414- return nil , nil
405+ return frame , err
415406 }
416407 c .consumerWaiting = true
417408 c .mu .Unlock ()
409+
410+ // Release the lock above and wait to be woken up.
418411 select {
419- case <- c .ch :
412+ case <- c .wakeupCh :
420413 case <- c .done :
421414 return nil , errors .New ("transport closed by client" )
422415 }
423416 }
424417}
425418
419+ // Callers must not use this method, but should instead use get().
420+ //
421+ // Caller must hold c.mu.
422+ func (c * controlBuffer ) getOnceLocked () (any , error ) {
423+ if c .closed {
424+ return false , ErrConnClosing
425+ }
426+ if c .list .isEmpty () {
427+ return nil , nil
428+ }
429+ h := c .list .dequeue ().(cbItem )
430+ if h .isTransportResponseFrame () {
431+ if c .transportResponseFrames == maxQueuedTransportResponseFrames {
432+ // We are removing the frame that put us over the
433+ // threshold; close and clear the throttling channel.
434+ ch := c .trfChan .Swap (nil )
435+ close (* ch )
436+ }
437+ c .transportResponseFrames --
438+ }
439+ return h , nil
440+ }
441+
442+ // finish closes the control buffer, cleaning up any streams that have queued
443+ // header frames. Once this method returns, no more frames can be added to the
444+ // control buffer, and attempts to do so will return ErrConnClosing.
426445func (c * controlBuffer ) finish () {
427446 c .mu .Lock ()
428- if c .err != nil {
429- c .mu .Unlock ()
447+ defer c .mu .Unlock ()
448+
449+ if c .closed {
430450 return
431451 }
432- c .err = ErrConnClosing
452+ c .closed = true
433453 // There may be headers for streams in the control buffer.
434454 // These streams need to be cleaned out since the transport
435455 // is still not aware of these yet.
@@ -442,15 +462,14 @@ func (c *controlBuffer) finish() {
442462 hdr .onOrphaned (ErrConnClosing )
443463 }
444464 }
465+
445466 // In case throttle() is currently in flight, it needs to be unblocked.
446467 // Otherwise, the transport may not close, since the transport is closed by
447468 // the reader encountering the connection error.
448- ch , _ := c .trfChan .Load ().( chan struct {} )
469+ ch := c .trfChan .Swap ( nil )
449470 if ch != nil {
450- close (ch )
471+ close (* ch )
451472 }
452- c .trfChan .Store ((chan struct {})(nil ))
453- c .mu .Unlock ()
454473}
455474
456475type side int
0 commit comments