Commit f39525d

Serialize cursor_position
Previously, `cursor_position` was handed as-is to the queue adapter. This could lead to the queue adapter corrupting cursors of certain classes. For example, if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`, resulting in the deserialized cursor being a `String` instead of a `Time`.

To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the `cursor_position` and ensure it will make the round trip safely.

However, as this is a breaking change (as unsafe cursors would previously be accepted, but possibly corrupted, whereas they would now be rejected), we begin by rescuing (de)serialization failures and emitting a deprecation warning. Starting in Job Iteration version 2.0, the deprecation warning will be removed, and (de)serialization failure will raise.

Application owners can opt-in to the 2.0 behavior either globally by setting

    JobIteration.enforce_serializable_cursors = true

or on an inheritable per-class basis by setting

    class MyJob < ActiveJob::Base
      include JobIteration::Iteration
      self.job_iteration_enforce_serializable_cursors = true
      # ...
    end
1 parent f6bd25c commit f39525d
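
The corruption described in the commit message can be reproduced with nothing but Ruby's standard library. The following is a minimal sketch, assuming an adapter that stores job data as plain JSON; `round_trip_through_json` is a hypothetical helper for illustration, not code from Sidekiq or Job Iteration.

```ruby
require "json"

# Hypothetical stand-in for a queue adapter that persists job data as JSON.
def round_trip_through_json(job_data)
  JSON.parse(JSON.generate(job_data))
end

cursor = Time.at(0).utc
restored = round_trip_through_json("cursor_position" => cursor)["cursor_position"]

puts cursor.class   # Time
puts restored.class # String
```

The cursor goes in as a `Time` and comes back as a `String`, which is the silent type change the commit guards against.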

8 files changed: +391 -158 lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,10 @@
 
 - [437](https://github.com/Shopify/job-iteration/pull/437) - Use minimum between per-class `job_iteration_max_job_runtime` and `JobIteration.max_job_runtime`, instead of enforcing only setting decreasing values.
   Because it is possible to change the global or parent values after setting the value on a class, it is not possible to truly enforce the decreasing value constraint. Instead, we now use the minimum between the global value and per-class value. This is considered a non-breaking change, as it should not break any **existing** code, it only removes the constraint on new classes.
+- [81](https://github.com/Shopify/job-iteration/pull/81) - Serialize cursors using `ActiveJob::Arguments` & deprecated unserializable cursors.
+  Cursor serialization has been dependent on the adapter's serialization method, which typically uses `JSON.dump` and `JSON.load`, meaning only JSON-serializable objects could be used as cursors. Using `ActiveJob::Arguments` to serialize cursors instead allows the use of any object that can be serialized using `ActiveJob::Arguments.serialize` and deserialized using `ActiveJob::Arguments.deserialize`, such as `Time` objects, which would previously be lossily serialized as strings.
+  This change is backwards compatible, by using a new job argument for the serialized cursor, but continuing to write to the old argument, ensuring that jobs can be processed regardless of if they are enqueued or dequeued with the old or new version of the gem.
+  In the event that a cursor is not serializable, the gem will fall back to the deprecated old behaviour. In Job Iteration 2.0, this fallback will be removed, and cursors will be required to be serializable, raising otherwise. To opt-in to this behaviour, set `JobIteration.enforce_serializable_cursors = true`. To support gradual migration, a per-class `job_iteration_enforce_serializable_cursors` option is also available, which overrides the global option for that class.
 
 ### Bug fixes
 
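
The backwards-compatibility scheme in the changelog entry above (write both the old and the new job argument, prefer the new one when reading) can be sketched in plain Ruby. This is an illustration under stated assumptions, not the gem's implementation: `encode_cursor` and `decode_cursor` are hypothetical stand-ins for `ActiveJob::Arguments.serialize` and `.deserialize`, and they understand only a handful of types.

```ruby
require "json"
require "time"

# Hypothetical error raised by the stand-in serializer below.
class UnsupportedCursorError < StandardError; end

# Hypothetical stand-in for ActiveJob::Arguments.serialize.
def encode_cursor(cursor)
  case cursor
  when nil, true, false, Integer, Float, String then cursor
  when Time then { "_type" => "Time", "value" => cursor.utc.iso8601(9) }
  else raise UnsupportedCursorError, "cannot encode #{cursor.class}"
  end
end

# Hypothetical stand-in for ActiveJob::Arguments.deserialize.
def decode_cursor(encoded)
  return encoded unless encoded.is_a?(Hash) && encoded["_type"] == "Time"

  Time.iso8601(encoded["value"])
end

# Writer: always fill the legacy key; add the new key when encoding succeeds.
def write_job_data(cursor)
  data = { "cursor_position" => cursor }
  begin
    data["serialized_cursor_position"] = encode_cursor(cursor)
  rescue UnsupportedCursorError
    # Only the legacy key is written, mirroring the deprecated fallback.
  end
  JSON.parse(JSON.generate(data)) # simulate the adapter's JSON storage
end

# Reader: prefer the new key, fall back to the legacy one.
def read_cursor(job_data)
  if job_data.key?("serialized_cursor_position")
    decode_cursor(job_data["serialized_cursor_position"])
  else
    job_data["cursor_position"]
  end
end

new_payload = write_job_data(Time.at(0).utc)
old_payload = { "cursor_position" => 42 } # as written by an older gem version

p read_cursor(new_payload).class # Time
p read_cursor(old_payload)       # 42
```

Because the legacy key is always present, a worker running the older version can still pick up a payload written by the newer one.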

README.md

Lines changed: 2 additions & 0 deletions
@@ -213,6 +213,8 @@ class MyJob < ApplicationJob
 end
 ```
 
+See [the guide on Custom Enumerator](guides/custom-enumerator.md) for details.
+
 ## Credits
 
 This project would not be possible without these individuals (in alphabetical order):

guides/custom-enumerator.md

Lines changed: 57 additions & 0 deletions
@@ -101,11 +101,68 @@ and you initiate the job with
 LoadRefundsForChargeJob.perform_later(charge_id = "chrg_345")
 ```
 
+### Cursor Serialization
+
+Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments).
+
+For example, consider:
+
+```ruby
+FancyCursor = Struct.new(:wrapped_value) do
+  def to_s
+    wrapped_value
+  end
+end
+```
+
+```ruby
+def build_enumerator(cursor:)
+  Enumerator.new do |yielder|
+    # ...something with fancy cursor...
+    yielder.yield 123, FancyCursor.new(:abc)
+  end
+end
+```
+
+If this job was interrupted, Active Job would be unable to serialize
+`FancyCursor`, and Job Iteration would fall back to the legacy behavior of not
+serializing the cursor. This would typically result in the queue adapter
+eventually serializing the cursor as JSON by calling `.to_s` on it. The cursor
+would be deserialized as `:abc`, rather than the intended `FancyCursor.new(:abc)`.
+
+To avoid this, job authors should take care to ensure that their cursor is
+serializable by Active Job. This can be done in a couple of ways, such as:
+- [adding `GlobalID` support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid)
+- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers)
+- handling (de)serialization in the job/enumerator itself
+  ```ruby
+  def build_enumerator(cursor:)
+    fancy_cursor = FancyCursor.new(cursor) unless cursor.nil?
+    Enumerator.new do |yielder|
+      # ...something with fancy cursor...
+      yielder.yield 123, FancyCursor.new(:abc).wrapped_value
+    end
+  end
+  ```
+Note that starting in 2.0, Job Iteration will stop supporting fallback behavior
+and raise when it encounters an unserializable cursor. To opt in to this behavior early, set
+```ruby
+JobIteration.enforce_serializable_cursors = true
+```
+or, to support gradual migration, a per-class option is also available to override the global value, if set:
+```ruby
+class MyJob < ActiveJob::Base
+  include JobIteration::Iteration
+  self.job_iteration_enforce_serializable_cursors = true
+end
+```
+
 ## Cursorless enumerator
 
 Sometimes you can ignore the cursor. Consider the following custom `Enumerator` that takes items from a Redis list, which
 is essentially a queue. Even if this job doesn't need to persist a cursor in order to resume, it can still use
 `Iteration`'s signal handling to finish `each_iteration` and gracefully terminate.
+`Iteration`'s signal handling to finish `each_iteration` and gracefully terminate.
 
 ```ruby
 class RedisPopListJob < ActiveJob::Base
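
The third option listed in the guide's new section, handling (de)serialization inside the job or enumerator, can be illustrated without the gem. The sketch below is an assumption-laden example: `EVENTS` is a hypothetical data set, and the enumerator exchanges its `Time` cursor as an ISO 8601 `String`, a type that survives any JSON-based adapter.

```ruby
require "time"

# Hypothetical data set: events identified by their timestamp.
EVENTS = [
  Time.utc(2024, 1, 1), Time.utc(2024, 1, 2), Time.utc(2024, 1, 3),
].freeze

# Accepts the cursor as an ISO 8601 String (nil on the first run) and yields
# [item, cursor] pairs whose cursor is again a plain String.
def build_enumerator(cursor:)
  resume_after = cursor && Time.iso8601(cursor)

  Enumerator.new do |yielder|
    EVENTS.each do |event_time|
      next if resume_after && event_time <= resume_after

      yielder.yield(event_time, event_time.iso8601)
    end
  end
end

first_run = build_enumerator(cursor: nil).to_a
last_cursor = first_run.first.last # pretend the job was interrupted after one item
resumed = build_enumerator(cursor: last_cursor).to_a

puts last_cursor
puts resumed.length
```

The conversion happens at the enumerator boundary, so the rest of the job can keep working with `Time` objects while only primitives reach the queue.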

lib/job-iteration.rb

Lines changed: 20 additions & 0 deletions
@@ -11,6 +11,8 @@ module JobIteration
 
   INTEGRATIONS = [:resque, :sidekiq]
 
+  Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")
+
   extend self
 
   attr_writer :logger
@@ -57,6 +59,24 @@ def logger
   # where the throttle backoff value will take precedence over this setting.
   attr_accessor :default_retry_backoff
 
+  # Set this to `true` to enforce that cursors be composed of objects capable
+  # of built-in (de)serialization by Active Job.
+  #
+  #     JobIteration.enforce_serializable_cursors = true
+  #
+  # For more granular control, this can also be configured per job class, and
+  # is inherited by child classes.
+  #
+  #     class MyJob < ActiveJob::Base
+  #       include JobIteration::Iteration
+  #       self.job_iteration_enforce_serializable_cursors = false
+  #       # ...
+  #     end
+  #
+  # Note that non-enforcement is deprecated and enforcement will be mandatory
+  # in version 2.0, at which point this config will go away.
+  attr_accessor :enforce_serializable_cursors
+
   # Used internally for hooking into job processing frameworks like Sidekiq and Resque.
   attr_accessor :interruption_adapter
 

lib/job-iteration/iteration.rb

Lines changed: 57 additions & 52 deletions
@@ -18,28 +18,6 @@ module Iteration
     # The time isn't reset if the job is interrupted.
     attr_accessor :total_time
 
-    class CursorError < ArgumentError
-      attr_reader :cursor
-
-      def initialize(message, cursor:)
-        super(message)
-        @cursor = cursor
-      end
-
-      def message
-        "#{super} (#{inspected_cursor})"
-      end
-
-      private
-
-      def inspected_cursor
-        cursor.inspect
-      rescue NoMethodError
-        # For those brave enough to try to use BasicObject as cursor. Nice try.
-        Object.instance_method(:inspect).bind(cursor).call
-      end
-    end
-
     included do |_base|
       define_callbacks :start
       define_callbacks :shutdown
@@ -50,6 +28,12 @@ def inspected_cursor
         instance_accessor: false,
         instance_predicate: false,
       )
+
+      class_attribute(
+        :job_iteration_enforce_serializable_cursors,
+        instance_accessor: false,
+        instance_predicate: false,
+      )
     end
 
     module ClassMethods
@@ -88,16 +72,25 @@ def initialize(*arguments)
     ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
 
     def serialize # @private
-      super.merge(
-        "cursor_position" => cursor_position,
+      iteration_job_data = {
+        "cursor_position" => cursor_position, # Backwards compatibility
         "times_interrupted" => times_interrupted,
         "total_time" => total_time,
-      )
+      }
+
+      begin
+        iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position)
+      rescue ActiveJob::SerializationError
+        raise if job_iteration_enforce_serializable_cursors?
+        # No point in duplicating the deprecation warning from assert_valid_cursor!
+      end
+
+      super.merge(iteration_job_data)
     end
 
     def deserialize(job_data) # @private
       super
-      self.cursor_position = job_data["cursor_position"]
+      self.cursor_position = cursor_position_from_job_data(job_data)
       self.times_interrupted = Integer(job_data["times_interrupted"] || 0)
       self.total_time = Float(job_data["total_time"] || 0.0)
     end
@@ -167,8 +160,7 @@ def iterate_with_enumerator(enumerator, arguments)
       @needs_reenqueue = false
 
       enumerator.each do |object_from_enumerator, cursor_from_enumerator|
-        # Deferred until 2.0.0
-        # assert_valid_cursor!(cursor_from_enumerator)
+        assert_valid_cursor!(cursor_from_enumerator)
 
         tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator)
         ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do
@@ -222,16 +214,19 @@ def build_enumerator(params, cursor:)
       EOS
     end
 
-    # The adapter must be able to serialize and deserialize the cursor back into an equivalent object.
-    # https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple
     def assert_valid_cursor!(cursor)
-      return if serializable?(cursor)
+      serialize_cursor(cursor)
+      true
+    rescue ActiveJob::SerializationError
+      raise if job_iteration_enforce_serializable_cursors?
 
-      raise CursorError.new(
-        "Cursor must be composed of objects capable of built-in (de)serialization: " \
-        "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
-        cursor: cursor,
-      )
+      Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3))
+        The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize.
+        See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-serialization
+        This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!
+      DEPRECATION_MESSAGE
+
+      false
     end
 
     def assert_implements_methods!
@@ -286,6 +281,13 @@ def job_iteration_max_job_runtime
       [global_max, class_max].min
     end
 
+    def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards
+      per_class_setting = self.class.job_iteration_enforce_serializable_cursors
+      return per_class_setting unless per_class_setting.nil?
+
+      !!JobIteration.enforce_serializable_cursors
+    end
+
     def handle_completed(completed)
       case completed
       when nil # someone aborted the job but wants to call the on_complete callback
@@ -305,6 +307,25 @@ def handle_completed(completed)
       raise "Unexpected thrown value: #{completed.inspect}"
     end
 
+    def cursor_position_from_job_data(job_data)
+      if job_data.key?("serialized_cursor_position")
+        deserialize_cursor(job_data.fetch("serialized_cursor_position"))
+      else
+        # Backwards compatibility for
+        # - jobs interrupted before cursor serialization feature shipped, or
+        # - jobs whose cursor could not be serialized
+        job_data.fetch("cursor_position", nil)
+      end
+    end
+
+    def serialize_cursor(cursor)
+      ActiveJob::Arguments.serialize([cursor]).first
+    end
+
+    def deserialize_cursor(cursor)
+      ActiveJob::Arguments.deserialize([cursor]).first
+    end
+
     def valid_cursor_parameter?(parameters)
       # this condition is when people use the splat operator.
       # def build_enumerator(*)
@@ -316,21 +337,5 @@ def valid_cursor_parameter?(parameters)
       end
       false
     end
-
-    SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze
-    private_constant :SIMPLE_SERIALIZABLE_CLASSES
-    def serializable?(object)
-      # Subclasses must be excluded, hence not using is_a? or ===.
-      if object.instance_of?(Array)
-        object.all? { |element| serializable?(element) }
-      elsif object.instance_of?(Hash)
-        object.all? { |key, value| serializable?(key) && serializable?(value) }
-      else
-        SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
-      end
-    rescue NoMethodError
-      # BasicObject doesn't respond to instance_of, but we can't serialize it anyway
-      false
-    end
   end
 end
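
The precedence implemented by `job_iteration_enforce_serializable_cursors?` (an explicit per-class value wins, a `nil` per-class value defers to the global setting) can be tried out without Rails. The sketch below is only an approximation: `FakeJobIteration` and `BaseJob` are hypothetical stand-ins, and the hand-rolled class-level reader imitates the inheritance that `class_attribute` provides in the real code.

```ruby
# Hypothetical stand-in for the JobIteration module's global setting.
module FakeJobIteration
  class << self
    attr_accessor :enforce_serializable_cursors
  end
end

# Hypothetical stand-in for a job class using an inheritable class-level option.
class BaseJob
  class << self
    attr_writer :enforce_serializable_cursors

    # Return this class's own value if set, otherwise the parent's value.
    def enforce_serializable_cursors
      return @enforce_serializable_cursors if instance_variable_defined?(:@enforce_serializable_cursors)

      superclass.enforce_serializable_cursors if superclass.respond_to?(:enforce_serializable_cursors)
    end
  end

  # Same shape as the predicate in the diff: per-class first, then global.
  def enforce_serializable_cursors?
    per_class_setting = self.class.enforce_serializable_cursors
    return per_class_setting unless per_class_setting.nil?

    !!FakeJobIteration.enforce_serializable_cursors
  end
end

class StrictJob < BaseJob
  self.enforce_serializable_cursors = true
end

class LenientChildJob < StrictJob
  self.enforce_serializable_cursors = false
end

class InheritingJob < StrictJob; end

p BaseJob.new.enforce_serializable_cursors?         # global is unset
p StrictJob.new.enforce_serializable_cursors?       # per-class true
p InheritingJob.new.enforce_serializable_cursors?   # inherited from StrictJob
p LenientChildJob.new.enforce_serializable_cursors? # per-class false
```

Note that an explicit `false` on a class overrides a global `true`, which is what allows individual jobs to be migrated gradually.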

test/integration/integration_behaviour.rb

Lines changed: 2 additions & 7 deletions
@@ -36,17 +36,12 @@ module IntegrationBehaviour
 
   test "unserializable corruption is prevented" do
     skip_until_version("2.0.0")
-    # Cursors are serialized as JSON, but not all objects are serializable.
-    # time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC
-    # json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\""
-    # string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC"
-    # We serialized a Time, but it was deserialized as a String.
-    TimeCursorJob.perform_later
+    UnserializableCursorJob.perform_later
     TerminateJob.perform_later
     start_worker_and_wait
 
     assert_equal(
-      JobIteration::Iteration::CursorError.name,
+      ActiveJob::SerializationError.name,
       failed_job_error_class_name,
     )
   end

test/support/jobs.rb

Lines changed: 3 additions & 2 deletions
@@ -15,11 +15,12 @@ def each_iteration(omg)
   end
 end
 
-class TimeCursorJob < ActiveJob::Base
+class UnserializableCursorJob < ActiveJob::Base
   include JobIteration::Iteration
+  UnserializableCursor = Class.new
 
   def build_enumerator(cursor:)
-    return [["item", Time.now]].to_enum if cursor.nil?
+    return [["item", UnserializableCursor.new]].to_enum if cursor.nil?
 
     raise "This should never run; cursor is unserializable!"
   end
