Skip to content

Commit 15a8997

Browse files
author
Gustavo Caso
committed
WIP
1 parent 136117f commit 15a8997

File tree

4 files changed

+82
-19
lines changed

4 files changed

+82
-19
lines changed

lib/job-iteration/iteration.rb

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Iteration
1515
)
1616

1717
define_callbacks :start
18+
define_callbacks :reenqueue
1819
define_callbacks :shutdown
1920
define_callbacks :complete
2021
end
@@ -32,6 +33,10 @@ def on_shutdown(*filters, &blk)
3233
set_callback(:shutdown, :after, *filters, &blk)
3334
end
3435

36+
def on_reenqueue(*filters, &blk)
37+
set_callback(:reenqueue, :before, *filters, &blk)
38+
end
39+
3540
def on_complete(*filters, &blk)
3641
set_callback(:complete, :after, *filters, &blk)
3742
end
@@ -74,6 +79,18 @@ def retry_job(*)
7479
@retried = true
7580
end
7681

82+
def reenqueue_iteration_job(options = {})
83+
self.executions -= 1 if executions > 1
84+
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
85+
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
86+
87+
adjust_total_time
88+
self.times_interrupted += 1
89+
90+
self.already_in_queue = true if respond_to?(:already_in_queue=)
91+
retry_job(options)
92+
end
93+
7794
private
7895

7996
def enumerator_builder
@@ -123,8 +140,9 @@ def iterate_with_enumerator(enumerator, arguments)
123140
end
124141

125142
next unless job_should_exit?
126-
self.executions -= 1 if executions > 1
127-
reenqueue_iteration_job
143+
run_callbacks(:reenqueue) do
144+
reenqueue_iteration_job
145+
end
128146
return false
129147
end
130148

@@ -137,17 +155,6 @@ def record_unit_of_work
137155
end
138156
end
139157

140-
def reenqueue_iteration_job
141-
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
142-
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
143-
144-
adjust_total_time
145-
self.times_interrupted += 1
146-
147-
self.already_in_queue = true if respond_to?(:already_in_queue=)
148-
retry_job
149-
end
150-
151158
def adjust_total_time
152159
self.total_time += (Time.now.utc.to_f - start_time.to_f).round(6)
153160
end

lib/job-iteration/throttle_enumerator.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def to_enum
3030
@enum.each do |*val|
3131
if should_throttle?
3232
ActiveSupport::Notifications.instrument("throttled.iteration", job_class: @job.class.name)
33-
@job.retry_job(wait: @backoff)
33+
@job.reenqueue_iteration_job(wait: @backoff)
3434
throw(:abort, :skip_complete_callbacks)
3535
end
3636

test/unit/active_job_iteration_test.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class SimpleIterationJob < ActiveJob::Base
1414
self.on_complete_called = 0
1515
cattr_accessor :on_shutdown_called, instance_accessor: false
1616
self.on_shutdown_called = 0
17+
cattr_accessor :on_reenqueue_called, instance_accessor: false
18+
self.on_reenqueue_called = 0
1719

1820
on_start do
1921
self.class.on_start_called += 1
@@ -26,6 +28,10 @@ class SimpleIterationJob < ActiveJob::Base
2628
on_shutdown do
2729
self.class.on_shutdown_called += 1
2830
end
31+
32+
on_reenqueue do
33+
self.class.on_reenqueue_called += 1
34+
end
2935
end
3036

3137
class MultiArgumentIterationJob < SimpleIterationJob
@@ -61,6 +67,12 @@ def each_iteration(record)
6167
end
6268
end
6369

70+
class ActiveRecordIterationJobHaltReenqueue < ActiveRecordIterationJob
71+
on_reenqueue do |job|
72+
throw(:abort) if job.times_interrupted > 0
73+
end
74+
end
75+
6476
class BatchActiveRecordIterationJob < SimpleIterationJob
6577
def build_enumerator(cursor:)
6678
enumerator_builder.active_record_on_batches(
@@ -297,6 +309,7 @@ def setup
297309
klass.on_start_called = 0
298310
klass.on_complete_called = 0
299311
klass.on_shutdown_called = 0
312+
klass.on_reenqueue_called = 0
300313
end
301314
JobShouldExitJob.records_performed = []
302315
super
@@ -329,6 +342,7 @@ def test_works_with_private_methods
329342
assert_equal(1, PrivateIterationJob.on_start_called)
330343
assert_equal(1, PrivateIterationJob.on_complete_called)
331344
assert_equal(1, PrivateIterationJob.on_shutdown_called)
345+
assert_equal(0, PrivateIterationJob.on_reenqueue_called)
332346
end
333347

334348
def test_failing_job
@@ -379,6 +393,7 @@ def test_active_record_job
379393

380394
assert_equal(0, ActiveRecordIterationJob.on_complete_called)
381395
work_one_job
396+
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)
382397

383398
assert_equal(2, ActiveRecordIterationJob.records_performed.size)
384399

@@ -389,6 +404,7 @@ def test_active_record_job
389404

390405
work_one_job
391406
assert_equal(4, ActiveRecordIterationJob.records_performed.size)
407+
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)
392408

393409
job = peek_into_queue
394410
assert_equal(2, job.times_interrupted)
@@ -401,6 +417,24 @@ def test_active_record_job
401417
assert_equal(2, ActiveRecordIterationJob.on_shutdown_called)
402418
end
403419

420+
def test_active_record_job_halt_reenqueue
421+
iterate_exact_times(3.times)
422+
423+
push(ActiveRecordIterationJobHaltReenqueue)
424+
assert_jobs_in_queue(1)
425+
426+
work_one_job
427+
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)
428+
assert_equal(3, ActiveRecordIterationJob.records_performed.size)
429+
assert_jobs_in_queue(1)
430+
431+
work_one_job
432+
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)
433+
assert_equal(6, ActiveRecordIterationJob.records_performed.size)
434+
# By throwing abort on the reenqueue callback we halt the iteration and no jobs are reenqueue
435+
assert_jobs_in_queue(0)
436+
end
437+
404438
def test_activerecord_batches_complete
405439
push(BatchActiveRecordIterationJob)
406440
processed_records = Product.order(:id).pluck(:id)

test/unit/throttle_enumerator_test.rb

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class IterationThrottleJob < ActiveJob::Base
1212

1313
cattr_accessor :on_complete_called, instance_accessor: false
1414
self.on_complete_called = 0
15+
cattr_accessor :on_reenqueue_called, instance_accessor: false
16+
self.on_reenqueue_called = 0
1517

1618
cattr_accessor :should_throttle_sequence, instance_accessor: false
1719
self.should_throttle_sequence = []
@@ -20,6 +22,10 @@ class IterationThrottleJob < ActiveJob::Base
2022
self.class.on_complete_called += 1
2123
end
2224

25+
on_reenqueue do
26+
self.class.on_reenqueue_called += 1
27+
end
28+
2329
def build_enumerator(_params, cursor:)
2430
enumerator_builder.build_throttle_enumerator(
2531
enumerator_builder.build_array_enumerator(
@@ -36,13 +42,18 @@ def each_iteration(record, _params)
3642
end
3743
end
3844

39-
setup do
40-
IterationThrottleJob.iterations_performed = []
45+
class IterationThrottleJobHaltReenqueue < IterationThrottleJob
46+
on_reenqueue do |_job|
47+
throw(:abort)
48+
end
4149
end
4250

43-
teardown do
44-
IterationThrottleJob.on_complete_called = 0
45-
IterationThrottleJob.should_throttle_sequence = []
51+
setup do
52+
IterationThrottleJob.descendants.each do |klass|
53+
klass.iterations_performed = []
54+
klass.on_complete_called = 0
55+
klass.on_reenqueue_called = 0
56+
end
4657
end
4758

4859
test "throttle enumerator proxies wrapped enumerator" do
@@ -92,6 +103,17 @@ def each_iteration(record, _params)
92103
assert_equal [1], IterationThrottleJob.iterations_performed
93104
end
94105

106+
test "do not push back to queue if reenqueue callback abort" do
107+
IterationThrottleJobHaltReenqueue.should_throttle_sequence = [false, true, false]
108+
109+
IterationThrottleJobHaltReenqueue.perform_now({})
110+
111+
enqueued = ActiveJob::Base.queue_adapter.enqueued_jobs
112+
assert_equal 0, enqueued.size
113+
114+
assert_equal [1], IterationThrottleJobHaltReenqueue.iterations_performed
115+
end
116+
95117
test "does not pushed back to queue if not throttle" do
96118
assert_predicate ActiveJob::Base.queue_adapter.enqueued_jobs, :empty?
97119

0 commit comments

Comments
 (0)