Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/ciqueue/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def acknowledge(self, test):
self.key('error-reports'),
self.key('requeued-by'),
],
args=[test, test, '', 0],
args=[test, '', 0],
) == 1

def requeue(self, test, offset=42):
Expand All @@ -116,7 +116,7 @@ def requeue(self, test, offset=42):
self.key('error-reports'),
self.key('requeued-by'),
],
args=[self.max_requeues, self.global_max_requeues, test, test, offset, 0],
args=[self.max_requeues, self.global_max_requeues, test, offset, 0],
) == 1

def retry_queue(self):
Expand Down
7 changes: 0 additions & 7 deletions redis/_entry_helpers.lua

This file was deleted.

9 changes: 4 additions & 5 deletions redis/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ local error_reports_key = KEYS[4]
local requeued_by_key = KEYS[5]

local entry = ARGV[1]
local test_id = ARGV[2]
local error = ARGV[3]
local ttl = ARGV[4]
local error = ARGV[2]
local ttl = ARGV[3]
redis.call('zrem', zset_key, entry)
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers
redis.call('hdel', requeued_by_key, entry)
local acknowledged = redis.call('sadd', processed_key, test_id) == 1
local acknowledged = redis.call('sadd', processed_key, entry) == 1

if acknowledged and error ~= "" then
redis.call('hset', error_reports_key, test_id, error)
redis.call('hset', error_reports_key, entry, error)
redis.call('expire', error_reports_key, ttl)
end

Expand Down
6 changes: 1 addition & 5 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
Expand All @@ -8,10 +6,8 @@ local worker_queue_key = KEYS[4]
local current_time = ARGV[1]
local entry = ARGV[2]

local test_id = test_id_from_entry(entry)

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test_id) == 1 then
if redis.call('sismember', processed_key, entry) == 1 then
return false
end

Expand Down
13 changes: 6 additions & 7 deletions redis/requeue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ local requeued_by_key = KEYS[8]
local max_requeues = tonumber(ARGV[1])
local global_max_requeues = tonumber(ARGV[2])
local entry = ARGV[3]
local test_id = ARGV[4]
local offset = ARGV[5]
local ttl = tonumber(ARGV[6])
local offset = ARGV[4]
local ttl = tonumber(ARGV[5])

if redis.call('hget', owners_key, entry) == worker_queue_key then
redis.call('hdel', owners_key, entry)
end

if redis.call('sismember', processed_key, test_id) == 1 then
if redis.call('sismember', processed_key, entry) == 1 then
return false
end

Expand All @@ -27,15 +26,15 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then
return false
end

local requeues = tonumber(redis.call('hget', requeues_count_key, test_id))
local requeues = tonumber(redis.call('hget', requeues_count_key, entry))
if requeues and requeues >= max_requeues then
return false
end

redis.call('hincrby', requeues_count_key, '___total___', 1)
redis.call('hincrby', requeues_count_key, test_id, 1)
redis.call('hincrby', requeues_count_key, entry, 1)

redis.call('hdel', error_reports_key, test_id)
redis.call('hdel', error_reports_key, entry)

local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1]
if pivot then
Expand Down
5 changes: 1 addition & 4 deletions redis/reserve_lost.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local worker_queue_key = KEYS[3]
Expand All @@ -10,8 +8,7 @@ local timeout = ARGV[2]

local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
local test_id = test_id_from_entry(test)
if redis.call('sismember', processed_key, test_id) == 0 then
if redis.call('sismember', processed_key, test) == 0 then
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
Expand Down
10 changes: 5 additions & 5 deletions ruby/lib/ci/queue/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ def queue_exhausted?
@queue.exhausted?
end

def record_error(id, payload, stat_delta: nil)
error_reports[id] = payload
def record_error(entry, payload, stat_delta: nil)
error_reports[entry] = payload
true
end

def record_success(id, skip_flaky_record: false, acknowledge: true)
error_reports.delete(id)
def record_success(entry, skip_flaky_record: false, acknowledge: true)
error_reports.delete(entry)
true
end

def record_requeue(id)
def record_requeue(entry)
true
end

Expand Down
2 changes: 0 additions & 2 deletions ruby/lib/ci/queue/queue_entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ def self.parse(entry)
end

def self.format(test_id, file_path)
return test_id if file_path.nil? || file_path == ""

JSON.dump({ test_id: test_id, file_path: file_path })
end

Expand Down
34 changes: 17 additions & 17 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ def reset_worker_error
end

def failed_tests
redis.hkeys(key('error-reports'))
redis.hkeys(key('error-reports')).map { |entry| CI::Queue::QueueEntry.test_id(entry) }
end

TOTAL_KEY = "___total___"
def requeued_tests
requeues = redis.hgetall(key('requeues-count'))
requeues.delete(TOTAL_KEY)
requeues
requeues.transform_keys { |entry| CI::Queue::QueueEntry.test_id(entry) }
end

def pop_warnings
Expand All @@ -56,39 +56,39 @@ def record_warning(type, attributes)
redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
end

def record_error(id, payload, stat_delta: nil)
def record_error(entry, payload, stat_delta: nil)
# Run acknowledge first so we know whether we're the first to ack
acknowledged = @queue.acknowledge(id, error: payload)
acknowledged = @queue.acknowledge(entry, error: payload)

if acknowledged
# We were the first to ack; another worker already ack'd would get falsy from SADD
@queue.increment_test_failed
# Only the acknowledging worker's stats include this failure (others skip increment when ack=false).
# Store so we can subtract it if another worker records success later.
store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any?
store_error_report_delta(entry, stat_delta) if stat_delta && stat_delta.any?
end
# Return so caller can roll back local counter when not acknowledged
!!acknowledged
end

def record_success(id, skip_flaky_record: false)
def record_success(entry, skip_flaky_record: false)
acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction|
@queue.acknowledge(id, pipeline: transaction)
transaction.hdel(key('error-reports'), id)
transaction.hget(key('requeues-count'), id)
transaction.hget(key('error-report-deltas'), id)
@queue.acknowledge(entry, pipeline: transaction)
transaction.hdel(key('error-reports'), entry)
transaction.hget(key('requeues-count'), entry)
transaction.hget(key('error-report-deltas'), entry)
end
# When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution
if error_reports_deleted_count.to_i > 0 && delta_json
apply_error_report_delta_correction(delta_json)
redis.hdel(key('error-report-deltas'), id)
redis.hdel(key('error-report-deltas'), entry)
end
record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
record_flaky(entry) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
# Count this run when we ack'd or when we replaced a failure (so stats delta is applied)
!!(acknowledged || error_reports_deleted_count.to_i > 0)
end

def record_requeue(id)
def record_requeue(entry)
true
end

Expand Down Expand Up @@ -142,11 +142,11 @@ def max_test_failed?
end

def error_reports
redis.hgetall(key('error-reports'))
redis.hgetall(key('error-reports')).transform_keys { |entry| CI::Queue::QueueEntry.test_id(entry) }
end

def flaky_reports
redis.smembers(key('flaky-reports'))
redis.smembers(key('flaky-reports')).map { |entry| CI::Queue::QueueEntry.test_id(entry) }
end

def record_worker_profile(profile)
Expand Down Expand Up @@ -187,10 +187,10 @@ def key(*args)
['build', config.build_id, *args].join(':')
end

def store_error_report_delta(test_id, stat_delta)
def store_error_report_delta(entry, stat_delta)
# Only the acknowledging worker's stats include this test; store their delta for correction on success
payload = { 'worker_id' => config.worker_id.to_s }.merge(stat_delta)
redis.hset(key('error-report-deltas'), test_id, JSON.generate(payload))
redis.hset(key('error-report-deltas'), entry, JSON.generate(payload))
redis.expire(key('error-report-deltas'), config.redis_ttl)
end

Expand Down
44 changes: 17 additions & 27 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ def retrying?
def retry_queue
failures = build.failed_tests.to_set
log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
log = log.map { |entry| queue_entry_test_id(entry) }
log.select! { |id| failures.include?(id) }
log = log.map { |entry| CI::Queue::QueueEntry.test_id(entry) }
log.select! { |test_id| failures.include?(test_id) }
log.uniq!
log.reverse!
Retry.new(log, config, redis: redis)
Expand Down Expand Up @@ -176,23 +176,23 @@ def report_worker_error(error)
build.report_worker_error(error)
end

def acknowledge(test_key, error: nil, pipeline: redis)
test_id = normalize_test_id(test_key)
def acknowledge(entry, error: nil, pipeline: redis)
test_id = CI::Queue::QueueEntry.test_id(entry)
assert_reserved!(test_id)
entry = reserved_entries.fetch(test_id, queue_entry_for(test_key))
entry = reserved_entries.fetch(test_id, entry)
unreserve_entry(test_id)
eval_script(
:acknowledge,
keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by')],
argv: [entry, test_id, error.to_s, config.redis_ttl],
argv: [entry, error.to_s, config.redis_ttl],
pipeline: pipeline,
) == 1
end

def requeue(test, offset: Redis.requeue_offset)
test_id = normalize_test_id(test)
def requeue(entry, offset: Redis.requeue_offset)
test_id = CI::Queue::QueueEntry.test_id(entry)
assert_reserved!(test_id)
entry = reserved_entries.fetch(test_id, queue_entry_for(test))
entry = reserved_entries.fetch(test_id, entry)
unreserve_entry(test_id)
global_max_requeues = config.global_max_requeues(total)

Expand All @@ -208,7 +208,7 @@ def requeue(test, offset: Redis.requeue_offset)
key('error-reports'),
key('requeued-by'),
],
argv: [config.max_requeues, global_max_requeues, entry, test_id, offset, config.redis_ttl],
argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl],
) == 1

unless requeued
Expand Down Expand Up @@ -255,7 +255,7 @@ def assert_reserved!(test_id)
end

def reserve_entry(entry)
test_id = queue_entry_test_id(entry)
test_id = CI::Queue::QueueEntry.test_id(entry)
reserved_tests << test_id
reserved_entries[test_id] = entry
reserved_entry_ids[entry] = test_id
Expand All @@ -267,19 +267,6 @@ def unreserve_entry(test_id)
reserved_entry_ids.delete(entry) if entry
end

def normalize_test_id(test_key)
key = test_key.respond_to?(:id) ? test_key.id : test_key
if key.is_a?(String)
cached = reserved_entry_ids[key]
return cached if cached
end
queue_entry_test_id(key)
end

def queue_entry_test_id(entry)
CI::Queue::QueueEntry.test_id(entry)
end

def queue_entry_for(test)
return test.queue_entry if test.respond_to?(:queue_entry)
return test.id if test.respond_to?(:id)
Expand All @@ -288,7 +275,7 @@ def queue_entry_for(test)
end

def resolve_entry(entry)
test_id = reserved_entry_ids[entry] || queue_entry_test_id(entry)
test_id = reserved_entry_ids[entry] || CI::Queue::QueueEntry.test_id(entry)
if populated?
return index[test_id] if index.key?(test_id)
end
Expand Down Expand Up @@ -387,15 +374,18 @@ def try_to_reserve_lost_test
:reserve_lost,
keys: [
key('running'),
key('completed'),
key('processed'),
key('worker', worker_id, 'queue'),
key('owners'),
],
argv: [CI::Queue.time_now.to_f, timeout],
)

if lost_test
build.record_warning(Warnings::RESERVED_LOST_TEST, test: lost_test, timeout: config.timeout)
build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(lost_test), timeout: config.timeout)
if CI::Queue.debug?
$stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(lost_test)}"
end
end

lost_test
Expand Down
10 changes: 5 additions & 5 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def max_test_failed?
test_failed >= config.max_test_failed
end

def requeue(test)
test_key = test.id
return false unless should_requeue?(test_key)
def requeue(entry)
test_id = CI::Queue::QueueEntry.test_id(entry)
return false unless should_requeue?(test_id)

requeues[test_key] += 1
@queue.unshift(test_key)
requeues[test_id] += 1
@queue.unshift(test_id)
true
end

Expand Down
7 changes: 5 additions & 2 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ def handle_test_result(reporter, example, result)
failed = false
end

if failed && CI::Queue.requeueable?(result) && queue.requeue(example)
if failed && CI::Queue.requeueable?(result) && queue.requeue(example.queue_entry)
result.requeue!
if CI::Queue.debug?
$stderr.puts "[ci-queue][requeue] test_id=#{example.id} error_class=#{result.failures.first&.class} error=#{result.failures.first&.message&.lines&.first&.chomp}"
end
elsif failed
queue.report_failure!
else
Expand Down Expand Up @@ -327,7 +330,7 @@ def id
end

def queue_entry
id
@queue_entry ||= CI::Queue::QueueEntry.format(id, nil)
end

def <=>(other)
Expand Down
Loading
Loading