Skip to content

Commit 1f9cbc0

Browse files
authored
Merge pull request #323 from Shopify/ack-race-cond
Refactor test ACK to reduce the race condition window
2 parents a50405e + 4a1594d commit 1f9cbc0

File tree

6 files changed

+58
-29
lines changed

6 files changed

+58
-29
lines changed

ruby/lib/ci/queue/redis/build_record.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,29 @@ def record_warning(type, attributes)
5656
redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
5757
end
5858

59+
Test = Struct.new(:id) # Hack
60+
5961
def record_error(id, payload, stats: nil)
60-
redis.pipelined do |pipeline|
61-
pipeline.hset(
62-
key('error-reports'),
63-
id.dup.force_encoding(Encoding::BINARY),
64-
payload.dup.force_encoding(Encoding::BINARY),
65-
)
66-
pipeline.expire(key('error-reports'), config.redis_ttl)
67-
record_stats(stats, pipeline: pipeline)
62+
# FIXME: the ack and hset should be atomic
63+
# otherwise the reporter may see the empty queue before the error
64+
# is appended and wrongly think it's a success.
65+
if @queue.acknowledge(Test.new(id))
66+
redis.pipelined do |pipeline|
67+
pipeline.hset(
68+
key('error-reports'),
69+
id.dup.force_encoding(Encoding::BINARY),
70+
payload.dup.force_encoding(Encoding::BINARY),
71+
)
72+
pipeline.expire(key('error-reports'), config.redis_ttl)
73+
record_stats(stats, pipeline: pipeline)
74+
@queue.increment_test_failed
75+
end
6876
end
6977
nil
7078
end
7179

72-
def record_success(id, stats: nil, skip_flaky_record: false)
80+
def record_success(id, stats: nil, skip_flaky_record: false, acknowledge: true)
81+
@queue.acknowledge(Test.new(id)) if acknowledge
7382
error_reports_deleted_count, requeued_count, _ = redis.pipelined do |pipeline|
7483
pipeline.hdel(key('error-reports'), id.dup.force_encoding(Encoding::BINARY))
7584
pipeline.hget(key('requeues-count'), id.b)

ruby/lib/minitest/queue.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def self.relative_path(path, root: project_root)
150150
end
151151

152152
class SingleExample
153+
attr_reader :method_name
153154

154155
def initialize(runnable, method_name)
155156
@runnable = runnable
@@ -250,15 +251,8 @@ def run_from_queue(reporter, *)
250251

251252
if failed && CI::Queue.requeueable?(result) && queue.requeue(example)
252253
result.requeue!
253-
reporter.record(result)
254-
elsif queue.acknowledge(example)
255-
reporter.record(result)
256-
queue.increment_test_failed if failed
257-
elsif !failed
258-
# If the test was already acknowledged by another worker (we timed out)
259-
# Then we only record it if it is successful.
260-
reporter.record(result)
261254
end
255+
reporter.record(result)
262256
end
263257
queue.stop_heartbeat!
264258
rescue Errno::EPIPE

ruby/lib/minitest/queue/build_status_recorder.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def record(test)
5252
if (test.failure || test.error?) && !test.skipped?
5353
build.record_error("#{test.klass}##{test.name}", dump(test), stats: stats)
5454
else
55-
build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?)
55+
build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?, acknowledge: !test.requeued?)
5656
end
5757
end
5858

ruby/lib/rspec/queue.rb

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,13 +224,8 @@ def finish(reporter, acknowledge: true)
224224
reporter.cancel_run!
225225
dup.mark_as_requeued!(reporter)
226226
return true
227-
elsif reporter.acknowledge || !@exception
228-
# If the test was already acknowledged by another worker (we timed out)
229-
# Then we only record it if it is successful.
230-
super(reporter)
231227
else
232-
reporter.cancel_run!
233-
return
228+
super(reporter)
234229
end
235230
else
236231
super(reporter)

ruby/test/integration/minitest_redis_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ def test_retry_report
514514
'skips' => 0,
515515
'requeues' => 0,
516516
'total_time' => index + 1,
517-
})
517+
}, acknowledge: false)
518518
end
519519

520520
# Retry first worker, bailing out

ruby/test/minitest/queue/build_status_recorder_test.rb

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,28 @@ def setup
1515
end
1616

1717
def test_aggregation
18+
reserve(@queue, "a")
1819
@reporter.record(result('a', failure: "Something went wrong"))
20+
reserve(@queue, "b")
1921
@reporter.record(result('b', unexpected_error: true))
22+
reserve(@queue, "h")
2023
@reporter.record(result('h', failure: "Something went wrong", requeued: true))
2124

2225
second_queue = worker(2)
2326
second_reporter = BuildStatusRecorder.new(build: second_queue.build)
2427
second_reporter.start
2528

29+
reserve(second_queue, "c")
2630
second_reporter.record(result('c', failure: "Something went wrong"))
31+
reserve(second_queue, "d")
2732
second_reporter.record(result('d', unexpected_error: true))
33+
reserve(second_queue, "e")
2834
second_reporter.record(result('e', skipped: true))
35+
reserve(second_queue, "f")
2936
second_reporter.record(result('f', unexpected_error: true))
37+
reserve(second_queue, "g")
3038
second_reporter.record(result('g', requeued: true))
39+
reserve(second_queue, "h")
3140
second_reporter.record(result('h', skipped: true, requeued: true))
3241

3342
assert_equal 9, summary.assertions
@@ -40,19 +49,39 @@ def test_aggregation
4049
end
4150

4251
def test_retrying_test
43-
@reporter.record(result('a', failure: "Something went wrong"))
44-
assert_equal 1, summary.error_reports.size
52+
yielded = false
53+
54+
test = nil
55+
56+
@queue.poll do |_test|
57+
test = _test
58+
assert_equal "a", test.method_name
59+
@reporter.record(result(test.method_name, failure: "Something went wrong"))
60+
61+
assert_equal 1, summary.error_reports.size
62+
63+
yielded = true
64+
break
65+
end
66+
67+
assert yielded, "@queue.poll didn't yield"
4568

4669
second_queue = worker(2)
4770
second_reporter = BuildStatusRecorder.new(build: second_queue.build)
4871
second_reporter.start
4972

50-
second_reporter.record(result('a'))
73+
# pretend we reserved the same test again
74+
reserve(second_queue, "a")
75+
second_reporter.record(result("a"))
5176
assert_equal 0, summary.error_reports.size
5277
end
5378

5479
private
5580

81+
def reserve(queue, method_name)
82+
queue.instance_variable_set(:@reserved_test, Minitest::Queue::SingleExample.new("Minitest::Test", method_name).id)
83+
end
84+
5685
def worker(id)
5786
CI::Queue::Redis.new(
5887
@redis_url,
@@ -61,7 +90,9 @@ def worker(id)
6190
worker_id: id.to_s,
6291
timeout: 0.2,
6392
),
64-
).populate([])
93+
).populate([
94+
Minitest::Queue::SingleExample.new("Minitest::Test", "a")
95+
])
6596
end
6697

6798
def summary

0 commit comments

Comments
 (0)