Commit 07b6fd3

Serialize cursor_position
Previously, `cursor_position` was handed as-is to the queue adapter. This could lead to the queue adapter corrupting cursors of certain classes. For example, if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`, resulting in the deserialized cursor being a `String` instead of a `Time`.

To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the `cursor_position` and ensure it will make the round trip safely.

However, as this is a breaking change (as unsafe cursors would previously be accepted, but possibly corrupted, whereas they would now be rejected), we begin by rescuing (de)serialization failures and emitting a deprecation warning. Starting in Job Iteration version 2.0, the deprecation warning will be removed, and (de)serialization failure will raise.

Application owners can opt-in to the 2.0 behavior either globally by setting

    JobIteration.enforce_serializable_cursors = true

or on an inheritable per-class basis by setting

    class MyJob < ActiveJob::Base
      include JobIteration::Iteration
      self.job_iteration_enforce_serializable_cursors = true
      # ...
    end

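
The corruption described in the commit message can be reproduced with the Ruby standard library alone. The following is a minimal sketch, assuming the queue adapter stores job data as plain JSON with neither Active Job nor Active Support involved; the `payload` and `restored` names are illustrative and not part of Job Iteration.

```ruby
require "json"

# A Time cursor, as a job's enumerator might yield it.
cursor = Time.at(0).utc

# JSON has no Time type. Without extra JSON additions loaded, the generator
# falls back to the object's #to_s representation.
payload = JSON.generate({ "cursor_position" => cursor })

# After the round trip, the cursor is a plain String rather than a Time.
restored = JSON.parse(payload).fetch("cursor_position")

puts restored.class
puts restored
```

Because the restored value is a `String`, any enumerator that expects to resume from a `Time` receives the wrong type on the next run, which is the situation the commit guards against.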
1 parent 4e694a2 commit 07b6fd3

File tree

8 files changed: +396 -146 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@
 - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support
 - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
 - [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`
+- [80](https://github.com/Shopify/job-iteration/pull/80) - Serialize cursors using ActiveJob::Arguments
+- [80](https://github.com/Shopify/job-iteration/pull/80) - Deprecate un(de)serializable cursors
+- [80](https://github.com/Shopify/job-iteration/pull/80) - Add `enforce_serializable_cursors` config

 ## v1.3.6 (Mar 9, 2022)

README.md

Lines changed: 2 additions & 0 deletions
@@ -193,6 +193,8 @@ class MyJob < ApplicationJob
 end
 ```

+See [the guide on Custom Enumerators](guides/custom-enumerator.md) for details.
+
 ## Credits

 This project would not be possible without these individuals (in alphabetical order):

guides/custom-enumerator.md

Lines changed: 57 additions & 0 deletions
@@ -1,5 +1,9 @@
+# Custom Enumerator
+
 Iteration leverages the [Enumerator](http://ruby-doc.org/core-2.5.1/Enumerator.html) pattern from the Ruby standard library, which allows us to use almost any resource as a collection to iterate.

+## Cursorless Enumerator
+
 Consider a custom Enumerator that takes items from a Redis list. Because a Redis List is essentially a queue, we can ignore the cursor:

 ```ruby
@@ -19,6 +23,8 @@ class ListJob < ActiveJob::Base
 end
 ```

+## Enumerator with cursor
+
 But what about iterating based on a cursor? Consider this Enumerator that wraps third party API (Stripe) for paginated iteration:

 ```ruby
@@ -82,6 +88,57 @@ class StripeJob < ActiveJob::Base
 end
 ```

+## Notes
+
 We recommend that you read the implementation of the other enumerators that come with the library (`CsvEnumerator`, `ActiveRecordEnumerator`) to gain a better understanding of building Enumerator objects.

+### Post-`yield` code
+
 Code that is written after the `yield` in a custom enumerator is not guaranteed to execute. In the case that a job is forced to exit ie `job_should_exit?` is true, then the job is re-enqueued during the yield and the rest of the code in the enumerator does not run. You can follow that logic [here](https://github.com/Shopify/job-iteration/blob/9641f455b9126efff2214692c0bef423e0d12c39/lib/job-iteration/iteration.rb#L128-L131).
+
+### Cursor types
+
+Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments).
+
+For example, consider:
+
+```ruby
+FancyCursor = Struct.new(:wrapped_value) do
+  def to_s
+    wrapped_value
+  end
+end
+```
+
+```ruby
+def build_enumerator(cursor:)
+  Enumerator.new do |yielder|
+    # ...something with fancy cursor...
+    yield 123, FancyCursor.new(:abc)
+  end
+end
+```
+
+If this job was interrupted, Active Job would be unable to serialize
+`FancyCursor`, and Job Iteration would fallback to the legacy behavior of not
+serializing the cursor. This would typically result in the queue adapter
+eventually serializing the cursor as JSON by calling `.to_s` on it. The cursor
+would be deserialized as `:abc`, rather than the intended `FancyCursor.new(:abc)`.
+
+To avoid this, job authors should take care to ensure that their cursor is
+serializable by Active Job. This can be done in a couple ways, such as:
+- [adding GlobalID support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid)
+- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers)
+- handling (de)serialization in the job/enumerator itself
+  ```ruby
+  def build_enumerator(cursor:)
+    fancy_cursor = FancyCursor.new(cursor) unless cursor.nil?
+    Enumerator.new do |yielder|
+      # ...something with fancy cursor...
+      yield 123, FancyCursor(:abc).wrapped_value
+    end
+  end
+  ```
+
+Note that starting in 2.0, Job Iteration will stop supporting fallback behavior
+and raise when it encounters an unserializable cursor.

lib/job-iteration.rb

Lines changed: 20 additions & 0 deletions
@@ -9,6 +9,8 @@ module JobIteration

   INTEGRATIONS = [:resque, :sidekiq]

+  Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")
+
   extend self

   # Use this to _always_ interrupt the job after it's been running for more than N seconds.
@@ -29,6 +31,24 @@ module JobIteration
   #     # ...
   attr_accessor :max_job_runtime

+  # Set this to `true` to enforce that cursors be composed of objects capable
+  # of built-in (de)serialization by Active Job.
+  #
+  #   JobIteration.enforce_serializable_cursors = true
+  #
+  # For more granular control, this can also be configured per job class, and
+  # is inherited by child classes.
+  #
+  #   class MyJob < ActiveJob::Base
+  #     include JobIteration::Iteration
+  #     self.job_iteration_enforce_serializable_cursors = false
+  #     # ...
+  #   end
+  #
+  # Note that non-enforcement is deprecated and enforcement will be mandatory
+  # in version 2.0, at which point this config will go away.
+  attr_accessor :enforce_serializable_cursors
+
   # Used internally for hooking into job processing frameworks like Sidekiq and Resque.
   attr_accessor :interruption_adapter

lib/job-iteration/iteration.rb

Lines changed: 53 additions & 52 deletions
@@ -13,28 +13,6 @@ module Iteration
       :total_time,
     )

-    class CursorError < ArgumentError
-      attr_reader :cursor
-
-      def initialize(message, cursor:)
-        super(message)
-        @cursor = cursor
-      end
-
-      def message
-        "#{super} (#{inspected_cursor})"
-      end
-
-      private
-
-      def inspected_cursor
-        cursor.inspect
-      rescue NoMethodError
-        # For those brave enough to try to use BasicObject as cursor. Nice try.
-        Object.instance_method(:inspect).bind(cursor).call
-      end
-    end
-
     included do |_base|
       define_callbacks :start
       define_callbacks :shutdown
@@ -47,6 +25,13 @@ def inspected_cursor
         default: JobIteration.max_job_runtime,
       )

+      class_attribute(
+        :job_iteration_enforce_serializable_cursors,
+        instance_writer: false,
+        instance_predicate: false,
+        default: JobIteration.enforce_serializable_cursors,
+      )
+
       singleton_class.prepend(PrependedClassMethods)
     end

@@ -104,16 +89,25 @@ def initialize(*arguments)
     ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

     def serialize # @private
-      super.merge(
-        "cursor_position" => cursor_position,
+      iteration_job_data = {
+        "cursor_position" => cursor_position, # Backwards compatibility
         "times_interrupted" => times_interrupted,
         "total_time" => total_time,
-      )
+      }
+
+      begin
+        iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position)
+      rescue ActiveJob::SerializationError
+        raise if job_iteration_enforce_serializable_cursors
+        # No point in duplicating the deprecation warning from assert_valid_cursor!
+      end
+
+      super.merge(iteration_job_data)
     end

     def deserialize(job_data) # @private
       super
-      self.cursor_position = job_data["cursor_position"]
+      self.cursor_position = cursor_position_from_job_data(job_data)
       self.times_interrupted = job_data["times_interrupted"] || 0
       self.total_time = job_data["total_time"] || 0
     end
@@ -176,8 +170,7 @@ def iterate_with_enumerator(enumerator, arguments)
       @needs_reenqueue = false

       enumerator.each do |object_from_enumerator, index|
-        # Deferred until 2.0.0
-        # assert_valid_cursor!(index)
+        assert_valid_cursor!(index)

         record_unit_of_work do
           found_record = true
@@ -236,16 +229,21 @@ def build_enumerator(params, cursor:)
       EOS
     end

-    # The adapter must be able to serialize and deserialize the cursor back into an equivalent object.
-    # https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple
     def assert_valid_cursor!(cursor)
-      return if serializable?(cursor)
+      serialize_cursor(cursor)
+      true
+    rescue ActiveJob::SerializationError
+      raise if job_iteration_enforce_serializable_cursors

-      raise CursorError.new(
-        "Cursor must be composed of objects capable of built-in (de)serialization: " \
-        "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
-        cursor: cursor,
-      )
+      Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3))
+        The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize.
+
+        See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types
+
+        This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
+      DEPRECATION_MESSAGE
+
+      false
     end

     def assert_implements_methods!
@@ -315,6 +313,25 @@ def handle_completed(completed)
       raise "Unexpected thrown value: #{completed.inspect}"
     end

+    def cursor_position_from_job_data(job_data)
+      if job_data.key?("serialized_cursor_position")
+        deserialize_cursor(job_data.fetch("serialized_cursor_position"))
+      else
+        # Backwards compatibility for
+        # - jobs interrupted before cursor serialization feature shipped, or
+        # - jobs whose cursor could not be serialized
+        job_data.fetch("cursor_position", nil)
+      end
+    end
+
+    def serialize_cursor(cursor)
+      ActiveJob::Arguments.serialize([cursor]).first
+    end
+
+    def deserialize_cursor(cursor)
+      ActiveJob::Arguments.deserialize([cursor]).first
+    end
+
     def valid_cursor_parameter?(parameters)
       # this condition is when people use the splat operator.
       # def build_enumerator(*)
@@ -326,21 +343,5 @@ def valid_cursor_parameter?(parameters)
       end
       false
     end
-
-    SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze
-    private_constant :SIMPLE_SERIALIZABLE_CLASSES
-    def serializable?(object)
-      # Subclasses must be excluded, hence not using is_a? or ===.
-      if object.instance_of?(Array)
-        object.all? { |element| serializable?(element) }
-      elsif object.instance_of?(Hash)
-        object.all? { |key, value| serializable?(key) && serializable?(value) }
-      else
-        SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
-      end
-    rescue NoMethodError
-      # BasicObject doesn't respond to instance_of, but we can't serialize it anyway
-      false
-    end
   end
 end
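
The write and read paths changed in `lib/job-iteration/iteration.rb` can be sketched without Rails. This is an illustration under stated assumptions, not the library's code: `ToyArguments` is a hypothetical stand-in for `ActiveJob::Arguments` that only understands JSON primitives and `Time`, and `dump_job_data` and `load_cursor` are made-up, simplified counterparts of `serialize` and `cursor_position_from_job_data`.

```ruby
require "json"
require "time"

# Hypothetical stand-in for ActiveJob::Arguments (NOT the real API).
module ToyArguments
  class SerializationError < ArgumentError; end

  def self.serialize(value)
    case value
    when nil, true, false, Integer, Float, String
      value
    when Time
      # Tag the value so it can be rebuilt as a Time on the way back.
      { "_toy_type" => "Time", "value" => value.iso8601(9) }
    else
      raise SerializationError, "Unsupported cursor type: #{value.class}"
    end
  end

  def self.deserialize(value)
    if value.is_a?(Hash) && value["_toy_type"] == "Time"
      Time.iso8601(value["value"])
    else
      value
    end
  end
end

# Write path: always keep the legacy key, add the safe key when possible.
def dump_job_data(cursor, enforce:)
  data = { "cursor_position" => cursor }
  begin
    data["serialized_cursor_position"] = ToyArguments.serialize(cursor)
  rescue ToyArguments::SerializationError
    raise if enforce
    # Not enforcing: fall back to the legacy key only.
  end
  data
end

# Read path: prefer the safe key, fall back to the legacy one.
def load_cursor(job_data)
  if job_data.key?("serialized_cursor_position")
    ToyArguments.deserialize(job_data.fetch("serialized_cursor_position"))
  else
    job_data.fetch("cursor_position", nil)
  end
end

time = Time.at(0).utc
wire = JSON.parse(JSON.generate(dump_job_data(time, enforce: false)))
puts load_cursor(wire).class

legacy = JSON.parse(JSON.generate(dump_job_data(Object.new, enforce: false)))
puts load_cursor(legacy).class
```

Keeping the legacy `cursor_position` key alongside the new one is what lets jobs enqueued before the change, or jobs whose cursor could not be serialized, still resume. With `enforce: true`, the sketch raises instead of falling back, mirroring the opt-in behavior described in the commit message.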

test/integration/integration_behaviour.rb

Lines changed: 2 additions & 7 deletions
@@ -36,17 +36,12 @@ module IntegrationBehaviour

   test "unserializable corruption is prevented" do
     skip_until_version("2.0.0")
-    # Cursors are serialized as JSON, but not all objects are serializable.
-    # time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC
-    # json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\""
-    # string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC"
-    # We serialized a Time, but it was deserialized as a String.
-    TimeCursorJob.perform_later
+    UnserializableCursorJob.perform_later
     TerminateJob.perform_later
     start_worker_and_wait

     assert_equal(
-      JobIteration::Iteration::CursorError.name,
+      ActiveJob::SerializationError.name,
       failed_job_error_class_name,
     )
   end

test/support/jobs.rb

Lines changed: 3 additions & 2 deletions
@@ -15,11 +15,12 @@ def each_iteration(omg)
   end
 end

-class TimeCursorJob < ActiveJob::Base
+class UnserializableCursorJob < ActiveJob::Base
   include JobIteration::Iteration
+  UnserializableCursor = Class.new

   def build_enumerator(cursor:)
-    return [["item", Time.now]].to_enum if cursor.nil?
+    return [["item", UnserializableCursor.new]].to_enum if cursor.nil?

     raise "This should never run; cursor is unserializable!"
   end
