Skip to content

Commit e8a5168

Browse files
committed
Expose poller automation
1 parent e93350c commit e8a5168

9 files changed

Lines changed: 277 additions & 24 deletions

File tree

temporalio/ext/src/worker.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@ use std::{
66
};
77

88
use crate::{
9+
ROOT_MOD,
910
client::Client,
1011
enter_sync, error, id, new_error,
1112
runtime::{AsyncCommand, Runtime, RuntimeHandle},
1213
util::{AsyncCallback, Struct},
13-
ROOT_MOD,
1414
};
15+
use futures::{StreamExt, stream::BoxStream};
1516
use futures::{future, stream};
16-
use futures::{stream::BoxStream, StreamExt};
1717
use magnus::{
18-
class, function, method, prelude::*, typed_data, DataTypeFunctions, Error, IntoValue, RArray,
19-
RString, RTypedData, Ruby, TypedData, Value,
18+
DataTypeFunctions, Error, IntoValue, RArray, RString, RTypedData, Ruby, TypedData, Value,
19+
class, function, method, prelude::*, typed_data,
2020
};
2121
use prost::Message;
2222
use temporal_sdk_core::{
23-
replay::{HistoryForReplay, ReplayWorkerInput},
2423
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
2524
SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder,
25+
replay::{HistoryForReplay, ReplayWorkerInput},
2626
};
2727
use temporal_sdk_core_api::{
2828
errors::{PollError, WorkflowErrorType},
@@ -34,7 +34,7 @@ use temporal_sdk_core_api::{
3434
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
3535
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
3636
use temporal_sdk_core_protos::temporal::api::history::v1::History;
37-
use tokio::sync::mpsc::{channel, Sender};
37+
use tokio::sync::mpsc::{Sender, channel};
3838
use tokio_stream::wrappers::ReceiverStream;
3939

4040
pub fn init(ruby: &Ruby) -> Result<(), Error> {
@@ -480,15 +480,21 @@ fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
480480
})
481481
.client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
482482
.max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
483-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
484-
options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
485-
))
483+
.workflow_task_poller_behavior({
484+
let poller_behavior = options
485+
.child(id!("workflow_task_poller_behavior"))?
486+
.ok_or_else(|| error!("Worker options must have workflow_task_poller_behavior"))?;
487+
extract_poller_behavior(poller_behavior)?
488+
})
486489
.nonsticky_to_sticky_poll_ratio(
487490
options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
488491
)
489-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
490-
options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
491-
))
492+
.activity_task_poller_behavior({
493+
let poller_behavior = options
494+
.child(id!("activity_task_poller_behavior"))?
495+
.ok_or_else(|| error!("Worker options must have activity_task_poller_behavior"))?;
496+
extract_poller_behavior(poller_behavior)?
497+
})
492498
.no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
493499
.sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
494500
options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
@@ -605,3 +611,15 @@ fn build_tuner_resource_options<SK: SlotKind>(
605611
Some(slots_options),
606612
))
607613
}
614+
615+
fn extract_poller_behavior(poller_behavior: Struct) -> Result<PollerBehavior, Error> {
616+
Ok(if poller_behavior.member::<usize>(id!("initial")).is_ok() {
617+
PollerBehavior::Autoscaling {
618+
minimum: poller_behavior.member::<usize>(id!("minimum"))?,
619+
maximum: poller_behavior.member::<usize>(id!("maximum"))?,
620+
initial: poller_behavior.member::<usize>(id!("initial"))?,
621+
}
622+
} else {
623+
PollerBehavior::SimpleMaximum(poller_behavior.member::<usize>(id!("maximum"))?)
624+
})
625+
}

temporalio/lib/temporalio/internal/bridge/worker.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class Worker
1414
:tuner,
1515
:identity_override,
1616
:max_cached_workflows,
17-
:max_concurrent_workflow_task_polls,
17+
:workflow_task_poller_behavior,
1818
:nonsticky_to_sticky_poll_ratio,
19-
:max_concurrent_activity_task_polls,
19+
:activity_task_poller_behavior,
2020
:no_remote_activities,
2121
:sticky_queue_schedule_to_start_timeout,
2222
:max_heartbeat_throttle_interval,
@@ -65,6 +65,18 @@ class Worker
6565
keyword_init: true
6666
)
6767

68+
PollerBehaviorSimpleMaximum = Struct.new(
69+
:simple_maximum,
70+
keyword_init: true
71+
)
72+
73+
PollerBehaviorAutoscaling = Struct.new(
74+
:minimum,
75+
:maximum,
76+
:initial,
77+
keyword_init: true
78+
)
79+
6880
def self.finalize_shutdown_all(workers)
6981
queue = Queue.new
7082
async_finalize_all(workers, queue)

temporalio/lib/temporalio/worker.rb

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require 'temporalio/internal/worker/workflow_worker'
1414
require 'temporalio/worker/activity_executor'
1515
require 'temporalio/worker/interceptor'
16+
require 'temporalio/worker/poller_behavior'
1617
require 'temporalio/worker/thread_pool'
1718
require 'temporalio/worker/tuner'
1819
require 'temporalio/worker/workflow_executor'
@@ -52,7 +53,9 @@ class Worker
5253
:workflow_payload_codec_thread_pool,
5354
:unsafe_workflow_io_enabled,
5455
:deployment_options,
55-
:debug_mode
56+
:debug_mode,
57+
:workflow_task_poller_behavior,
58+
:activity_task_poller_behavior
5659
)
5760

5861
# Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details.
@@ -385,6 +388,8 @@ def initialize(
385388
workflow_payload_codec_thread_pool: nil,
386389
unsafe_workflow_io_enabled: false,
387390
deployment_options: Worker.default_deployment_options,
391+
workflow_task_poller_behavior: nil,
392+
activity_task_poller_behavior: nil,
388393
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
389394
)
390395
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
@@ -419,7 +424,9 @@ def initialize(
419424
workflow_payload_codec_thread_pool:,
420425
unsafe_workflow_io_enabled:,
421426
deployment_options:,
422-
debug_mode:
427+
debug_mode:,
428+
workflow_task_poller_behavior:,
429+
activity_task_poller_behavior:
423430
).freeze
424431

425432
should_enforce_versioning_behavior =
@@ -435,6 +442,10 @@ def initialize(
435442
workflow_failure_exception_types:, workflow_definitions:
436443
)
437444

445+
# Convert deprecated max concurrent polls to poller behaviors if not specified
446+
workflow_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls)
447+
activity_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls)
448+
438449
# Create the bridge worker
439450
@bridge_worker = Internal::Bridge::Worker.new(
440451
client.connection._core_client,
@@ -446,9 +457,9 @@ def initialize(
446457
tuner: tuner._to_bridge_options,
447458
identity_override: identity,
448459
max_cached_workflows:,
449-
max_concurrent_workflow_task_polls:,
460+
workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options,
450461
nonsticky_to_sticky_poll_ratio:,
451-
max_concurrent_activity_task_polls:,
462+
activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options,
452463
# For shutdown to work properly, we must disable remote activities
453464
# ourselves if there are no activities
454465
no_remote_activities: no_remote_activities || activities.empty?,
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# frozen_string_literal: true
2+
3+
module Temporalio
4+
class Worker
5+
# Base class for poller behaviors that control how polling scales.
6+
#
7+
# Use factory methods {.simple_maximum} or {.autoscaling} to create instances.
8+
class PollerBehavior
9+
# Creates a simple maximum poller behavior
10+
# The poller will attempt to poll as long as a slot is available, up to the
11+
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
12+
#
13+
# @param maximum [Integer] Maximum number of concurrent poll requests.
14+
# @return [SimpleMaximumPollerBehavior] A simple maximum poller behavior
15+
def self.simple_maximum(maximum = 5)
16+
SimpleMaximumPollerBehavior.new(maximum: maximum)
17+
end
18+
19+
# Creates an autoscaling poller behavior
20+
# The poller will automatically scale the number of pollers based on feedback
21+
# from the server. A slot must be available before beginning polling.
22+
#
23+
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
24+
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
25+
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
26+
# @return [AutoscalingPollerBehavior] An autoscaling poller behavior
27+
def self.autoscaling(minimum: 1, maximum: 100, initial: 5)
28+
AutoscalingPollerBehavior.new(minimum: minimum, maximum: maximum, initial: initial)
29+
end
30+
31+
# @!visibility private
32+
def _to_bridge_options
33+
raise NotImplementedError, 'Subclasses must implement this method'
34+
end
35+
end
36+
37+
# A poller behavior that attempts to poll as long as a slot is available, up to the
38+
# provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
39+
class SimpleMaximumPollerBehavior < PollerBehavior
40+
# @return [Integer] Maximum number of concurrent poll requests.
41+
attr_reader :maximum
42+
43+
# @param maximum [Integer] Maximum number of concurrent poll requests.
44+
def initialize(maximum: 5)
45+
super()
46+
@maximum = maximum
47+
end
48+
49+
# @!visibility private
50+
def _to_bridge_options
51+
Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: maximum)
52+
end
53+
end
54+
55+
# A poller behavior that automatically scales the number of pollers based on feedback
56+
# from the server. A slot must be available before beginning polling.
57+
class AutoscalingPollerBehavior < PollerBehavior
58+
# @return [Integer] Minimum number of poll calls (assuming slots are available).
59+
attr_reader :minimum
60+
# @return [Integer] Maximum number of poll calls that will ever be open at once.
61+
attr_reader :maximum
62+
# @return [Integer] Number of polls attempted initially before scaling kicks in.
63+
attr_reader :initial
64+
65+
# @param minimum [Integer] Minimum number of poll calls (assuming slots are available).
66+
# @param maximum [Integer] Maximum number of poll calls that will ever be open at once.
67+
# @param initial [Integer] Number of polls attempted initially before scaling kicks in.
68+
def initialize(minimum: 1, maximum: 100, initial: 5)
69+
super()
70+
@minimum = minimum
71+
@maximum = maximum
72+
@initial = initial
73+
end
74+
75+
# @!visibility private
76+
def _to_bridge_options
77+
Internal::Bridge::Worker::PollerBehaviorAutoscaling.new(
78+
minimum: minimum,
79+
maximum: maximum,
80+
initial: initial
81+
)
82+
end
83+
end
84+
end
85+
end

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require 'temporalio/internal/worker/multi_runner'
88
require 'temporalio/internal/worker/workflow_worker'
99
require 'temporalio/worker/interceptor'
10+
require 'temporalio/worker/poller_behavior'
1011
require 'temporalio/worker/thread_pool'
1112
require 'temporalio/worker/tuner'
1213
require 'temporalio/worker/workflow_executor'
@@ -205,9 +206,9 @@ def initialize(
205206
)._to_bridge_options,
206207
identity_override: options.identity,
207208
max_cached_workflows: 2,
208-
max_concurrent_workflow_task_polls: 2,
209+
workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(2)._to_bridge_options,
209210
nonsticky_to_sticky_poll_ratio: 1.0,
210-
max_concurrent_activity_task_polls: 1,
211+
activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(1)._to_bridge_options,
211212
no_remote_activities: true,
212213
sticky_queue_schedule_to_start_timeout: 1.0,
213214
max_heartbeat_throttle_interval: 1.0,

temporalio/sig/temporalio/internal/bridge/worker.rbs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ module Temporalio
1010
attr_accessor tuner: TunerOptions
1111
attr_accessor identity_override: String?
1212
attr_accessor max_cached_workflows: Integer
13-
attr_accessor max_concurrent_workflow_task_polls: Integer
13+
attr_accessor workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
1414
attr_accessor nonsticky_to_sticky_poll_ratio: Float
15-
attr_accessor max_concurrent_activity_task_polls: Integer
15+
attr_accessor activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
1616
attr_accessor no_remote_activities: bool
1717
attr_accessor sticky_queue_schedule_to_start_timeout: Float
1818
attr_accessor max_heartbeat_throttle_interval: Float
@@ -32,9 +32,9 @@ module Temporalio
3232
tuner: TunerOptions,
3333
identity_override: String?,
3434
max_cached_workflows: Integer,
35-
max_concurrent_workflow_task_polls: Integer,
35+
workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
3636
nonsticky_to_sticky_poll_ratio: Float,
37-
max_concurrent_activity_task_polls: Integer,
37+
activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling,
3838
no_remote_activities: bool,
3939
sticky_queue_schedule_to_start_timeout: Float,
4040
max_heartbeat_throttle_interval: Float,
@@ -142,6 +142,20 @@ module Temporalio
142142
) -> void
143143
end
144144

145+
class PollerBehaviorSimpleMaximum
146+
attr_accessor simple_maximum: Integer
147+
148+
def initialize: (simple_maximum: Integer) -> void
149+
end
150+
151+
class PollerBehaviorAutoscaling
152+
attr_accessor minimum: Integer
153+
attr_accessor maximum: Integer
154+
attr_accessor initial: Integer
155+
156+
def initialize: (minimum: Integer, maximum: Integer, initial: Integer) -> void
157+
end
158+
145159
class WorkflowReplayer
146160
def self.new: (Runtime runtime, Options options) -> [WorkflowReplayer, Worker]
147161

temporalio/sig/temporalio/worker.rbs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Temporalio
22
class Worker
3+
34
class Options
45
attr_reader client: Client
56
attr_reader task_queue: String
@@ -27,6 +28,8 @@ module Temporalio
2728
attr_reader workflow_failure_exception_types: Array[singleton(Exception)]
2829
attr_reader workflow_payload_codec_thread_pool: ThreadPool?
2930
attr_reader unsafe_workflow_io_enabled: bool
31+
attr_reader workflow_task_poller_behavior: PollerBehavior
32+
attr_reader activity_task_poller_behavior: PollerBehavior
3033
attr_reader debug_mode: bool
3134

3235
def initialize: (
@@ -56,6 +59,8 @@ module Temporalio
5659
workflow_failure_exception_types: Array[singleton(Exception)],
5760
workflow_payload_codec_thread_pool: ThreadPool?,
5861
unsafe_workflow_io_enabled: bool,
62+
workflow_task_poller_behavior: PollerBehavior,
63+
activity_task_poller_behavior: PollerBehavior,
5964
deployment_options: Worker::DeploymentOptions,
6065
debug_mode: bool
6166
) -> void
@@ -106,6 +111,8 @@ module Temporalio
106111
?workflow_failure_exception_types: Array[singleton(Exception)],
107112
?workflow_payload_codec_thread_pool: ThreadPool?,
108113
?unsafe_workflow_io_enabled: bool,
114+
?workflow_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior,
115+
?activity_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior,
109116
?deployment_options: Worker::DeploymentOptions,
110117
?debug_mode: bool
111118
) -> void
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
module Temporalio
2+
class Worker
3+
class PollerBehavior
4+
def self.simple_maximum: (Integer) -> SimpleMaximumPollerBehavior
5+
def self.autoscaling: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> AutoscalingPollerBehavior
6+
def _to_bridge_options: -> untyped
7+
end
8+
9+
class SimpleMaximumPollerBehavior < PollerBehavior
10+
attr_reader maximum: Integer
11+
12+
def initialize: (?maximum: Integer) -> void
13+
def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorSimpleMaximum
14+
end
15+
16+
class AutoscalingPollerBehavior < PollerBehavior
17+
attr_reader minimum: Integer
18+
attr_reader maximum: Integer
19+
attr_reader initial: Integer
20+
21+
def initialize: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> void
22+
def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorAutoscaling
23+
end
24+
end
25+
end

0 commit comments

Comments
 (0)