Skip to content

Commit c36458a

Browse files
authored
Merge pull request #3754 from buildkite/fix-baton-deadlock
fix: potential deadlock in baton
2 parents fac8462 + 17d38d0 commit c36458a

File tree

5 files changed

+87
-54
lines changed

5 files changed

+87
-54
lines changed

agent/agent_worker.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
339339
switch a.agentConfiguration.PingMode {
340340
case "", PingModeAuto: // note: "" can happen in some tests
341341
loops = []func(){pingLoop, streamingLoop, debouncerLoop}
342-
bat.Acquire(actorDebouncer)
342+
<-bat.Acquire()
343+
bat.Acquired(actorDebouncer)
343344

344345
case PingModePollOnly:
345346
loops = []func(){pingLoop}
@@ -348,7 +349,8 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
348349
case PingModeStreamOnly:
349350
loops = []func(){streamingLoop, debouncerLoop}
350351
fromPingLoopCh = nil // prevent action loop listening to ping side
351-
bat.Acquire(actorDebouncer)
352+
<-bat.Acquire()
353+
bat.Acquired(actorDebouncer)
352354

353355
default:
354356
return fmt.Errorf("unknown ping mode %q", a.agentConfiguration.PingMode)

agent/agent_worker_debouncer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ func (a *AgentWorker) runDebouncer(ctx context.Context, bat *baton, outCh chan<-
6767
a.logger.Debug("[runDebouncer] Stopping due to context cancel")
6868
return ctx.Err()
6969

70-
case <-iif(healthy, bat.Acquire(actorDebouncer)): // if the stream is healthy, take the baton if available
70+
case <-iif(healthy, bat.Acquire()): // if the stream is healthy, take the baton if available
71+
bat.Acquired(actorDebouncer)
7172
a.logger.Debug("[runDebouncer] Took the baton")
7273
// We now have the baton!
7374
// continue below to send any pending message, if able

agent/agent_worker_ping.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ func (a *pingLoopState) pingLoopInner(ctx context.Context) error {
112112
// preferred.
113113
a.logger.Debug("[runPingLoop] Waiting for baton")
114114
select {
115-
case <-a.bat.Acquire(actorPingLoop): // the baton is ours!
115+
case <-a.bat.Acquire(): // the baton is ours!
116+
a.bat.Acquired(actorPingLoop)
116117
a.logger.Debug("[runPingLoop] Acquired the baton")
117118
defer func() { // <- this is why the ping loop body is in a func
118119
a.logger.Debug("[runPingLoop] Releasing the baton")

agent/baton.go

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import "sync"
55
// baton is a channel-based mutex. This allows for using it as part of a select
66
// statement.
77
type baton struct {
8-
mu sync.Mutex
9-
holder string
10-
acquire map[string]chan struct{}
8+
mu sync.Mutex
9+
holder string
10+
ch chan struct{}
1111
}
1212

1313
// newBaton creates a new baton for sharing among actors, each identified
14-
// by a non-empty string.
14+
// by a non-empty string. The baton is initially not held by anything.
1515
func newBaton() *baton {
16-
return &baton{
17-
acquire: make(map[string]chan struct{}),
16+
b := &baton{
17+
ch: make(chan struct{}, 1),
1818
}
19+
b.ch <- struct{}{}
20+
return b
1921
}
2022

2123
// HeldBy reports if the actor specified by the argument holds the baton.
@@ -26,61 +28,45 @@ func (b *baton) HeldBy(by string) bool {
2628
}
2729

2830
// Acquire returns a channel that receives when the baton is acquired by the
29-
// acquirer.
30-
func (b *baton) Acquire(by string) <-chan struct{} {
31+
// caller.
32+
// Be sure to call [baton.Acquired] after receiving from the channel, e.g.
33+
//
34+
// select {
35+
// case <-bat.Acquire():
36+
// bat.Acquired("me")
37+
// defer bat.Release("me")
38+
// ...
39+
// }
40+
func (b *baton) Acquire() <-chan struct{} {
41+
// b.ch should never change, so no need to lock around it
42+
return b.ch
43+
}
44+
45+
// Acquired must be called by the actor that successfully acquired the baton
46+
// immediately after acquiring it.
47+
// It panics if the baton is already marked as held.
48+
// It is necessary to separate this from [baton.Acquire] because it is practically
49+
// impossible to reliably and atomically pass the baton and record the new holder
50+
// at the same time without deadlocks.
51+
func (b *baton) Acquired(by string) {
3152
b.mu.Lock()
3253
defer b.mu.Unlock()
33-
34-
// If there's an existing channel for this actor, reuse it.
35-
ch := b.acquire[by]
36-
if ch == nil {
37-
ch = make(chan struct{})
38-
}
39-
40-
// If nothing holds the baton currently, assign it to the caller.
41-
// The caller won't be receiving on the channel until after we
42-
// return it, so make the channel receivable by closing it.
43-
if b.holder == "" {
44-
b.holder = by
45-
close(ch)
46-
delete(b.acquire, by) // in case it is in the map
47-
return ch
54+
if b.holder != "" {
55+
// panic is not ideal for a few reasons (a fatal log might be better),
56+
// but keeps baton focused on being a concurrency primitive. As long as
57+
// the panic reaches the Go runtime, Go will give us a traceback and exit.
58+
panic("baton already held by " + b.holder)
4859
}
49-
50-
// Something holds the baton, so record that this actor is
51-
// waiting for the baton.
52-
b.acquire[by] = ch
53-
return ch
60+
b.holder = by
5461
}
5562

5663
// Release releases the baton, if it is held by the argument.
5764
func (b *baton) Release(by string) {
5865
b.mu.Lock()
5966
defer b.mu.Unlock()
60-
61-
// Only release if its the same actor, to prevent bugs due
62-
// to double-releasing.
6367
if b.holder != by {
6468
return
6569
}
66-
67-
// Attempt to pass the baton to anything still waiting for it.
68-
for a, ch := range b.acquire {
69-
delete(b.acquire, a)
70-
select {
71-
case ch <- struct{}{}:
72-
// We were able to send a value to the channel,
73-
// so this actor was still waiting to receive.
74-
// Therefore this actor has acquired the baton.
75-
b.holder = a
76-
return
77-
default:
78-
// This actor has stopped waiting to receive,
79-
// so try another.
80-
}
81-
}
82-
83-
// Nothing was still waiting on its channel,
84-
// so now nothing holds the baton.
8570
b.holder = ""
71+
b.ch <- struct{}{}
8672
}

agent/baton_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package agent
2+
3+
import (
4+
"math/rand/v2"
5+
"sync"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestBaton_NoDroppedBatonDeadlock(t *testing.T) {
11+
t.Parallel()
12+
13+
bat := newBaton()
14+
15+
actor := func(n string) func() {
16+
return func() {
17+
time.Sleep(rand.N(1 * time.Microsecond))
18+
<-bat.Acquire()
19+
bat.Acquired(n)
20+
time.Sleep(rand.N(1 * time.Microsecond))
21+
bat.Release(n)
22+
}
23+
}
24+
25+
done := make(chan struct{})
26+
27+
go func() {
28+
for range 10000 {
29+
var wg sync.WaitGroup
30+
wg.Go(actor("a"))
31+
wg.Go(actor("b"))
32+
wg.Wait()
33+
}
34+
close(done)
35+
}()
36+
37+
select {
38+
case <-done:
39+
// It probably doesn't deadlock that way
40+
case <-time.After(10 * time.Second):
41+
t.Error("Repeated baton.Acquire/Release failed to progress, possible deadlock")
42+
}
43+
}

0 commit comments

Comments
 (0)