Skip to content

Commit 75bc012

Browse files
authored
[SYNTH-22405]: Fix Synthetics Scheduler Flush by locking it (#41954)
### What does this PR do? This PR fixes an issue in the Synthetics Scheduler where we weren't locking the flush function and therefore we had race conditions when updating the nextRun value for tests. ### Motivation ### Describe how you validated your changes Local execution of the Agent. ### Additional Notes
1 parent 4344049 commit 75bc012

1 file changed

Lines changed: 16 additions & 14 deletions

File tree

comp/syntheticstestscheduler/impl/worker.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,26 @@ func (s *syntheticsTestScheduler) flushLoop(ctx context.Context) {
5959

6060
// flush enqueues tests whose nextRun is due.
6161
func (s *syntheticsTestScheduler) flush(flushTime time.Time) {
62+
s.state.mu.Lock()
63+
defer s.state.mu.Unlock()
64+
var testsToRun []*runningTestState
6265
for id, rt := range s.state.tests {
6366
if flushTime.After(rt.nextRun) || flushTime.Equal(rt.nextRun) {
64-
s.log.Debugf("enqueuing test %s", id)
65-
s.syntheticsTestProcessingChan <- SyntheticsTestCtx{
66-
nextRun: flushTime,
67-
cfg: rt.cfg,
68-
}
69-
s.updateTestState(rt)
67+
s.log.Debugf("test %s is due for execution", id)
68+
testsToRun = append(testsToRun, rt)
7069
}
7170
}
71+
72+
for _, rt := range testsToRun {
73+
s.log.Debugf("enqueuing test %s", rt.cfg.PublicID)
74+
s.syntheticsTestProcessingChan <- SyntheticsTestCtx{
75+
nextRun: flushTime,
76+
cfg: rt.cfg,
77+
}
78+
79+
rt.lastRun = rt.nextRun
80+
rt.nextRun = rt.nextRun.Add(time.Duration(rt.cfg.Interval) * time.Second)
81+
}
7282
}
7383

7484
// runWorker is the main loop for a single worker.
@@ -212,14 +222,6 @@ type SyntheticsTestCtx struct {
212222
cfg common.SyntheticsTestConfig
213223
}
214224

215-
// updateTestState updates lastRun and nextRun for a running test.
216-
func (s *syntheticsTestScheduler) updateTestState(rt *runningTestState) {
217-
s.state.mu.Lock()
218-
defer s.state.mu.Unlock()
219-
rt.lastRun = rt.nextRun
220-
rt.nextRun = rt.nextRun.Add(time.Duration(rt.cfg.Interval) * time.Second)
221-
}
222-
223225
// sendSyntheticsTestResult marshals the workerResult and forwards it via the epForwarder.
224226
func (s *syntheticsTestScheduler) sendSyntheticsTestResult(w *workerResult) error {
225227
res, err := s.networkPathToTestResult(w)

0 commit comments

Comments
 (0)