Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions agent/agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,12 @@ func (r *AgentPool) StopGracefully() {
// cancellation to finish.
func (r *AgentPool) StopUngracefully() {
var wg sync.WaitGroup
wg.Add(len(r.workers))
for _, worker := range r.workers {
// Because StopUngracefully calls the job runner's Cancel, which blocks,
// concurrently stop all the workers.
// The number of concurrent Stops is bounded by the spawn count, and
// there already exists a handful of goroutines per worker.
go func() {
worker.StopUngracefully()
wg.Done()
}()
wg.Go(worker.StopUngracefully)
}
wg.Wait()
}
Expand Down
13 changes: 3 additions & 10 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,12 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
fromStreamingLoopCh := make(chan actionMessage) // streaming loop to debouncer
fromDebouncerCh := make(chan actionMessage) // debouncer to action handler

// Start the loops and block until they have all stopped.
// Based on configuration, we have our choice of ping loop,
// streaming loop+debouncer loop, or both.
var wg sync.WaitGroup

pingLoop := func() {
defer wg.Done()
errCh <- a.runPingLoop(ctx, bat, fromPingLoopCh)
}
streamingLoop := func() {
defer wg.Done()
err := a.runStreamingPingLoop(ctx, fromStreamingLoopCh)
if err != nil {
switch a.agentConfiguration.PingMode {
Expand All @@ -331,7 +326,6 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr
errCh <- err
}
debouncerLoop := func() {
defer wg.Done()
errCh <- a.runDebouncer(ctx, bat, fromDebouncerCh, fromStreamingLoopCh)
}

Expand All @@ -358,15 +352,14 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr

// There's always an action handler.
actionLoop := func() {
defer wg.Done()
errCh <- a.runActionLoop(ctx, idleMon, fromPingLoopCh, fromDebouncerCh)
}
loops = append(loops, actionLoop)

// Go loops!
wg.Add(len(loops))
// Start the loops and block until they have all stopped.
var wg sync.WaitGroup
for _, l := range loops {
go l()
wg.Go(l)
}
wg.Wait()

Expand Down
13 changes: 5 additions & 8 deletions agent/integration/job_runner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,19 +421,16 @@ func TestChunksIntervalSeconds_ControlsUploadTiming(t *testing.T) {
t.Run("2s interval should upload fewer chunks than 1s interval", func(t *testing.T) {
var count1s, count2s int

wg := &sync.WaitGroup{}
wg.Add(2)
var wg sync.WaitGroup

// these run for 4 seconds, so we run them in parallel to avoid spending quite so much wall-clock time
go func() {
defer wg.Done()
wg.Go(func() {
count1s = runTestWithInterval(t, 1)
}()
})

go func() {
defer wg.Done()
wg.Go(func() {
count2s = runTestWithInterval(t, 2)
}()
})

wg.Wait()

Expand Down
9 changes: 2 additions & 7 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,17 +767,12 @@ func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (b
// jobCancellationChecker waits for the processes to start, then continuously
// polls GetJobState to see if the job has been cancelled server-side. If so,
// it calls r.Cancel.
func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGroup) {
func (r *JobRunner) jobCancellationChecker(ctx context.Context) {
ctx, setStat, done := status.AddSimpleItem(ctx, "Job Cancellation Checker")
defer done()
setStat("Starting...")

defer func() {
// Mark this routine as done in the wait group
wg.Done()

r.agentLogger.Debug("[JobRunner] Routine that refreshes the job has finished")
}()
defer r.agentLogger.Debug("[JobRunner] Routine that refreshes the job has finished")

select {
case <-r.process.Started():
Expand Down
4 changes: 1 addition & 3 deletions agent/log_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ func (ls *LogStreamer) Start(ctx context.Context) error {
ls.conf.MaxSizeBytes = defaultLogMaxSize
}

ls.workerWG.Add(ls.conf.Concurrency)
for i := range ls.conf.Concurrency {
go ls.worker(ctx, i)
ls.workerWG.Go(func() { ls.worker(ctx, i) })
}

return nil
Expand Down Expand Up @@ -180,7 +179,6 @@ func (ls *LogStreamer) worker(ctx context.Context, id int) {
ls.logger.Debug("[LogStreamer/Worker#%d] Worker is starting...", id)

defer ls.logger.Debug("[LogStreamer/Worker#%d] Worker has shutdown", id)
defer ls.workerWG.Done()

ctx, setStat, done := status.AddSimpleItem(ctx, fmt.Sprintf("Log Streamer Worker %d", id))
defer done()
Expand Down
8 changes: 3 additions & 5 deletions agent/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,8 @@ func (r *JobRunner) Run(ctx context.Context, ignoreAgentInDispatches *bool) (err
}

// Kick off log streaming and job status checking when the process starts.
wg.Add(2)
go r.streamJobLogsAfterProcessStart(cctx, &wg)
go r.jobCancellationChecker(cctx, &wg)
wg.Go(func() { r.streamJobLogsAfterProcessStart(cctx) })
wg.Go(func() { r.jobCancellationChecker(cctx) })

exit = r.runJob(cctx)
// The defer mutates the error return in some cases.
Expand Down Expand Up @@ -443,13 +442,12 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit core.P

// streamJobLogsAfterProcessStart waits for the process to start, then grabs the job output
// every few seconds and sends it back to Buildkite.
func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync.WaitGroup) {
func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context) {
ctx, setStat, done := status.AddSimpleItem(ctx, "Job Log Streamer")
defer done()
setStat("🏃 Starting...")

defer func() {
wg.Done()
r.agentLogger.Debug("[JobRunner] Routine that processes the log has finished")
}()

Expand Down
6 changes: 2 additions & 4 deletions clicommand/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,15 +1565,13 @@ func agentLifecycleHook(hookName string, log logger.Logger, cfg AgentStartConfig
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
scan := bufio.NewScanner(r) // log each line separately
log = log.WithFields(logger.StringField("hook", hookName))
for scan.Scan() {
log.Info(scan.Text())
}
}()
})
defer func() {
_ = w.Close() // closing the writer ends scan.Scan and lets wg.Wait return
wg.Wait()
Expand Down
7 changes: 2 additions & 5 deletions internal/artifact/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func (a *Downloader) Download(ctx context.Context) error {
var wg sync.WaitGroup
artifactsCh := make(chan *api.Artifact)
for range min(10*runtime.GOMAXPROCS(0), len(artifacts)) {
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
for {
var artifact *api.Artifact
var open bool
Expand Down Expand Up @@ -146,7 +143,7 @@ func (a *Downloader) Download(ctx context.Context) error {
}
}
}
}()
})
}

// Send the artifacts to the workers then signal completion by closing the
Expand Down
18 changes: 5 additions & 13 deletions internal/artifact/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,11 @@ func (a *Uploader) collect(ctx context.Context) ([]*api.Artifact, error) {
defer cancel(nil)
var wg sync.WaitGroup
for range runtime.GOMAXPROCS(0) {
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
if err := ac.worker(wctx, filesCh); err != nil {
cancel(err)
}
}()
})
}

fileFinder := a.glob
Expand Down Expand Up @@ -476,9 +473,6 @@ type workUnitResult struct {
type artifactUploadWorker struct {
*Uploader

// Counts the worker goroutines.
wg sync.WaitGroup

// A tracker for every artifact.
// The map is written at the start of upload, and other goroutines only read
// afterwards.
Expand Down Expand Up @@ -562,9 +556,9 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload
go worker.stateUpdater(ctx, resultsCh, errCh)

// Worker goroutines that work on work units.
var wg sync.WaitGroup
for range runtime.GOMAXPROCS(0) {
worker.wg.Add(1)
go worker.doWorkUnits(ctx, unitsCh, resultsCh)
wg.Go(func() { worker.doWorkUnits(ctx, unitsCh, resultsCh) })
}

// Send the work units for each artifact to the workers.
Expand All @@ -585,7 +579,7 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload
a.logger.Debug("Waiting for uploads to complete...")

// Wait for the workers to finish
worker.wg.Wait()
wg.Wait()

// Since the workers are done, all work unit states have been sent to the
// state updater.
Expand All @@ -604,8 +598,6 @@ func (a *Uploader) upload(ctx context.Context, artifacts []*api.Artifact, upload
}

func (a *artifactUploadWorker) doWorkUnits(ctx context.Context, unitsCh <-chan workUnit, resultsCh chan<- workUnitResult) {
defer a.wg.Done()

for {
select {
case <-ctx.Done():
Expand Down
14 changes: 4 additions & 10 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,7 @@ func restoreWithClient(ctx context.Context, l logger.Logger, client CacheClient,
var wg sync.WaitGroup

for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
for {
select {
case cacheID, open := <-cacheIDsCh:
Expand Down Expand Up @@ -217,7 +214,7 @@ func restoreWithClient(ctx context.Context, l logger.Logger, client CacheClient,
return
}
}
}()
})
}

sendLoop:
Expand Down Expand Up @@ -253,10 +250,7 @@ func saveWithClient(ctx context.Context, l logger.Logger, client CacheClient, ca
var wg sync.WaitGroup

for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
for {
select {
case cacheID, open := <-cacheIDsCh:
Expand Down Expand Up @@ -295,7 +289,7 @@ func saveWithClient(ctx context.Context, l logger.Logger, client CacheClient, ca
return
}
}
}()
})
}

sendLoop:
Expand Down
7 changes: 2 additions & 5 deletions internal/job/integration/hooks_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,15 +542,12 @@ func TestPreExitHooksFireAfterCancel(t *testing.T) {
tester.ExpectLocalHook("pre-exit").Once()

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
wg.Go(func() {
if err := tester.Run(t, "BUILDKITE_COMMAND=sleep 5"); err == nil {
t.Errorf(`tester.Run(t, "BUILDKITE_COMMAND=sleep 5") = %v, want non-nil error`, err)
}
t.Logf("Command finished")
}()
})

time.Sleep(time.Millisecond * 500)
tester.Cancel()
Expand Down
12 changes: 4 additions & 8 deletions lock/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,11 @@ func TestLocker(t *testing.T) {
var wg sync.WaitGroup
var locks int
for range 10 {
wg.Add(1)
go func() {
wg.Go(func() {
l.Lock()
locks++
l.Unlock()
wg.Done()
}()
})
}

wg.Wait()
Expand All @@ -126,15 +124,13 @@ func TestDoOnce(t *testing.T) {
var wg sync.WaitGroup
var calls atomic.Int32
for range 10 {
wg.Add(1)
go func() {
wg.Go(func() {
if err := cli.DoOnce(ctx, "once", func() {
calls.Add(1)
}); err != nil {
t.Errorf("Client.DoOnce(ctx, once, inc) = %v", err)
}
wg.Done()
}()
})
}

wg.Wait()
Expand Down
8 changes: 2 additions & 6 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func (p *Process) Run(ctx context.Context) error {
// Signal waiting consumers in Started() by closing the started channel
close(p.started)

waitGroup.Add(1)

go func() {
waitGroup.Go(func() {
p.logger.Debug("[Process] Starting to copy PTY to the buffer")

// Copy the pty to our writer. This will block until it EOFs or something breaks.
Expand All @@ -222,9 +220,7 @@ func (p *Process) Run(ctx context.Context) error {
default:
p.logger.Error("[Process] PTY output copy failed with error: %T: %v", err, err)
}

waitGroup.Done()
}()
})
} else {
p.logger.Debug("[Process] Running without a PTY")

Expand Down
8 changes: 2 additions & 6 deletions process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,12 @@ func TestProcessRunsAndSignalsStartedAndStopped(t *testing.T) {
})

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

wg.Go(func() {
<-p.Started()
atomic.AddInt32(&started, 1)
<-p.Done()
atomic.AddInt32(&done, 1)
}()
})

// wait for the process to finish
if err := p.Run(context.Background()); err != nil {
Expand Down