Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions redis/_entry_helpers.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
local function test_id_from_entry(value, delimiter)
if delimiter then
local pos = string.find(value, delimiter, 1, true)
if pos then
return string.sub(value, 1, pos - 1)
end
local function test_id_from_entry(value)
if string.sub(value, 1, 1) == '{' then
local decoded = cjson.decode(value)
return decoded['test_id']
end
return value
end
3 changes: 1 addition & 2 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ local worker_queue_key = KEYS[4]

local current_time = ARGV[1]
local entry = ARGV[2]
local entry_delimiter = ARGV[3]

local test_id = test_id_from_entry(entry, entry_delimiter)
local test_id = test_id_from_entry(entry)

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test_id) == 1 then
Expand Down
3 changes: 1 addition & 2 deletions redis/reserve_lost.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ local owners_key = KEYS[4]

local current_time = ARGV[1]
local timeout = ARGV[2]
local entry_delimiter = ARGV[3]

local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
local test_id = test_id_from_entry(test, entry_delimiter)
local test_id = test_id_from_entry(test)
if redis.call('sismember', processed_key, test_id) == 0 then
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
Expand Down
12 changes: 3 additions & 9 deletions ruby/lib/ci/queue/queue_entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,20 @@
module CI
module Queue
module QueueEntry
DELIMITER = "\t"
LOAD_ERROR_PREFIX = '__ciq_load_error__:'.freeze

def self.test_id(entry)
pos = entry.index(DELIMITER)
pos ? entry[0, pos] : entry
JSON.parse(entry, symbolize_names: true)[:test_id]
end

def self.parse(entry)
return { test_id: entry, file_path: nil } unless entry.include?(DELIMITER)

test_id, file_path = entry.split(DELIMITER, 2)
file_path = nil if file_path == ""
{ test_id: test_id, file_path: file_path }
JSON.parse(entry, symbolize_names: true)
end

def self.format(test_id, file_path)
return test_id if file_path.nil? || file_path == ""

"#{test_id}#{DELIMITER}#{file_path}"
JSON.dump({ test_id: test_id, file_path: file_path })
end

def self.load_error_payload?(file_path)
Expand Down
5 changes: 1 addition & 4 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,12 @@ def resolve_lua_includes(script, root)
end

class HeartbeatProcess
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:)
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key)
@redis_url = redis_url
@zset_key = zset_key
@processed_key = processed_key
@owners_key = owners_key
@worker_queue_key = worker_queue_key
@entry_delimiter = entry_delimiter
end

def boot!
Expand All @@ -285,7 +284,6 @@ def boot!
@processed_key,
@owners_key,
@worker_queue_key,
@entry_delimiter,
in: child_read,
out: child_write,
)
Expand Down Expand Up @@ -360,7 +358,6 @@ def heartbeat_process
key('processed'),
key('owners'),
key('worker', worker_id, 'queue'),
entry_delimiter: CI::Queue::QueueEntry::DELIMITER,
)
end

Expand Down
8 changes: 3 additions & 5 deletions ruby/lib/ci/queue/redis/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ class Monitor
DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__)
RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__)

def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter)
def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)
@zset_key = zset_key
@processed_key = processed_key
@owners_key = owners_key
@worker_queue_key = worker_queue_key
@entry_delimiter = entry_delimiter
@logger = logger
@redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5])
@shutdown = false
Expand All @@ -41,7 +40,7 @@ def process_tick!(id:)
eval_script(
:heartbeat,
keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key],
argv: [Time.now.to_f, id, @entry_delimiter]
argv: [Time.now.to_f, id]
)
rescue => error
@logger.info(error)
Expand Down Expand Up @@ -155,10 +154,9 @@ def monitor
processed_key = ARGV[2]
owners_key = ARGV[3]
worker_queue_key = ARGV[4]
entry_delimiter = ARGV[5]

logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}")
manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter)
manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)

# Notify the parent we're ready
$stdout.puts(".")
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def try_to_reserve_lost_test
key('worker', worker_id, 'queue'),
key('owners'),
],
argv: [CI::Queue.time_now.to_f, timeout, CI::Queue::QueueEntry::DELIMITER],
argv: [CI::Queue.time_now.to_f, timeout],
)

if lost_test
Expand Down
31 changes: 27 additions & 4 deletions ruby/test/ci/queue/queue_entry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
require 'test_helper'

class CI::Queue::QueueEntryTest < Minitest::Test
DELIMITER = CI::Queue::QueueEntry::DELIMITER

def test_parse_without_file_path
entry = "FooTest#test_bar"
parsed = CI::Queue::QueueEntry.parse(entry)
Expand All @@ -12,7 +10,7 @@ def test_parse_without_file_path
end

def test_parse_with_file_path
entry = "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb"
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
parsed = CI::Queue::QueueEntry.parse(entry)
assert_equal "FooTest#test_bar", parsed[:test_id]
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
Expand All @@ -25,7 +23,9 @@ def test_format_without_file_path

def test_format_with_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
assert_equal "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb", entry
parsed = JSON.parse(entry, symbolize_names: true)
assert_equal "FooTest#test_bar", parsed[:test_id]
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end

def test_parse_with_pipe_in_test_name
Expand All @@ -36,6 +36,14 @@ def test_parse_with_pipe_in_test_name
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end

def test_parse_with_tab_in_test_name
test_id = "FooTest#test_xss_<IMG SRC=\"jav\tascript:alert('XSS');\">"
entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb")
parsed = CI::Queue::QueueEntry.parse(entry)
assert_equal test_id, parsed[:test_id]
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end

def test_round_trip_preserves_test_id
test_id = "FooTest#test_bar"
file_path = "/tmp/foo_test.rb"
Expand All @@ -45,6 +53,21 @@ def test_round_trip_preserves_test_id
assert_equal file_path, parsed[:file_path]
end

def test_test_id_without_file_path
assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id("FooTest#test_bar")
end

def test_test_id_with_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry)
end

def test_test_id_with_tab_in_test_name
test_id = "FooTest#test_xss_<IMG SRC=\"jav\tascript:alert('XSS');\">"
entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb")
assert_equal test_id, CI::Queue::QueueEntry.test_id(entry)
end

def test_encode_decode_load_error
error = StandardError.new("boom")
error.set_backtrace(["/tmp/test.rb:10"])
Expand Down
18 changes: 9 additions & 9 deletions ruby/test/ci/queue/redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
class CI::Queue::RedisTest < Minitest::Test
include SharedQueueAssertions

DELIMITER = CI::Queue::QueueEntry::DELIMITER
EntryTest = Struct.new(:id, :queue_entry)

def setup
Expand Down Expand Up @@ -243,8 +242,8 @@ def test_streaming_waits_for_batches
consumer.entry_resolver = ->(entry) { entry }

tests = [
EntryTest.new('ATest#test_foo', "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"),
EntryTest.new('ATest#test_bar', "ATest#test_bar#{DELIMITER}/tmp/a_test.rb"),
EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')),
EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')),
]

streamed = Enumerator.new do |yielder|
Expand Down Expand Up @@ -283,7 +282,7 @@ def test_streaming_waits_for_batches

def test_reserve_lost_ignores_processed_entry_with_path
queue = worker(1, populate: false)
entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
test_id = 'ATest#test_foo'

@redis.zadd(queue.send(:key, 'running'), 0, entry)
Expand All @@ -307,7 +306,7 @@ def test_streaming_timeout_raises_lost_master
def test_reserve_defers_own_requeued_test_once
queue = worker(1, populate: false, build_id: 'self-requeue-script')
queue.send(:register)
entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
queue_key = queue.send(:key, 'queue')
requeued_by_key = queue.send(:key, 'requeued-by')
worker_queue_key = queue.send(:key, 'worker', queue.config.worker_id, 'queue')
Expand All @@ -328,7 +327,7 @@ def test_reserve_defers_own_requeued_test_once

def test_heartbeat_uses_test_id_for_processed_check
queue = worker(1, populate: false)
entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
test_id = 'ATest#test_foo'

@redis.sadd(queue.send(:key, 'processed'), test_id)
Expand All @@ -342,7 +341,7 @@ def test_heartbeat_uses_test_id_for_processed_check
queue.send(:key, 'owners'),
queue.send(:key, 'worker', queue.config.worker_id, 'queue'),
],
argv: [CI::Queue.time_now.to_f, entry, DELIMITER],
argv: [CI::Queue.time_now.to_f, entry],
)

assert_nil result
Expand All @@ -353,9 +352,10 @@ def test_resolve_entry_falls_back_to_resolver
queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok })
queue.entry_resolver = ->(entry) { "resolved:#{entry}" }

resolved = queue.send(:resolve_entry, "MissingTest#test_bar#{DELIMITER}/tmp/missing.rb")
missing_entry = CI::Queue::QueueEntry.format('MissingTest#test_bar', '/tmp/missing.rb')
resolved = queue.send(:resolve_entry, missing_entry)

assert_equal "resolved:MissingTest#test_bar#{DELIMITER}/tmp/missing.rb", resolved
assert_equal "resolved:#{missing_entry}", resolved
end

def test_continuously_timing_out_tests
Expand Down
Loading