diff --git a/agent/agent_worker.go b/agent/agent_worker.go index 5625351056..e670bc350b 100644 --- a/agent/agent_worker.go +++ b/agent/agent_worker.go @@ -339,7 +339,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr switch a.agentConfiguration.PingMode { case "", PingModeAuto: // note: "" can happen in some tests loops = []func(){pingLoop, streamingLoop, debouncerLoop} - bat.Acquire(actorDebouncer) + <-bat.Acquire() + bat.Acquired(actorDebouncer) case PingModePollOnly: loops = []func(){pingLoop} @@ -348,7 +349,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr case PingModeStreamOnly: loops = []func(){streamingLoop, debouncerLoop} fromPingLoopCh = nil // prevent action loop listening to ping side - bat.Acquire(actorDebouncer) + <-bat.Acquire() + bat.Acquired(actorDebouncer) default: return fmt.Errorf("unknown ping mode %q", a.agentConfiguration.PingMode) diff --git a/agent/agent_worker_debouncer.go b/agent/agent_worker_debouncer.go index f09cc0b000..c38e1616da 100644 --- a/agent/agent_worker_debouncer.go +++ b/agent/agent_worker_debouncer.go @@ -67,7 +67,8 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, bat *baton, outCh chan<- a.logger.Debug("[runDebouncer] Stopping due to context cancel") return ctx.Err() - case <-iif(healthy, bat.Acquire(actorDebouncer)): // if the stream is healthy, take the baton if available + case <-iif(healthy, bat.Acquire()): // if the stream is healthy, take the baton if available + bat.Acquired(actorDebouncer) a.logger.Debug("[runDebouncer] Took the baton") // We now have the baton! // continue below to send any pending message, if able diff --git a/agent/agent_worker_ping.go b/agent/agent_worker_ping.go index 7e4957ad03..2dd1cbac45 100644 --- a/agent/agent_worker_ping.go +++ b/agent/agent_worker_ping.go @@ -112,7 +112,8 @@ func (a *pingLoopState) pingLoopInner(ctx context.Context) error { // preferred. a.logger.Debug("[runPingLoop] Waiting for baton") select { - case <-a.bat.Acquire(actorPingLoop): // the baton is ours! + case <-a.bat.Acquire(): // the baton is ours! + a.bat.Acquired(actorPingLoop) a.logger.Debug("[runPingLoop] Acquired the baton") defer func() { // <- this is why the ping loop body is in a func a.logger.Debug("[runPingLoop] Releasing the baton") diff --git a/agent/baton.go b/agent/baton.go index 0603935a75..a8e5eea151 100644 --- a/agent/baton.go +++ b/agent/baton.go @@ -5,17 +5,19 @@ import "sync" // baton is a channel-based mutex. This allows for using it as part of a select // statement. type baton struct { - mu sync.Mutex - holder string - acquire map[string]chan struct{} + mu sync.Mutex + holder string + ch chan struct{} } // newBaton creates a new baton for sharing among actors, each identified -// by a non-empty string. +// by a non-empty string. The baton is initially not held by anything. func newBaton() *baton { - return &baton{ - acquire: make(map[string]chan struct{}), + b := &baton{ + ch: make(chan struct{}, 1), } + b.ch <- struct{}{} + return b } // HeldBy reports if the actor specified by the argument holds the baton. @@ -26,61 +28,45 @@ func (b *baton) HeldBy(by string) bool { } // Acquire returns a channel that receives when the baton is acquired by the -// acquirer. -func (b *baton) Acquire(by string) <-chan struct{} { +// caller. +// Be sure to call [baton.Acquired] after receiving from the channel, e.g. +// +// select { +// case <-bat.Acquire(): +// bat.Acquired("me") +// defer bat.Release("me") +// ... +// } +func (b *baton) Acquire() <-chan struct{} { + // b.ch should never change, so no need to lock around it + return b.ch +} + +// Acquired must be called by the actor that successfully acquired the baton +// immediately after acquiring it. +// It panics if the baton is already marked as held. +// It is necessary to separate this from [baton.Acquire] because it is practically +// impossible to reliably and atomically pass the baton and record the new holder +// at the same time without deadlocks. +func (b *baton) Acquired(by string) { b.mu.Lock() defer b.mu.Unlock() - - // If there's an existing channel for this actor, reuse it. - ch := b.acquire[by] - if ch == nil { - ch = make(chan struct{}) - } - - // If nothing holds the baton currently, assign it to the caller. - // The caller won't be receiving on the channel until after we - // return it, so make the channel receivable by closing it. - if b.holder == "" { - b.holder = by - close(ch) - delete(b.acquire, by) // in case it is in the map - return ch + if b.holder != "" { + // panic is not ideal for a few reasons (a fatal log might be better), + // but keeps baton focused on being a concurrency primitive. As long as + // the panic reaches the Go runtime, Go will give us a traceback and exit. + panic("baton already held by " + b.holder) } - - // Something holds the baton, so record that this actor is - // waiting for the baton. - b.acquire[by] = ch - return ch + b.holder = by } // Release releases the baton, if it is held by the argument. func (b *baton) Release(by string) { b.mu.Lock() defer b.mu.Unlock() - - // Only release if its the same actor, to prevent bugs due - // to double-releasing. if b.holder != by { return } - - // Attempt to pass the baton to anything still waiting for it. - for a, ch := range b.acquire { - delete(b.acquire, a) - select { - case ch <- struct{}{}: - // We were able to send a value to the channel, - // so this actor was still waiting to receive. - // Therefore this actor has acquired the baton. - b.holder = a - return - default: - // This actor has stopped waiting to receive, - // so try another. - } - } - - // Nothing was still waiting on its channel, - // so now nothing holds the baton. b.holder = "" + b.ch <- struct{}{} } diff --git a/agent/baton_test.go b/agent/baton_test.go new file mode 100644 index 0000000000..415edd8445 --- /dev/null +++ b/agent/baton_test.go @@ -0,0 +1,43 @@ +package agent + +import ( + "math/rand/v2" + "sync" + "testing" + "time" +) + +func TestBaton_NoDroppedBatonDeadlock(t *testing.T) { + t.Parallel() + + bat := newBaton() + + actor := func(n string) func() { + return func() { + time.Sleep(rand.N(1 * time.Microsecond)) + <-bat.Acquire() + bat.Acquired(n) + time.Sleep(rand.N(1 * time.Microsecond)) + bat.Release(n) + } + } + + done := make(chan struct{}) + + go func() { + for range 10000 { + var wg sync.WaitGroup + wg.Go(actor("a")) + wg.Go(actor("b")) + wg.Wait() + } + close(done) + }() + + select { + case <-done: + // It probably doesn't deadlock that way + case <-time.After(10 * time.Second): + t.Error("Repeated baton.Acquire/Release failed to progress, possible deadlock") + } +}