Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 14 additions & 48 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,48 +282,15 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
}
}

// toggle ensures only one of the loops is producing actions.
//
// Here's how this works:
//
// As a channel, toggle is in one of two states: either it is empty, or
// contains 1 struct{}{} "value" (it's a buffered channel). But we can
// think of the system as having three possible states:
//
// - the toggle is available
// - the ping loop has the toggle
// - the debouncer loop has the toggle
//
// If toggle contains a value, then either loop can "take" the toggle
// immediately by receiving from the channel. Otherwise, it has to wait
// until a value is available for its receive operation to unblock.
// Similarly, "relinquishing" the toggle is a matter of sending a struct{}{}
// value to the channel. (Each side can unblock the other side.)
//
// Each loop can be doing other things while waiting to take the toggle,
// because a select statement will choose any operation that can proceed.
//
// Each loop keeps track of whether it currently "has" the toggle or
// not, and each has a different policy for taking and relinquishing it.
//
// The ping loop waits for the toggle to become available and takes it
// before pinging, and relinquishes the toggle as soon as the action
// resulting from a ping has completed. In other words, the ping loop tries
// not to keep the toggle and politely waits for it.
//
// The debouncer (acting for the streaming side) instead holds the toggle
// as long as possible, until the stream becomes unhealthy. The ping loop
// is usually already waiting and ready to take the toggle at that point.
// If the streaming side becomes healthy again, the debouncer will try to
// take the toggle back, and most of the time this is quick because the ping
// loop spends most of its time waiting in between pings. But if the ping
// loop still has the toggle, that must be because the ping loop still has
// an action in progress (probably a job), so the debouncer must wait.
//
// The toggle initially belongs to the streaming/debouncer side, unless the
// ping mode is ping-only, in which case we seed toggle with a value below
// so the ping loop can simply take it.
toggle := make(chan struct{}, 1)
// baton ensures only one of the loops (ping or streaming/debouncer) is
// producing actions at a time. The debouncer holds the baton as long as
// streaming is healthy; when unhealthy it releases the baton so the ping
// loop can take over. The ping loop acquires before each ping and releases
// immediately after the resulting action completes.
baton := NewBaton()
pingHolder := baton.Holder()
debouncerHolder := baton.Holder()
debouncerHolder.Acquired()

// More channels to enable communication between the various loops.
fromPingLoopCh := make(chan actionMessage) // ping loop to action handler
Expand All @@ -337,15 +304,15 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr

pingLoop := func() {
defer wg.Done()
errCh <- a.runPingLoop(ctx, toggle, fromPingLoopCh)
errCh <- a.runPingLoop(ctx, pingHolder, fromPingLoopCh)
}
streamingLoop := func() {
defer wg.Done()
errCh <- a.runStreamingPingLoop(ctx, fromStreamingLoopCh)
}
debouncerLoop := func() {
defer wg.Done()
errCh <- a.runDebouncer(ctx, toggle, fromDebouncerCh, fromStreamingLoopCh)
errCh <- a.runDebouncer(ctx, debouncerHolder, fromDebouncerCh, fromStreamingLoopCh)
}

var loops []func()
Expand All @@ -354,14 +321,13 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
loops = []func(){pingLoop, streamingLoop, debouncerLoop}

case "ping-only":
// Only add the ping loop, and let it take the toggle.
loops = []func(){pingLoop}
toggle <- struct{}{}
debouncerHolder.Release()
fromDebouncerCh = nil // prevent action loop listening to streaming side
loops = []func(){pingLoop}

case "stream-only":
loops = []func(){streamingLoop, debouncerLoop}
fromPingLoopCh = nil // prevent action loop listening to ping side
loops = []func(){streamingLoop, debouncerLoop}
}

// There's always an action handler.
Expand Down
55 changes: 21 additions & 34 deletions agent/agent_worker_debouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,18 @@ import (
// action handler directly, then the "resume" may cause the agent to
// exit in a one-shot mode, even though the second "pause" means the
// user actually *did* want the agent to be paused.
func (a *AgentWorker) runDebouncer(ctx context.Context, toggle chan struct{}, outCh chan<- actionMessage, inCh <-chan actionMessage) error {
func (a *AgentWorker) runDebouncer(ctx context.Context, baton *BatonHolder, outCh chan<- actionMessage, inCh <-chan actionMessage) error {
a.logger.Debug("[runDebouncer] Starting")
defer a.logger.Debug("[runDebouncer] Exiting")

// When the debouncer returns, close the output channel to let the next
// loop know to stop listening to it.
defer close(outCh)

// We begin holding the toggle. (The ping loop is prevented from running.)
haveToggle := true

relinquishToggle := func() {
if !haveToggle {
// Nothing to do
return
}
a.logger.Debug("[runDebouncer] Relinquishing the toggle")
toggle <- struct{}{}
haveToggle = false
}
Comment on lines -39 to -50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactor seems to be the main win of this PR.


// Give up the toggle when we're no longer running so that the regular
// Give up the baton when we're no longer running so that the regular
// ping loop can have a go (if it hasn't also ended).
defer relinquishToggle()
// Release is idempotent, so this is safe even if already released.
defer baton.Release()

// This "closed" channel is used for a little hack below.
// `<-cmp.Or(lastActionResult, closed)` will receive from:
Expand All @@ -73,9 +61,9 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, toggle chan struct{}, ou
pending := false

// Is the stream healthy?
// If so, take the toggle (which blocks the ping loop).
// If not, return the toggle (unblocking the ping loop).
// Returning the toggle may have to wait for the current action to complete.
// If so, take the baton (which blocks the ping loop).
// If not, release the baton (unblocking the ping loop).
// Releasing may have to wait for the current action to complete.
healthy := true

for {
Expand All @@ -87,10 +75,9 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, toggle chan struct{}, ou
a.logger.Debug("[runDebouncer] Stopping due to context cancel")
return ctx.Err()

case <-iif(healthy, toggle): // if the stream is healthy, take the toggle
a.logger.Debug("[runDebouncer] Taking the toggle")
// We have the toggle again!
haveToggle = true
case <-iif(healthy, baton.Acquire()): // if the stream is healthy, acquire the baton
baton.Acquired()
a.logger.Debug("[runDebouncer] Acquired the baton")

// Do we have a pending message?
if !pending {
Expand Down Expand Up @@ -129,33 +116,33 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, toggle chan struct{}, ou
if !healthy {
a.logger.Debug("[runDebouncer] Streaming loop is unhealthy")

// It is not, so unblock the toggle as soon as we can (when the
// It is not, so release the baton as soon as we can (when the
// current action is done).
select {
case <-cmp.Or(lastActionResult, closed):
// The last action is done, or there is no last action.
// We're unhealthy, so relinquish the toggle now.
relinquishToggle()
// We're unhealthy, so release the baton now.
baton.Release()

default:
// No, wait until the action is complete to relinquish.
// No, wait until the action is complete to release.
// (Logic is in <-lastActionResult branch.)
}
continue
}

// Yes, we're healthy. Do we have the toggle?
if !haveToggle {
// No, the ping loop is currently in possession of the toggle.
// Yes, we're healthy. Do we have the baton?
if !baton.Held() {
// No, the ping loop currently holds the baton.
// Debounce messages until we have it.
a.logger.Debug("[runDebouncer] Debouncing (action %q, jobID %q) while waiting for toggle", msg.action, msg.jobID)
a.logger.Debug("[runDebouncer] Debouncing (action %q, jobID %q) while waiting for baton", msg.action, msg.jobID)
nextAction = msg.action
nextJobID = msg.jobID
pending = true
continue
}

// Yes, we have the toggle.
// Yes, we have the baton.
// Can we send this message right away?
select {
case <-cmp.Or(lastActionResult, closed):
Expand Down Expand Up @@ -192,10 +179,10 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, toggle chan struct{}, ou
lastActionResult = nil
// Is the streaming side healthy?
if !healthy {
// No, we're not healthy. If we have the toggle, now is the
// No, we're not healthy. If we have the baton, now is the
// time to give it up, falling back to the ping loop.
a.logger.Debug("[runDebouncer] Streaming loop wasn't healthy earlier")
relinquishToggle()
baton.Release()
continue
}
// Yes, we're healthy. Is there a pending message to send?
Expand Down
13 changes: 7 additions & 6 deletions agent/agent_worker_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// runPingLoop runs the (classical) loop that pings Buildkite for work.
func (a *AgentWorker) runPingLoop(ctx context.Context, toggle chan struct{}, outCh chan<- actionMessage) error {
func (a *AgentWorker) runPingLoop(ctx context.Context, baton *BatonHolder, outCh chan<- actionMessage) error {
a.logger.Debug("[runPingLoop] Starting")
defer a.logger.Debug("[runPingLoop] Exiting")

Expand Down Expand Up @@ -84,13 +84,14 @@ func (a *AgentWorker) runPingLoop(ctx context.Context, toggle chan struct{}, out
// (the ping loop) _should_ be blocked from continuing.
// Return the token after any work is complete, to prevent the
// streaming loop from taking back over until then.
a.logger.Debug("[runPingLoop] Waiting for toggle")
a.logger.Debug("[runPingLoop] Waiting for baton")
select {
case <-toggle: // the toggle is ours!
a.logger.Debug("[runPingLoop] Acquired the toggle")
case <-baton.Acquire():
baton.Acquired()
a.logger.Debug("[runPingLoop] Acquired the baton")
defer func() { // <- this is why the loop body is in a func
a.logger.Debug("[runPingLoop] Relinquishing the toggle")
toggle <- struct{}{}
a.logger.Debug("[runPingLoop] Releasing the baton")
baton.Release()
}()

case <-a.stop:
Expand Down
63 changes: 63 additions & 0 deletions agent/baton.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package agent

// Baton is a hand-off token shared by a group of BatonHolders, intended to be
// in the possession of at most one holder at a time. It is backed by a
// buffered channel of capacity 1, so waiting for the baton can be one case
// among several in a select statement.
type Baton struct {
	// ch contains one value while the baton is free to be taken, and is
	// empty otherwise.
	ch chan struct{}
}

// NewBaton returns a Baton whose channel starts out empty. No holder is marked
// as holding it, but it cannot be received from an Acquire channel either
// until a holder that has been marked with Acquired calls Release.
func NewBaton() *Baton {
	b := new(Baton)
	b.ch = make(chan struct{}, 1)
	return b
}

// Holder creates a new handle onto b. The handle shares b's channel and
// starts out not marked as holding the baton.
func (b *Baton) Holder() *BatonHolder {
	h := &BatonHolder{held: false}
	h.ch = b.ch
	return h
}

// BatonHolder is one participant's view of a Baton. It remembers locally
// whether this participant has the baton, which is what lets Release be
// called repeatedly without sending more than once.
type BatonHolder struct {
	// ch is the channel shared with the Baton and every other holder.
	ch chan struct{}
	// held is this holder's own record of possession; it is a plain field
	// with no synchronization of its own.
	held bool
}

// Acquire returns a channel that receives when the baton is available.
//
// Ideally, receiving from the channel and updating the holder's state would
// happen atomically. However, Go's select statement requires a bare channel
// for case expressions, so the caller must explicitly call Acquired after
// a successful receive:
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the caller must explicitly call Acquired after
a successful receive:

This is a key problem in trying to abstract this well.

//
//	select {
//	case <-holder.Acquire():
//		holder.Acquired()
//		defer holder.Release()
//	case <-ctx.Done():
//	}
func (h *BatonHolder) Acquire() <-chan struct{} {
	// Hand the shared channel out as receive-only: callers can wait on it in
	// a select, but cannot send on the returned value.
	var ready <-chan struct{} = h.ch
	return ready
}

// Acquired records that this holder now has the baton. It only updates the
// holder's own state and does not touch the channel, so it is meant to be
// called right after a successful receive from the Acquire channel (or to
// designate the initial owner while the baton's channel is still empty).
func (h *BatonHolder) Acquired() {
	h.held = true
}

// Held reports this holder's own record of whether it has the baton: true
// after Acquired, false initially and after Release.
func (h *BatonHolder) Held() bool {
	return h.held
}

// Release gives the baton up so that another holder can receive it from its
// Acquire channel. It is idempotent: if this holder is not marked as holding
// the baton, Release does nothing.
func (h *BatonHolder) Release() {
	if h.held {
		// Clear the local flag before sending, so a second Release call
		// finds nothing to do and never sends twice.
		h.held = false
		h.ch <- struct{}{}
	}
}