Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
switch a.agentConfiguration.PingMode {
case "", PingModeAuto: // note: "" can happen in some tests
loops = []func(){pingLoop, streamingLoop, debouncerLoop}
bat.Acquire(actorDebouncer)
<-bat.Acquire()
bat.Acquired(actorDebouncer)

case PingModePollOnly:
loops = []func(){pingLoop}
Expand All @@ -348,7 +349,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
case PingModeStreamOnly:
loops = []func(){streamingLoop, debouncerLoop}
fromPingLoopCh = nil // prevent action loop listening to ping side
bat.Acquire(actorDebouncer)
<-bat.Acquire()
bat.Acquired(actorDebouncer)

default:
return fmt.Errorf("unknown ping mode %q", a.agentConfiguration.PingMode)
Expand Down
3 changes: 2 additions & 1 deletion agent/agent_worker_debouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, bat *baton, outCh chan<-
a.logger.Debug("[runDebouncer] Stopping due to context cancel")
return ctx.Err()

case <-iif(healthy, bat.Acquire(actorDebouncer)): // if the stream is healthy, take the baton if available
case <-iif(healthy, bat.Acquire()): // if the stream is healthy, take the baton if available
bat.Acquired(actorDebouncer)
a.logger.Debug("[runDebouncer] Took the baton")
// We now have the baton!
// continue below to send any pending message, if able
Expand Down
3 changes: 2 additions & 1 deletion agent/agent_worker_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (a *pingLoopState) pingLoopInner(ctx context.Context) error {
// preferred.
a.logger.Debug("[runPingLoop] Waiting for baton")
select {
case <-a.bat.Acquire(actorPingLoop): // the baton is ours!
case <-a.bat.Acquire(): // the baton is ours!
a.bat.Acquired(actorPingLoop)
a.logger.Debug("[runPingLoop] Acquired the baton")
defer func() { // <- this is why the ping loop body is in a func
a.logger.Debug("[runPingLoop] Releasing the baton")
Expand Down
86 changes: 36 additions & 50 deletions agent/baton.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import "sync"
// baton is a channel-based mutex. This allows for using it as part of a select
// statement.
type baton struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have a test suite which picked this up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To answer your question as written: no, there's no existing test suite that picked it up. I don't mind adding a test though.

mu sync.Mutex
holder string
acquire map[string]chan struct{}
mu sync.Mutex
holder string
ch chan struct{}
}

// newBaton creates a new baton for sharing among actors, each identified
// by a non-empty string.
// by a non-empty string. The baton is initially not held by anything.
func newBaton() *baton {
return &baton{
acquire: make(map[string]chan struct{}),
b := &baton{
ch: make(chan struct{}, 1),
}
b.ch <- struct{}{}
return b
}

// HeldBy reports if the actor specified by the argument holds the baton.
Expand All @@ -26,61 +28,45 @@ func (b *baton) HeldBy(by string) bool {
}

// Acquire returns a channel that receives when the baton is acquired by the
// acquirer.
func (b *baton) Acquire(by string) <-chan struct{} {
// caller.
// Be sure to call [baton.Acquired] after receiving from the channel, e.g.
//
// select {
// case <-bat.Acquire():
// bat.Acquired("me")
// defer bat.Release("me")
// ...
// }
func (b *baton) Acquire() <-chan struct{} {
// b.ch should never change, so no need to lock around it
return b.ch
}

// Acquired must be called by the actor that successfully acquired the baton
// immediately after acquiring it.
// It panics if the baton is already marked as held.
// It is necessary to separate this from [baton.Acquire] because it is practically
// impossible to reliably and atomically pass the baton and record the new holder
// at the same time without deadlocks.
func (b *baton) Acquired(by string) {
b.mu.Lock()
defer b.mu.Unlock()

// If there's an existing channel for this actor, reuse it.
ch := b.acquire[by]
if ch == nil {
ch = make(chan struct{})
}

// If nothing holds the baton currently, assign it to the caller.
// The caller won't be receiving on the channel until after we
// return it, so make the channel receivable by closing it.
if b.holder == "" {
b.holder = by
close(ch)
delete(b.acquire, by) // in case it is in the map
return ch
if b.holder != "" {
// panic is not ideal for a few reasons (a fatal log might be better),
// but keeps baton focused on being a concurrency primitive. As long as
// the panic reaches the Go runtime, Go will give us a traceback and exit.
panic("baton already held by " + b.holder)
}

// Something holds the baton, so record that this actor is
// waiting for the baton.
b.acquire[by] = ch
return ch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this deadlock? or race condition instead.

If release happened in between return ch and <-ch, then I think the worst case is that multiple parties consider themselves as holder at the same time.

But I don't see how deadlock is happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume two actors "a" and "b", one runs <-b.Acquire("a") and the other b.Release("b"). Release scans all entries in the b.acquire map to find any that are waiting on the channel. If the Release in "b" happens after Acquire returns (it must at least wait for the mutex), but before the channel is being waited on by "a", then the scan finds no channel being waited on, so assigns the holder to "" and has cleared the map as it goes.

b.holder = by
}

// Release releases the baton, if it is held by the argument.
func (b *baton) Release(by string) {
b.mu.Lock()
defer b.mu.Unlock()

// Only release if its the same actor, to prevent bugs due
// to double-releasing.
if b.holder != by {
return
}

// Attempt to pass the baton to anything still waiting for it.
for a, ch := range b.acquire {
delete(b.acquire, a)
select {
case ch <- struct{}{}:
// We were able to send a value to the channel,
// so this actor was still waiting to receive.
// Therefore this actor has acquired the baton.
b.holder = a
return
default:
// This actor has stopped waiting to receive,
// so try another.
}
}

// Nothing was still waiting on its channel,
// so now nothing holds the baton.
b.holder = ""
b.ch <- struct{}{}
}
43 changes: 43 additions & 0 deletions agent/baton_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package agent

import (
"math/rand/v2"
"sync"
"testing"
"time"
)

func TestBaton_NoDroppedBatonDeadlock(t *testing.T) {
t.Parallel()

bat := newBaton()

actor := func(n string) func() {
return func() {
time.Sleep(rand.N(1 * time.Microsecond))
<-bat.Acquire()
bat.Acquired(n)
time.Sleep(rand.N(1 * time.Microsecond))
bat.Release(n)
}
}

done := make(chan struct{})

go func() {
for range 10000 {
var wg sync.WaitGroup
wg.Go(actor("a"))
wg.Go(actor("b"))
wg.Wait()
}
close(done)
}()

select {
case <-done:
// It probably doesn't deadlock that way
case <-time.After(10 * time.Second):
t.Error("Repeated baton.Acquire/Release failed to progress, possible deadlock")
}
}