diff --git a/python/ciqueue/distributed.py b/python/ciqueue/distributed.py index e56d342e..e8b9acc4 100644 --- a/python/ciqueue/distributed.py +++ b/python/ciqueue/distributed.py @@ -97,7 +97,7 @@ def acknowledge(self, test): self.key('error-reports'), self.key('requeued-by'), ], - args=[test, test, '', 0], + args=[test, '', 0], ) == 1 def requeue(self, test, offset=42): @@ -116,7 +116,7 @@ def requeue(self, test, offset=42): self.key('error-reports'), self.key('requeued-by'), ], - args=[self.max_requeues, self.global_max_requeues, test, test, offset, 0], + args=[self.max_requeues, self.global_max_requeues, test, offset, 0], ) == 1 def retry_queue(self): diff --git a/redis/_entry_helpers.lua b/redis/_entry_helpers.lua deleted file mode 100644 index b2fd2f57..00000000 --- a/redis/_entry_helpers.lua +++ /dev/null @@ -1,7 +0,0 @@ -local function test_id_from_entry(value) - if string.sub(value, 1, 1) == '{' then - local decoded = cjson.decode(value) - return decoded['test_id'] - end - return value -end diff --git a/redis/acknowledge.lua b/redis/acknowledge.lua index c98a8320..4ca40729 100644 --- a/redis/acknowledge.lua +++ b/redis/acknowledge.lua @@ -5,16 +5,15 @@ local error_reports_key = KEYS[4] local requeued_by_key = KEYS[5] local entry = ARGV[1] -local test_id = ARGV[2] -local error = ARGV[3] -local ttl = ARGV[4] +local error = ARGV[2] +local ttl = ARGV[3] redis.call('zrem', zset_key, entry) redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers redis.call('hdel', requeued_by_key, entry) -local acknowledged = redis.call('sadd', processed_key, test_id) == 1 +local acknowledged = redis.call('sadd', processed_key, entry) == 1 if acknowledged and error ~= "" then - redis.call('hset', error_reports_key, test_id, error) + redis.call('hset', error_reports_key, entry, error) redis.call('expire', error_reports_key, ttl) end diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 2a7006c1..4c2bc64c 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -1,5 +1,3 @@ --- @include _entry_helpers - local zset_key = KEYS[1] local processed_key = KEYS[2] local owners_key = KEYS[3] @@ -8,10 +6,8 @@ local worker_queue_key = KEYS[4] local current_time = ARGV[1] local entry = ARGV[2] -local test_id = test_id_from_entry(entry) - -- already processed, we do not need to bump the timestamp -if redis.call('sismember', processed_key, test_id) == 1 then +if redis.call('sismember', processed_key, entry) == 1 then return false end diff --git a/redis/requeue.lua b/redis/requeue.lua index 790b0cd4..e01e61d3 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -10,15 +10,14 @@ local requeued_by_key = KEYS[8] local max_requeues = tonumber(ARGV[1]) local global_max_requeues = tonumber(ARGV[2]) local entry = ARGV[3] -local test_id = ARGV[4] -local offset = ARGV[5] -local ttl = tonumber(ARGV[6]) +local offset = ARGV[4] +local ttl = tonumber(ARGV[5]) if redis.call('hget', owners_key, entry) == worker_queue_key then redis.call('hdel', owners_key, entry) end -if redis.call('sismember', processed_key, test_id) == 1 then +if redis.call('sismember', processed_key, entry) == 1 then return false end @@ -27,15 +26,15 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then return false end -local requeues = tonumber(redis.call('hget', requeues_count_key, test_id)) +local requeues = tonumber(redis.call('hget', requeues_count_key, entry)) if requeues and requeues >= max_requeues then return false end redis.call('hincrby', requeues_count_key, '___total___', 1) -redis.call('hincrby', requeues_count_key, test_id, 1) +redis.call('hincrby', requeues_count_key, entry, 1) -redis.call('hdel', error_reports_key, test_id) +redis.call('hdel', error_reports_key, entry) local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1] if pivot then diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index a36fc067..9dfaa616 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -1,5 +1,3 @@ --- @include _entry_helpers - local zset_key = KEYS[1] local processed_key = KEYS[2] local worker_queue_key = KEYS[3] @@ -10,8 +8,7 @@ local timeout = ARGV[2] local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) for _, test in ipairs(lost_tests) do - local test_id = test_id_from_entry(test) - if redis.call('sismember', processed_key, test_id) == 0 then + if redis.call('sismember', processed_key, test) == 0 then redis.call('zadd', zset_key, current_time, test) redis.call('lpush', worker_queue_key, test) redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership diff --git a/ruby/lib/ci/queue/build_record.rb b/ruby/lib/ci/queue/build_record.rb index f864f2a5..472b3580 100644 --- a/ruby/lib/ci/queue/build_record.rb +++ b/ruby/lib/ci/queue/build_record.rb @@ -18,17 +18,17 @@ def queue_exhausted? @queue.exhausted? end - def record_error(id, payload, stat_delta: nil) - error_reports[id] = payload + def record_error(entry, payload, stat_delta: nil) + error_reports[entry] = payload true end - def record_success(id, skip_flaky_record: false, acknowledge: true) - error_reports.delete(id) + def record_success(entry, skip_flaky_record: false, acknowledge: true) + error_reports.delete(entry) true end - def record_requeue(id) + def record_requeue(entry) true end diff --git a/ruby/lib/ci/queue/queue_entry.rb b/ruby/lib/ci/queue/queue_entry.rb index 50288603..137e6691 100644 --- a/ruby/lib/ci/queue/queue_entry.rb +++ b/ruby/lib/ci/queue/queue_entry.rb @@ -17,8 +17,6 @@ def self.parse(entry) end def self.format(test_id, file_path) - return test_id if file_path.nil? || file_path == "" - JSON.dump({ test_id: test_id, file_path: file_path }) end diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index a94084e8..870a7f89 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -33,14 +33,14 @@ def reset_worker_error end def failed_tests - redis.hkeys(key('error-reports')) + redis.hkeys(key('error-reports')).map { |entry| CI::Queue::QueueEntry.test_id(entry) } end TOTAL_KEY = "___total___" def requeued_tests requeues = redis.hgetall(key('requeues-count')) requeues.delete(TOTAL_KEY) - requeues + requeues.transform_keys { |entry| CI::Queue::QueueEntry.test_id(entry) } end def pop_warnings @@ -56,39 +56,39 @@ def record_warning(type, attributes) redis.rpush(key('warnings'), Marshal.dump([type, attributes])) end - def record_error(id, payload, stat_delta: nil) + def record_error(entry, payload, stat_delta: nil) # Run acknowledge first so we know whether we're the first to ack - acknowledged = @queue.acknowledge(id, error: payload) + acknowledged = @queue.acknowledge(entry, error: payload) if acknowledged # We were the first to ack; another worker already ack'd would get falsy from SADD @queue.increment_test_failed # Only the acknowledging worker's stats include this failure (others skip increment when ack=false). # Store so we can subtract it if another worker records success later. - store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any? + store_error_report_delta(entry, stat_delta) if stat_delta && stat_delta.any? end # Return so caller can roll back local counter when not acknowledged !!acknowledged end - def record_success(id, skip_flaky_record: false) + def record_success(entry, skip_flaky_record: false) acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction| - @queue.acknowledge(id, pipeline: transaction) - transaction.hdel(key('error-reports'), id) - transaction.hget(key('requeues-count'), id) - transaction.hget(key('error-report-deltas'), id) + @queue.acknowledge(entry, pipeline: transaction) + transaction.hdel(key('error-reports'), entry) + transaction.hget(key('requeues-count'), entry) + transaction.hget(key('error-report-deltas'), entry) end # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution if error_reports_deleted_count.to_i > 0 && delta_json apply_error_report_delta_correction(delta_json) - redis.hdel(key('error-report-deltas'), id) + redis.hdel(key('error-report-deltas'), entry) end - record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0) + record_flaky(entry) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0) # Count this run when we ack'd or when we replaced a failure (so stats delta is applied) !!(acknowledged || error_reports_deleted_count.to_i > 0) end - def record_requeue(id) + def record_requeue(entry) true end @@ -142,11 +142,11 @@ def max_test_failed? end def error_reports - redis.hgetall(key('error-reports')) + redis.hgetall(key('error-reports')).transform_keys { |entry| CI::Queue::QueueEntry.test_id(entry) } end def flaky_reports - redis.smembers(key('flaky-reports')) + redis.smembers(key('flaky-reports')).map { |entry| CI::Queue::QueueEntry.test_id(entry) } end def record_worker_profile(profile) @@ -187,10 +187,10 @@ def key(*args) ['build', config.build_id, *args].join(':') end - def store_error_report_delta(test_id, stat_delta) + def store_error_report_delta(entry, stat_delta) # Only the acknowledging worker's stats include this test; store their delta for correction on success payload = { 'worker_id' => config.worker_id.to_s }.merge(stat_delta) - redis.hset(key('error-report-deltas'), test_id, JSON.generate(payload)) + redis.hset(key('error-report-deltas'), entry, JSON.generate(payload)) redis.expire(key('error-report-deltas'), config.redis_ttl) end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 6736bf40..40a457cd 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -147,8 +147,8 @@ def retrying? def retry_queue failures = build.failed_tests.to_set log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1) - log = log.map { |entry| queue_entry_test_id(entry) } - log.select! { |id| failures.include?(id) } + log = log.map { |entry| CI::Queue::QueueEntry.test_id(entry) } + log.select! { |test_id| failures.include?(test_id) } log.uniq! log.reverse! Retry.new(log, config, redis: redis) @@ -176,23 +176,23 @@ def report_worker_error(error) build.report_worker_error(error) end - def acknowledge(test_key, error: nil, pipeline: redis) - test_id = normalize_test_id(test_key) + def acknowledge(entry, error: nil, pipeline: redis) + test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) - entry = reserved_entries.fetch(test_id, queue_entry_for(test_key)) + entry = reserved_entries.fetch(test_id, entry) unreserve_entry(test_id) eval_script( :acknowledge, keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by')], - argv: [entry, test_id, error.to_s, config.redis_ttl], + argv: [entry, error.to_s, config.redis_ttl], pipeline: pipeline, ) == 1 end - def requeue(test, offset: Redis.requeue_offset) - test_id = normalize_test_id(test) + def requeue(entry, offset: Redis.requeue_offset) + test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) - entry = reserved_entries.fetch(test_id, queue_entry_for(test)) + entry = reserved_entries.fetch(test_id, entry) unreserve_entry(test_id) global_max_requeues = config.global_max_requeues(total) @@ -208,7 +208,7 @@ def requeue(test, offset: Redis.requeue_offset) key('error-reports'), key('requeued-by'), ], - argv: [config.max_requeues, global_max_requeues, entry, test_id, offset, config.redis_ttl], + argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl], ) == 1 unless requeued @@ -255,7 +255,7 @@ def assert_reserved!(test_id) end def reserve_entry(entry) - test_id = queue_entry_test_id(entry) + test_id = CI::Queue::QueueEntry.test_id(entry) reserved_tests << test_id reserved_entries[test_id] = entry reserved_entry_ids[entry] = test_id @@ -267,19 +267,6 @@ def unreserve_entry(test_id) reserved_entry_ids.delete(entry) if entry end - def normalize_test_id(test_key) - key = test_key.respond_to?(:id) ? test_key.id : test_key - if key.is_a?(String) - cached = reserved_entry_ids[key] - return cached if cached - end - queue_entry_test_id(key) - end - - def queue_entry_test_id(entry) - CI::Queue::QueueEntry.test_id(entry) - end - def queue_entry_for(test) return test.queue_entry if test.respond_to?(:queue_entry) return test.id if test.respond_to?(:id) @@ -288,7 +275,7 @@ def queue_entry_for(test) end def resolve_entry(entry) - test_id = reserved_entry_ids[entry] || queue_entry_test_id(entry) + test_id = reserved_entry_ids[entry] || CI::Queue::QueueEntry.test_id(entry) if populated? return index[test_id] if index.key?(test_id) end @@ -387,7 +374,7 @@ def try_to_reserve_lost_test :reserve_lost, keys: [ key('running'), - key('completed'), + key('processed'), key('worker', worker_id, 'queue'), key('owners'), ], @@ -395,7 +382,10 @@ def try_to_reserve_lost_test ) if lost_test - build.record_warning(Warnings::RESERVED_LOST_TEST, test: lost_test, timeout: config.timeout) + build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(lost_test), timeout: config.timeout) + if CI::Queue.debug? + $stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(lost_test)}" + end end lost_test diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index ecdf2b84..0799e4b4 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -125,12 +125,12 @@ def max_test_failed? test_failed >= config.max_test_failed end - def requeue(test) - test_key = test.id - return false unless should_requeue?(test_key) + def requeue(entry) + test_id = CI::Queue::QueueEntry.test_id(entry) + return false unless should_requeue?(test_id) - requeues[test_key] += 1 - @queue.unshift(test_key) + requeues[test_id] += 1 + @queue.unshift(test_id) true end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 7a2ed006..b1459b9f 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -197,8 +197,11 @@ def handle_test_result(reporter, example, result) failed = false end - if failed && CI::Queue.requeueable?(result) && queue.requeue(example) + if failed && CI::Queue.requeueable?(result) && queue.requeue(example.queue_entry) result.requeue! + if CI::Queue.debug? + $stderr.puts "[ci-queue][requeue] test_id=#{example.id} error_class=#{result.failures.first&.class} error=#{result.failures.first&.message&.lines&.first&.chomp}" + end elsif failed queue.report_failure! else @@ -327,7 +330,7 @@ def id end def queue_entry - id + @queue_entry ||= CI::Queue::QueueEntry.format(id, nil) end def <=>(other) diff --git a/ruby/lib/minitest/queue/build_status_recorder.rb b/ruby/lib/minitest/queue/build_status_recorder.rb index 35cc4919..e211c463 100644 --- a/ruby/lib/minitest/queue/build_status_recorder.rb +++ b/ruby/lib/minitest/queue/build_status_recorder.rb @@ -40,15 +40,15 @@ def record(test) self.total_time = Minitest.clock_time - start_time # Determine what type of result this is and record it - test_id = "#{test.klass}##{test.name}" + entry = test.queue_entry delta = delta_for(test) acknowledged = if (test.failure || test.error?) && !test.skipped? - build.record_error(test_id, dump(test), stat_delta: delta) + build.record_error(entry, dump(test), stat_delta: delta) elsif test.requeued? - build.record_requeue(test_id) + build.record_requeue(entry) else - build.record_success(test_id, skip_flaky_record: test.skipped?) + build.record_success(entry, skip_flaky_record: test.skipped?) end if acknowledged diff --git a/ruby/lib/rspec/queue.rb b/ruby/lib/rspec/queue.rb index c92d6703..bc0c9bc8 100644 --- a/ruby/lib/rspec/queue.rb +++ b/ruby/lib/rspec/queue.rb @@ -253,6 +253,10 @@ def id example.id end + def queue_entry + @queue_entry ||= CI::Queue::QueueEntry.format(id, nil) + end + def <=>(other) id <=> other.id end @@ -411,7 +415,7 @@ def report_failure! end def requeue - @queue.requeue(@example) + @queue.requeue(@example.queue_entry) end def cancel_run! @@ -422,7 +426,7 @@ def cancel_run! end def acknowledge - @queue.acknowledge(@example) + @queue.acknowledge(@example.queue_entry) end end diff --git a/ruby/lib/rspec/queue/build_status_recorder.rb b/ruby/lib/rspec/queue/build_status_recorder.rb index adf39cc8..3050fe28 100644 --- a/ruby/lib/rspec/queue/build_status_recorder.rb +++ b/ruby/lib/rspec/queue/build_status_recorder.rb @@ -18,12 +18,14 @@ def initialize(*) def example_passed(notification) example = notification.example - build.record_success(example.id) + entry = CI::Queue::QueueEntry.format(example.id, nil) + build.record_success(entry) end def example_failed(notification) example = notification.example - build.record_error(example.id, dump(notification)) + entry = CI::Queue::QueueEntry.format(example.id, nil) + build.record_error(entry, dump(notification)) end private diff --git a/ruby/test/ci/queue/queue_entry_test.rb b/ruby/test/ci/queue/queue_entry_test.rb index 5d869bbb..7a9ef3e2 100644 --- a/ruby/test/ci/queue/queue_entry_test.rb +++ b/ruby/test/ci/queue/queue_entry_test.rb @@ -3,7 +3,7 @@ class CI::Queue::QueueEntryTest < Minitest::Test def test_parse_without_file_path - entry = "FooTest#test_bar" + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", nil) parsed = CI::Queue::QueueEntry.parse(entry) assert_equal "FooTest#test_bar", parsed[:test_id] assert_nil parsed[:file_path] @@ -17,8 +17,15 @@ def test_parse_with_file_path end def test_format_without_file_path - assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.format("FooTest#test_bar", nil) - assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.format("FooTest#test_bar", "") + entry_nil = CI::Queue::QueueEntry.format("FooTest#test_bar", nil) + parsed_nil = JSON.parse(entry_nil, symbolize_names: true) + assert_equal "FooTest#test_bar", parsed_nil[:test_id] + assert_nil parsed_nil[:file_path] + + entry_empty = CI::Queue::QueueEntry.format("FooTest#test_bar", "") + parsed_empty = JSON.parse(entry_empty, symbolize_names: true) + assert_equal "FooTest#test_bar", parsed_empty[:test_id] + assert_equal "", parsed_empty[:file_path] end def test_format_with_file_path @@ -54,7 +61,8 @@ def test_round_trip_preserves_test_id end def test_test_id_without_file_path - assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id("FooTest#test_bar") + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", nil) + assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry) end def test_test_id_with_file_path diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 42908235..c8d927f3 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -56,7 +56,7 @@ def test_retry_queue_with_all_tests_passing_2 retry_queue = @queue.retry_queue populate(retry_queue) retry_test_order = poll(retry_queue) do |test| - @queue.build.record_error(test.id, 'Failed') + @queue.build.record_error(test.queue_entry, 'Failed') end assert_equal retry_test_order, retry_test_order end @@ -188,7 +188,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker monitor.synchronize do condition.wait_until { acquired } second_queue.poll do |test| - assert_equal true, second_queue.acknowledge(test.id) + assert_equal true, second_queue.acknowledge(test.queue_entry) end done = true condition.signal @@ -201,7 +201,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker monitor.synchronize do condition.signal condition.wait_until { done } - assert_equal false, @queue.acknowledge(test.id) + assert_equal false, @queue.acknowledge(test.queue_entry) end end @@ -221,7 +221,7 @@ def test_timeout_warning queue = worker(i, tests: [TEST_LIST.first], build_id: '24') queue.poll do |test| sleep 1 # timeout - queue.acknowledge(test.id) + queue.acknowledge(test.queue_entry) end end end @@ -283,10 +283,9 @@ def test_streaming_waits_for_batches def test_reserve_lost_ignores_processed_entry_with_path queue = worker(1, populate: false) entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') - test_id = 'ATest#test_foo' @redis.zadd(queue.send(:key, 'running'), 0, entry) - @redis.sadd(queue.send(:key, 'completed'), test_id) + @redis.sadd(queue.send(:key, 'processed'), entry) @redis.hset(queue.send(:key, 'owners'), entry, queue.send(:key, 'worker', queue.config.worker_id, 'queue')) lost = queue.send(:try_to_reserve_lost_test) @@ -325,12 +324,11 @@ def test_reserve_defers_own_requeued_test_once assert_equal entry, second_try end - def test_heartbeat_uses_test_id_for_processed_check + def test_heartbeat_uses_entry_for_processed_check queue = worker(1, populate: false) entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') - test_id = 'ATest#test_foo' - @redis.sadd(queue.send(:key, 'processed'), test_id) + @redis.sadd(queue.send(:key, 'processed'), entry) result = queue.send( :eval_script, @@ -367,7 +365,7 @@ def test_continuously_timing_out_tests queue = worker(i, tests: [TEST_LIST.first], build_id: '24') queue.poll do |test| sleep 1 # timeout - queue.acknowledge(test.id) + queue.acknowledge(test.queue_entry) end end end @@ -461,6 +459,7 @@ def test_worker_does_not_pick_up_its_own_requeued_test_when_others_are_available w3.send(:register) id_for = ->(test) { test.respond_to?(:id) ? test.id : CI::Queue::QueueEntry.test_id(test) } + entry_for = ->(test) { test.respond_to?(:queue_entry) ? test.queue_entry : test } requeued_test_id = nil picked_up_requeue = {} @@ -481,7 +480,7 @@ def test_worker_does_not_pick_up_its_own_requeued_test_when_others_are_available cond.broadcast cond.wait_until { release_other_workers } end - w2.acknowledge(test_id) + w2.acknowledge(entry_for.call(test)) end end, Thread.new do @@ -493,7 +492,7 @@ def test_worker_does_not_pick_up_its_own_requeued_test_when_others_are_available cond.broadcast cond.wait_until { release_other_workers } end - w3.acknowledge(test_id) + w3.acknowledge(entry_for.call(test)) end end, ] @@ -512,14 +511,14 @@ def test_worker_does_not_pick_up_its_own_requeued_test_when_others_are_available first_test = false requeued_test_id = test_id w1.report_failure! - assert_equal true, w1.requeue(test) + assert_equal true, w1.requeue(entry_for.call(test)) mon.synchronize do release_other_workers = true cond.broadcast end else worker_one_picked_its_own_requeue = true if test_id == requeued_test_id - w1.acknowledge(test_id) + w1.acknowledge(entry_for.call(test)) end end diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index c3821e47..92566dc5 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -737,8 +737,12 @@ def test_retry_report assert_equal 100, error_reports.size error_reports.keys.each_with_index do |test_id, index| + entry = CI::Queue::QueueEntry.format(test_id, nil) queue.instance_variable_set(:@reserved_tests, Concurrent::Set.new([test_id])) - queue.build.record_success(test_id.dup) + reserved_entries = queue.instance_variable_get(:@reserved_entries) || Concurrent::Map.new + reserved_entries[test_id] = entry + queue.instance_variable_set(:@reserved_entries, reserved_entries) + queue.build.record_success(entry) queue.build.record_stats({ 'assertions' => index + 1, 'errors' => 0, diff --git a/ruby/test/minitest/queue/build_status_recorder_test.rb b/ruby/test/minitest/queue/build_status_recorder_test.rb index 3ea258eb..1c9788c6 100644 --- a/ruby/test/minitest/queue/build_status_recorder_test.rb +++ b/ruby/test/minitest/queue/build_status_recorder_test.rb @@ -152,27 +152,35 @@ def test_duplicate_success_does_not_increment_skips def test_build_record_methods_return_boolean # Redis build: first to ack returns true, duplicate returns false reserve(@queue, "a") - assert_equal true, @queue.build.record_success("Minitest::Test#a") - assert_equal true, @queue.build.record_requeue("Minitest::Test#b") + entry_a = CI::Queue::QueueEntry.format("Minitest::Test#a", nil) + assert_equal true, @queue.build.record_success(entry_a) + entry_b = CI::Queue::QueueEntry.format("Minitest::Test#b", nil) + assert_equal true, @queue.build.record_requeue(entry_b) second_queue = worker(2) reserve(second_queue, "a") - assert_equal false, second_queue.build.record_success("Minitest::Test#a") + assert_equal false, second_queue.build.record_success(entry_a) end def test_static_build_record_returns_true static_queue = CI::Queue::Static.new(['test_example'], CI::Queue::Configuration.new(build_id: '42', worker_id: '1')) build = static_queue.build - assert_equal true, build.record_success("test_example") - assert_equal true, build.record_requeue("test_example") - assert_equal true, build.record_error("test_example", "payload") + entry = CI::Queue::QueueEntry.format("test_example", nil) + assert_equal true, build.record_success(entry) + assert_equal true, build.record_requeue(entry) + assert_equal true, build.record_error(entry, "payload") end private def reserve(queue, method_name) - queue.instance_variable_set(:@reserved_tests, Concurrent::Set.new([Minitest::Queue::SingleExample.new("Minitest::Test", method_name).id])) + test_id = Minitest::Queue::SingleExample.new("Minitest::Test", method_name).id + entry = CI::Queue::QueueEntry.format(test_id, nil) + queue.instance_variable_set(:@reserved_tests, Concurrent::Set.new([test_id])) + reserved_entries = queue.instance_variable_get(:@reserved_entries) || Concurrent::Map.new + reserved_entries[test_id] = entry + queue.instance_variable_set(:@reserved_entries, reserved_entries) end def worker(id) diff --git a/ruby/test/support/queue_helpers.rb b/ruby/test/support/queue_helpers.rb index 0e2fd3bc..b7f94fbc 100644 --- a/ruby/test/support/queue_helpers.rb +++ b/ruby/test/support/queue_helpers.rb @@ -9,15 +9,15 @@ def poll(queue, success = true) test_order << test failed = !(success.respond_to?(:call) ? success.call(test) : success) if failed - if queue.requeue(test) + if queue.requeue(test.queue_entry) # Requeued — don't report to circuit breaker else queue.report_failure! - queue.acknowledge(test.id) + queue.acknowledge(test.queue_entry) end else queue.report_success! - queue.acknowledge(test.id) + queue.acknowledge(test.queue_entry) end end test_order diff --git a/ruby/test/support/reporter_test_helper.rb b/ruby/test/support/reporter_test_helper.rb index 0271847b..e7840e1c 100644 --- a/ruby/test/support/reporter_test_helper.rb +++ b/ruby/test/support/reporter_test_helper.rb @@ -5,6 +5,7 @@ module ReporterTestHelper def result(name, **kwargs) result = Minitest::Result.from(runnable(name, **kwargs)) result.source_location = ["#{Minitest::Queue.project_root}/test/my_test.rb", 12] + result.queue_entry = CI::Queue::QueueEntry.format("#{result.klass}##{result.name}", nil) if result.respond_to?(:queue_entry=) result end diff --git a/ruby/test/support/shared_queue_assertions.rb b/ruby/test/support/shared_queue_assertions.rb index 50ce53ec..df53b033 100644 --- a/ruby/test/support/shared_queue_assertions.rb +++ b/ruby/test/support/shared_queue_assertions.rb @@ -69,7 +69,7 @@ def test_requeue def test_acknowledge @queue.poll do |test| - assert_equal true, @queue.acknowledge(test.id) + assert_equal true, @queue.acknowledge(test.queue_entry) end end diff --git a/ruby/test/support/shared_test_cases.rb b/ruby/test/support/shared_test_cases.rb index d6110ff6..9bdfc9e3 100644 --- a/ruby/test/support/shared_test_cases.rb +++ b/ruby/test/support/shared_test_cases.rb @@ -15,6 +15,10 @@ def id name end + def queue_entry + CI::Queue::QueueEntry.format(id, nil) + end + def to_s inspect end