Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions redis/acknowledge.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local error_reports_key = KEYS[4]
local requeues_count_key = KEYS[5]
local flaky_reports_key = KEYS[6]

local test = ARGV[1]

local error_report = ARGV[2]
local ttl = ARGV[3]
local skip_flaky_record = ARGV[4]
redis.call('zrem', zset_key, test)
redis.call('hdel', owners_key, test) -- Doesn't matter if it was reclaimed by another workers
return redis.call('sadd', processed_key, test)

local acknowledged = redis.call('sadd', processed_key, test)

if error_report ~= "" and acknowledged then -- we only record the error if the test was acknowledged by us
redis.call('hset', error_reports_key, test, error_report)
redis.call('expire', error_reports_key, ttl)
else -- we record the error even if we didn't acknowledge the test
local deleted_count = tonumber(redis.call('hdel', error_reports_key, test))
local requeued_count = tonumber(redis.call('hget', requeues_count_key, test))

if skip_flaky_record == "false" and ((deleted_count and deleted_count > 0) or (requeued_count and requeued_count > 0)) then
redis.call('sadd', flaky_reports_key, test)
redis.call('expire', flaky_reports_key, ttl)
end
end

return acknowledged
43 changes: 7 additions & 36 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,15 @@ def record_warning(type, attributes)
redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
end

def record_error(id, payload, stats: nil)
redis.pipelined do |pipeline|
pipeline.hset(
key('error-reports'),
id.dup.force_encoding(Encoding::BINARY),
payload.dup.force_encoding(Encoding::BINARY),
)
pipeline.expire(key('error-reports'), config.redis_ttl)
record_stats(stats, pipeline: pipeline)
end
nil
end

def record_success(id, stats: nil, skip_flaky_record: false)
error_reports_deleted_count, requeued_count, _ = redis.pipelined do |pipeline|
pipeline.hdel(key('error-reports'), id.dup.force_encoding(Encoding::BINARY))
pipeline.hget(key('requeues-count'), id.b)
record_stats(stats, pipeline: pipeline)
end
record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
nil
end
def record_stats(stats)
return unless stats

def record_flaky(id, stats: nil)
redis.pipelined do |pipeline|
pipeline.sadd?(
key('flaky-reports'),
id.b
)
pipeline.expire(key('flaky-reports'), config.redis_ttl)
stats.each do |stat_name, stat_value|
pipeline.hset(key(stat_name), config.worker_id, stat_value)
pipeline.expire(key(stat_name), config.redis_ttl)
end
end
nil
end

def max_test_failed?
Expand Down Expand Up @@ -126,13 +103,7 @@ def reset_stats(stat_names)

attr_reader :config, :redis

def record_stats(stats, pipeline: redis)
return unless stats
stats.each do |stat_name, stat_value|
pipeline.hset(key(stat_name), config.worker_id, stat_value)
pipeline.expire(key(stat_name), config.redis_ttl)
end
end


def key(*args)
['build', config.build_id, *args].join(':')
Expand Down
166 changes: 162 additions & 4 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,157 @@ class << self
self.requeue_offset = 42

class Worker < Base
class ErrorReport
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Find new location for ErrorReport and FailureFormatter. This may break public API?

class << self
attr_accessor :coder

def load(payload)
new(coder.load(payload))
end
end

self.coder = Marshal

begin
require 'snappy'
require 'msgpack'
require 'stringio'

module SnappyPack
extend self

MSGPACK = MessagePack::Factory.new
MSGPACK.register_type(0x00, Symbol)

def load(payload)
io = StringIO.new(Snappy.inflate(payload))
MSGPACK.unpacker(io).unpack
end

def dump(object)
io = StringIO.new
packer = MSGPACK.packer(io)
packer.pack(object)
packer.flush
io.rewind
Snappy.deflate(io.string).force_encoding(Encoding::UTF_8)
end
end

self.coder = SnappyPack
rescue LoadError
end

def initialize(data)
@data = data
end

def dump
self.class.coder.dump(@data)
end

def test_name
@data[:test_name]
end

def test_and_module_name
@data[:test_and_module_name]
end

def test_suite
@data[:test_suite]
end

def test_file
@data[:test_file]
end

def test_line
@data[:test_line]
end

def to_h
@data
end

def to_s
output
end

def output
@data[:output]
end
end

class FailureFormatter < SimpleDelegator
include ::CI::Queue::OutputHelpers

def initialize(test)
@test = test
super
end

def to_s
[
header,
body,
"\n"
].flatten.compact.join("\n")
end

def to_h
test_file, test_line = test.source_location
{
test_file: test_file,
test_line: test_line,
test_and_module_name: "#{test.klass}##{test.name}",
test_name: test.name,
test_suite: test.klass,
error_class: test.failure.error.class.name,
output: to_s,
}
end

private

attr_reader :test

def header
"#{red(status)} #{test.klass}##{test.name}"
end

def status
if test.error?
'ERROR'
elsif test.failure
'FAIL'
else
raise ArgumentError, "Couldn't infer test status"
end
end

def body
error = test.failure
message = if error.is_a?(Minitest::UnexpectedError)
"#{error.exception.class}: #{error.exception.message}"
else
error.exception.message
end

backtrace = Minitest.filter_backtrace(error.backtrace).map { |line| ' ' + relativize(line) }
[yellow(message), *backtrace].join("\n")
end

def relativize(trace_line)
trace_line.sub(/\A#{Regexp.escape("#{Dir.pwd}/")}/, '')
end
end

class << self
attr_accessor :failure_formatter
end
self.failure_formatter = FailureFormatter

attr_reader :total

def initialize(redis, config)
Expand Down Expand Up @@ -97,13 +248,20 @@ def report_worker_error(error)
build.report_worker_error(error)
end

def acknowledge(test)
test_key = test.id
def acknowledge(example, result)
error_report = if (result.failure || result.error?) && !result.skipped?
ErrorReport.new(self.class.failure_formatter.new(result).to_h).dump
end


test_key = example.id
raise_on_mismatching_test(test_key)

eval_script(
:acknowledge,
keys: [key('running'), key('processed'), key('owners')],
argv: [test_key],
keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeues-count'),
key('flaky-reports')],
argv: [test_key, error_report.to_s, config.redis_ttl, result.skipped?.to_s]
) == 1
end

Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def exhausted?
@queue.empty?
end

def acknowledge(test)
def acknowledge(test, result)
@progress += 1
true
end
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def run_from_queue(reporter, *)
if failed && CI::Queue.requeueable?(result) && queue.requeue(example)
result.requeue!
reporter.record(result)
elsif queue.acknowledge(example)
elsif queue.acknowledge(example, result)
reporter.record(result)
queue.increment_test_failed if failed
elsif !failed
Expand Down
6 changes: 1 addition & 5 deletions ruby/lib/minitest/queue/build_status_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ def record(test)
end

stats = COUNTERS.zip(COUNTERS.map { |c| send(c) }).to_h
if (test.failure || test.error?) && !test.skipped?
build.record_error("#{test.klass}##{test.name}", dump(test), stats: stats)
else
build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?)
end
build.record_stats(stats)
end

private
Expand Down
1 change: 1 addition & 0 deletions ruby/test/integration/minitest_redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def test_retry_report
error_reports = queue.build.error_reports
assert_equal 100, error_reports.size

skip("#record_success does not exist anymore so we need to find another way to simulate this")
error_reports.keys.each_with_index do |test_id, index|
queue.build.record_success(test_id.dup, stats: {
'assertions' => index + 1,
Expand Down
Loading