From e93350c515192eb2af4c324b30c16d0250161db6 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 28 May 2025 13:57:41 -0700 Subject: [PATCH 1/5] Add priority annotations --- temporalio/lib/temporalio/client/interceptor.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index 0ed9bdad..cab459db 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -37,6 +37,7 @@ def intercept_client(next_interceptor) :start_delay, :request_eager_start, :headers, + :priority, :versioning_override, :priority, :rpc_options From e8a51685477cc3b15991df81e5cb7e5a83dc8ee5 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 28 May 2025 16:43:13 -0700 Subject: [PATCH 2/5] Expose poller automation --- temporalio/ext/src/worker.rs | 42 ++++++--- .../lib/temporalio/internal/bridge/worker.rb | 16 +++- temporalio/lib/temporalio/worker.rb | 19 ++++- .../lib/temporalio/worker/poller_behavior.rb | 85 +++++++++++++++++++ .../temporalio/worker/workflow_replayer.rb | 5 +- .../sig/temporalio/internal/bridge/worker.rbs | 22 ++++- temporalio/sig/temporalio/worker.rbs | 7 ++ .../sig/temporalio/worker/poller_behavior.rbs | 25 ++++++ temporalio/test/worker_test.rb | 80 +++++++++++++++++ 9 files changed, 277 insertions(+), 24 deletions(-) create mode 100644 temporalio/lib/temporalio/worker/poller_behavior.rb create mode 100644 temporalio/sig/temporalio/worker/poller_behavior.rbs diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 19f66b07..0b4ea504 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -6,23 +6,23 @@ use std::{ }; use crate::{ + ROOT_MOD, client::Client, enter_sync, error, id, new_error, runtime::{AsyncCommand, Runtime, RuntimeHandle}, util::{AsyncCallback, Struct}, - ROOT_MOD, }; +use futures::{StreamExt, stream::BoxStream}; use futures::{future, stream}; -use futures::{stream::BoxStream, StreamExt}; use magnus::{ - class, function, method, prelude::*, typed_data, DataTypeFunctions, Error, IntoValue, RArray, - RString, RTypedData, Ruby, TypedData, Value, + DataTypeFunctions, Error, IntoValue, RArray, RString, RTypedData, Ruby, TypedData, Value, + class, function, method, prelude::*, typed_data, }; use prost::Message; use temporal_sdk_core::{ - replay::{HistoryForReplay, ReplayWorkerInput}, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder, + replay::{HistoryForReplay, ReplayWorkerInput}, }; use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, @@ -34,7 +34,7 @@ use temporal_sdk_core_api::{ use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion}; use temporal_sdk_core_protos::temporal::api::history::v1::History; -use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::mpsc::{Sender, channel}; use tokio_stream::wrappers::ReceiverStream; pub fn init(ruby: &Ruby) -> Result<(), Error> { @@ -480,15 +480,21 @@ fn build_config(options: Struct) -> Result { }) .client_identity_override(options.member::>(id!("identity_override"))?) .max_cached_workflows(options.member::(id!("max_cached_workflows"))?) - .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum( - options.member::(id!("max_concurrent_workflow_task_polls"))?, - )) + .workflow_task_poller_behavior({ + let poller_behavior = options + .child(id!("workflow_task_poller_behavior"))? + .ok_or_else(|| error!("Worker options must have workflow_task_poller_behavior"))?; + extract_poller_behavior(poller_behavior)? + }) .nonsticky_to_sticky_poll_ratio( options.member::(id!("nonsticky_to_sticky_poll_ratio"))?, ) - .activity_task_poller_behavior(PollerBehavior::SimpleMaximum( - options.member::(id!("max_concurrent_activity_task_polls"))?, - )) + .activity_task_poller_behavior({ + let poller_behavior = options + .child(id!("activity_task_poller_behavior"))? + .ok_or_else(|| error!("Worker options must have activity_task_poller_behavior"))?; + extract_poller_behavior(poller_behavior)? + }) .no_remote_activities(options.member::(id!("no_remote_activities"))?) .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64( options.member(id!("sticky_queue_schedule_to_start_timeout"))?, @@ -605,3 +611,15 @@ fn build_tuner_resource_options( Some(slots_options), )) } + +fn extract_poller_behavior(poller_behavior: Struct) -> Result { + Ok(if poller_behavior.member::(id!("initial")).is_ok() { + PollerBehavior::Autoscaling { + minimum: poller_behavior.member::(id!("minimum"))?, + maximum: poller_behavior.member::(id!("maximum"))?, + initial: poller_behavior.member::(id!("initial"))?, + } + } else { + PollerBehavior::SimpleMaximum(poller_behavior.member::(id!("maximum"))?) + }) +} diff --git a/temporalio/lib/temporalio/internal/bridge/worker.rb b/temporalio/lib/temporalio/internal/bridge/worker.rb index d3e915ff..1677edd0 100644 --- a/temporalio/lib/temporalio/internal/bridge/worker.rb +++ b/temporalio/lib/temporalio/internal/bridge/worker.rb @@ -14,9 +14,9 @@ class Worker :tuner, :identity_override, :max_cached_workflows, - :max_concurrent_workflow_task_polls, + :workflow_task_poller_behavior, :nonsticky_to_sticky_poll_ratio, - :max_concurrent_activity_task_polls, + :activity_task_poller_behavior, :no_remote_activities, :sticky_queue_schedule_to_start_timeout, :max_heartbeat_throttle_interval, @@ -65,6 +65,18 @@ class Worker keyword_init: true ) + PollerBehaviorSimpleMaximum = Struct.new( + :simple_maximum, + keyword_init: true + ) + + PollerBehaviorAutoscaling = Struct.new( + :minimum, + :maximum, + :initial, + keyword_init: true + ) + def self.finalize_shutdown_all(workers) queue = Queue.new async_finalize_all(workers, queue) diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index 3242caae..12b106c0 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -13,6 +13,7 @@ require 'temporalio/internal/worker/workflow_worker' require 'temporalio/worker/activity_executor' require 'temporalio/worker/interceptor' +require 'temporalio/worker/poller_behavior' require 'temporalio/worker/thread_pool' require 'temporalio/worker/tuner' require 'temporalio/worker/workflow_executor' @@ -52,7 +53,9 @@ class Worker :workflow_payload_codec_thread_pool, :unsafe_workflow_io_enabled, :deployment_options, - :debug_mode + :debug_mode, + :workflow_task_poller_behavior, + :activity_task_poller_behavior ) # Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details. @@ -385,6 +388,8 @@ def initialize( workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, + workflow_task_poller_behavior: nil, + activity_task_poller_behavior: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase) ) raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty? @@ -419,7 +424,9 @@ def initialize( workflow_payload_codec_thread_pool:, unsafe_workflow_io_enabled:, deployment_options:, - debug_mode: + debug_mode:, + workflow_task_poller_behavior:, + activity_task_poller_behavior: ).freeze should_enforce_versioning_behavior = @@ -435,6 +442,10 @@ def initialize( workflow_failure_exception_types:, workflow_definitions: ) + # Convert deprecated max concurrent polls to poller behaviors if not specified + workflow_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls) + activity_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls) + # Create the bridge worker @bridge_worker = Internal::Bridge::Worker.new( client.connection._core_client, @@ -446,9 +457,9 @@ def initialize( tuner: tuner._to_bridge_options, identity_override: identity, max_cached_workflows:, - max_concurrent_workflow_task_polls:, + workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options, nonsticky_to_sticky_poll_ratio:, - max_concurrent_activity_task_polls:, + activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options, # For shutdown to work properly, we must disable remote activities # ourselves if there are no activities no_remote_activities: no_remote_activities || activities.empty?, diff --git a/temporalio/lib/temporalio/worker/poller_behavior.rb b/temporalio/lib/temporalio/worker/poller_behavior.rb new file mode 100644 index 00000000..1155b5e4 --- /dev/null +++ b/temporalio/lib/temporalio/worker/poller_behavior.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +module Temporalio + class Worker + # Base class for poller behaviors that control how polling scales. + # + # Use factory methods {.simple_maximum} or {.autoscaling} to create instances. + class PollerBehavior + # Creates a simple maximum poller behavior + # The poller will attempt to poll as long as a slot is available, up to the + # provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. + # + # @param maximum [Integer] Maximum number of concurrent poll requests. + # @return [SimpleMaximumPollerBehavior] A simple maximum poller behavior + def self.simple_maximum(maximum = 5) + SimpleMaximumPollerBehavior.new(maximum: maximum) + end + + # Creates an autoscaling poller behavior + # The poller will automatically scale the number of pollers based on feedback + # from the server. A slot must be available before beginning polling. + # + # @param minimum [Integer] Minimum number of poll calls (assuming slots are available). + # @param maximum [Integer] Maximum number of poll calls that will ever be open at once. + # @param initial [Integer] Number of polls attempted initially before scaling kicks in. + # @return [AutoscalingPollerBehavior] An autoscaling poller behavior + def self.autoscaling(minimum: 1, maximum: 100, initial: 5) + AutoscalingPollerBehavior.new(minimum: minimum, maximum: maximum, initial: initial) + end + + # @!visibility private + def _to_bridge_options + raise NotImplementedError, 'Subclasses must implement this method' + end + end + + # A poller behavior that attempts to poll as long as a slot is available, up to the + # provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. + class SimpleMaximumPollerBehavior < PollerBehavior + # @return [Integer] Maximum number of concurrent poll requests. + attr_reader :maximum + + # @param maximum [Integer] Maximum number of concurrent poll requests. + def initialize(maximum: 5) + super() + @maximum = maximum + end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: maximum) + end + end + + # A poller behavior that automatically scales the number of pollers based on feedback + # from the server. A slot must be available before beginning polling. + class AutoscalingPollerBehavior < PollerBehavior + # @return [Integer] Minimum number of poll calls (assuming slots are available). + attr_reader :minimum + # @return [Integer] Maximum number of poll calls that will ever be open at once. + attr_reader :maximum + # @return [Integer] Number of polls attempted initially before scaling kicks in. + attr_reader :initial + + # @param minimum [Integer] Minimum number of poll calls (assuming slots are available). + # @param maximum [Integer] Maximum number of poll calls that will ever be open at once. + # @param initial [Integer] Number of polls attempted initially before scaling kicks in. + def initialize(minimum: 1, maximum: 100, initial: 5) + super() + @minimum = minimum + @maximum = maximum + @initial = initial + end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::PollerBehaviorAutoscaling.new( + minimum: minimum, + maximum: maximum, + initial: initial + ) + end + end + end +end diff --git a/temporalio/lib/temporalio/worker/workflow_replayer.rb b/temporalio/lib/temporalio/worker/workflow_replayer.rb index 214c6bfc..f8f52566 100644 --- a/temporalio/lib/temporalio/worker/workflow_replayer.rb +++ b/temporalio/lib/temporalio/worker/workflow_replayer.rb @@ -7,6 +7,7 @@ require 'temporalio/internal/worker/multi_runner' require 'temporalio/internal/worker/workflow_worker' require 'temporalio/worker/interceptor' +require 'temporalio/worker/poller_behavior' require 'temporalio/worker/thread_pool' require 'temporalio/worker/tuner' require 'temporalio/worker/workflow_executor' @@ -205,9 +206,9 @@ def initialize( )._to_bridge_options, identity_override: options.identity, max_cached_workflows: 2, - max_concurrent_workflow_task_polls: 2, + workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(2)._to_bridge_options, nonsticky_to_sticky_poll_ratio: 1.0, - max_concurrent_activity_task_polls: 1, + activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(1)._to_bridge_options, no_remote_activities: true, sticky_queue_schedule_to_start_timeout: 1.0, max_heartbeat_throttle_interval: 1.0, diff --git a/temporalio/sig/temporalio/internal/bridge/worker.rbs b/temporalio/sig/temporalio/internal/bridge/worker.rbs index 4ca61974..747b0f00 100644 --- a/temporalio/sig/temporalio/internal/bridge/worker.rbs +++ b/temporalio/sig/temporalio/internal/bridge/worker.rbs @@ -10,9 +10,9 @@ module Temporalio attr_accessor tuner: TunerOptions attr_accessor identity_override: String? attr_accessor max_cached_workflows: Integer - attr_accessor max_concurrent_workflow_task_polls: Integer + attr_accessor workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling attr_accessor nonsticky_to_sticky_poll_ratio: Float - attr_accessor max_concurrent_activity_task_polls: Integer + attr_accessor activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling attr_accessor no_remote_activities: bool attr_accessor sticky_queue_schedule_to_start_timeout: Float attr_accessor max_heartbeat_throttle_interval: Float @@ -32,9 +32,9 @@ module Temporalio tuner: TunerOptions, identity_override: String?, max_cached_workflows: Integer, - max_concurrent_workflow_task_polls: Integer, + workflow_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling, nonsticky_to_sticky_poll_ratio: Float, - max_concurrent_activity_task_polls: Integer, + activity_task_poller_behavior: PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling, no_remote_activities: bool, sticky_queue_schedule_to_start_timeout: Float, max_heartbeat_throttle_interval: Float, @@ -142,6 +142,20 @@ module Temporalio ) -> void end + class PollerBehaviorSimpleMaximum + attr_accessor simple_maximum: Integer + + def initialize: (simple_maximum: Integer) -> void + end + + class PollerBehaviorAutoscaling + attr_accessor minimum: Integer + attr_accessor maximum: Integer + attr_accessor initial: Integer + + def initialize: (minimum: Integer, maximum: Integer, initial: Integer) -> void + end + class WorkflowReplayer def self.new: (Runtime runtime, Options options) -> [WorkflowReplayer, Worker] diff --git a/temporalio/sig/temporalio/worker.rbs b/temporalio/sig/temporalio/worker.rbs index 93376ef2..c2191251 100644 --- a/temporalio/sig/temporalio/worker.rbs +++ b/temporalio/sig/temporalio/worker.rbs @@ -1,5 +1,6 @@ module Temporalio class Worker + class Options attr_reader client: Client attr_reader task_queue: String @@ -27,6 +28,8 @@ module Temporalio attr_reader workflow_failure_exception_types: Array[singleton(Exception)] attr_reader workflow_payload_codec_thread_pool: ThreadPool? attr_reader unsafe_workflow_io_enabled: bool + attr_reader workflow_task_poller_behavior: PollerBehavior + attr_reader activity_task_poller_behavior: PollerBehavior attr_reader debug_mode: bool def initialize: ( @@ -56,6 +59,8 @@ module Temporalio workflow_failure_exception_types: Array[singleton(Exception)], workflow_payload_codec_thread_pool: ThreadPool?, unsafe_workflow_io_enabled: bool, + workflow_task_poller_behavior: PollerBehavior, + activity_task_poller_behavior: PollerBehavior, deployment_options: Worker::DeploymentOptions, debug_mode: bool ) -> void @@ -106,6 +111,8 @@ module Temporalio ?workflow_failure_exception_types: Array[singleton(Exception)], ?workflow_payload_codec_thread_pool: ThreadPool?, ?unsafe_workflow_io_enabled: bool, + ?workflow_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior, + ?activity_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior, ?deployment_options: Worker::DeploymentOptions, ?debug_mode: bool ) -> void diff --git a/temporalio/sig/temporalio/worker/poller_behavior.rbs b/temporalio/sig/temporalio/worker/poller_behavior.rbs new file mode 100644 index 00000000..cbd7ca34 --- /dev/null +++ b/temporalio/sig/temporalio/worker/poller_behavior.rbs @@ -0,0 +1,25 @@ +module Temporalio + class Worker + class PollerBehavior + def self.simple_maximum: (Integer) -> SimpleMaximumPollerBehavior + def self.autoscaling: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> AutoscalingPollerBehavior + def _to_bridge_options: -> untyped + end + + class SimpleMaximumPollerBehavior < PollerBehavior + attr_reader maximum: Integer + + def initialize: (?maximum: Integer) -> void + def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorSimpleMaximum + end + + class AutoscalingPollerBehavior < PollerBehavior + attr_reader minimum: Integer + attr_reader maximum: Integer + attr_reader initial: Integer + + def initialize: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> void + def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorAutoscaling + end + end +end diff --git a/temporalio/test/worker_test.rb b/temporalio/test/worker_test.rb index 7b4e7984..919eaaaf 100644 --- a/temporalio/test/worker_test.rb +++ b/temporalio/test/worker_test.rb @@ -160,4 +160,84 @@ def test_can_run_with_composite_tuner end end end + + class WaitOnSignalWorkflow < Temporalio::Workflow::Definition + def execute + Temporalio::Workflow.wait_condition { @complete } + Temporalio::Workflow.execute_activity( + SimpleActivity, + 'dogg', + start_to_close_timeout: 10, + retry_policy: Temporalio::RetryPolicy.new(max_attempts: 1) + ) + end + + workflow_signal + def complete(value) + @complete = value + end + end + + def test_can_run_with_autoscaling_poller_behavior + prom_addr = "127.0.0.1:#{find_free_port}" + runtime = Temporalio::Runtime.new( + telemetry: Temporalio::Runtime::TelemetryOptions.new( + metrics: Temporalio::Runtime::MetricsOptions.new( + prometheus: Temporalio::Runtime::PrometheusMetricsOptions.new( + bind_address: prom_addr + ) + ) + ) + ) + conn_opts = env.client.connection.options.with(runtime:) + client_opts = env.client.options.with( + connection: Temporalio::Client::Connection.new(**conn_opts.to_h) # steep:ignore + ) + client = Temporalio::Client.new(**client_opts.to_h) # steep:ignore + worker = Temporalio::Worker.new( + client: client, + task_queue: "tq-#{SecureRandom.uuid}", + workflows: [WaitOnSignalWorkflow], + activities: [SimpleActivity], + workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling( + initial: 2 + ), + activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling( + initial: 2 + ) + ) + worker.run do + # Give pollers a beat to get started + sleep(0.3) + + dump = Net::HTTP.get(URI("http://#{prom_addr}/metrics")) + lines = dump.split("\n") + + matches = lines.select { |l| l.include?('temporal_num_pollers') } + activity_pollers = matches.select { |l| l.include?('activity_task') } + assert_equal 1, activity_pollers.size + assert activity_pollers[0].end_with?('2') + + workflow_pollers = matches.select { |l| l.include?('workflow_task') } + assert_equal 2, workflow_pollers.size + # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on + # initialization timing. + assert(workflow_pollers[0].end_with?('2') || workflow_pollers[0].end_with?('1')) + assert(workflow_pollers[1].end_with?('2') || workflow_pollers[1].end_with?('1')) + + handles = Array.new(20) do + env.client.start_workflow( + WaitOnSignalWorkflow, + id: "wf-#{SecureRandom.uuid}", + task_queue: worker.task_queue + ) + end + handles.each do |handle| + handle.signal(:complete, true) + end + handles.each do |handle| # rubocop:disable Style/CombinableLoops + assert_equal 'Hello, dogg!', handle.result + end + end + end end From caab471d0613ead8ebe925f3d4c8806ae436946a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 29 May 2025 11:05:29 -0700 Subject: [PATCH 3/5] Review comments --- temporalio/ext/src/worker.rs | 14 +-- temporalio/lib/temporalio/worker.rb | 20 ++-- .../lib/temporalio/worker/poller_behavior.rb | 112 +++++++----------- .../temporalio/worker/workflow_replayer.rb | 6 +- temporalio/sig/temporalio/worker.rbs | 4 +- .../sig/temporalio/worker/poller_behavior.rbs | 6 +- temporalio/test/worker_test.rb | 4 +- 7 files changed, 71 insertions(+), 95 deletions(-) diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 0b4ea504..9b05e17a 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -6,23 +6,23 @@ use std::{ }; use crate::{ - ROOT_MOD, client::Client, enter_sync, error, id, new_error, runtime::{AsyncCommand, Runtime, RuntimeHandle}, util::{AsyncCallback, Struct}, + ROOT_MOD, }; -use futures::{StreamExt, stream::BoxStream}; use futures::{future, stream}; +use futures::{stream::BoxStream, StreamExt}; use magnus::{ - DataTypeFunctions, Error, IntoValue, RArray, RString, RTypedData, Ruby, TypedData, Value, - class, function, method, prelude::*, typed_data, + class, function, method, prelude::*, typed_data, DataTypeFunctions, Error, IntoValue, RArray, + RString, RTypedData, Ruby, TypedData, Value, }; use prost::Message; use temporal_sdk_core::{ + replay::{HistoryForReplay, ReplayWorkerInput}, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder, - replay::{HistoryForReplay, ReplayWorkerInput}, }; use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, @@ -34,7 +34,7 @@ use temporal_sdk_core_api::{ use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion}; use temporal_sdk_core_protos::temporal::api::history::v1::History; -use tokio::sync::mpsc::{Sender, channel}; +use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; pub fn init(ruby: &Ruby) -> Result<(), Error> { @@ -620,6 +620,6 @@ fn extract_poller_behavior(poller_behavior: Struct) -> Result(id!("initial"))?, } } else { - PollerBehavior::SimpleMaximum(poller_behavior.member::(id!("maximum"))?) + PollerBehavior::SimpleMaximum(poller_behavior.member::(id!("simple_maximum"))?) }) } diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index 12b106c0..7bf45864 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -53,9 +53,9 @@ class Worker :workflow_payload_codec_thread_pool, :unsafe_workflow_io_enabled, :deployment_options, - :debug_mode, :workflow_task_poller_behavior, - :activity_task_poller_behavior + :activity_task_poller_behavior, + :debug_mode ) # Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details. @@ -357,6 +357,10 @@ def self.default_illegal_workflow_calls # with a block for narrower enabling of IO. # @param deployment_options [DeploymentOptions, nil] Deployment options for the worker. # WARNING: This is an experimental feature and may change in the future. + # @param workflow_task_poller_behavior [PollerBehavior] Specify the behavior of workflow task + # polling. Defaults to a 5-poller maximum. + # @param activity_task_poller_behavior [PollerBehavior] Specify the behavior of activity task + # polling. Defaults to a 5-poller maximum. # @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks # if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is # `true` or `1`. @@ -388,8 +392,8 @@ def initialize( workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, - workflow_task_poller_behavior: nil, - activity_task_poller_behavior: nil, + workflow_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls), + activity_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase) ) raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty? @@ -424,9 +428,9 @@ def initialize( workflow_payload_codec_thread_pool:, unsafe_workflow_io_enabled:, deployment_options:, - debug_mode:, workflow_task_poller_behavior:, - activity_task_poller_behavior: + activity_task_poller_behavior:, + debug_mode: ).freeze should_enforce_versioning_behavior = @@ -442,10 +446,6 @@ def initialize( workflow_failure_exception_types:, workflow_definitions: ) - # Convert deprecated max concurrent polls to poller behaviors if not specified - workflow_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls) - activity_task_poller_behavior ||= SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls) - # Create the bridge worker @bridge_worker = Internal::Bridge::Worker.new( client.connection._core_client, diff --git a/temporalio/lib/temporalio/worker/poller_behavior.rb b/temporalio/lib/temporalio/worker/poller_behavior.rb index 1155b5e4..bba013b2 100644 --- a/temporalio/lib/temporalio/worker/poller_behavior.rb +++ b/temporalio/lib/temporalio/worker/poller_behavior.rb @@ -3,82 +3,58 @@ module Temporalio class Worker # Base class for poller behaviors that control how polling scales. - # - # Use factory methods {.simple_maximum} or {.autoscaling} to create instances. class PollerBehavior - # Creates a simple maximum poller behavior - # The poller will attempt to poll as long as a slot is available, up to the - # provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. - # - # @param maximum [Integer] Maximum number of concurrent poll requests. - # @return [SimpleMaximumPollerBehavior] A simple maximum poller behavior - def self.simple_maximum(maximum = 5) - SimpleMaximumPollerBehavior.new(maximum: maximum) - end - - # Creates an autoscaling poller behavior - # The poller will automatically scale the number of pollers based on feedback - # from the server. A slot must be available before beginning polling. - # - # @param minimum [Integer] Minimum number of poll calls (assuming slots are available). - # @param maximum [Integer] Maximum number of poll calls that will ever be open at once. - # @param initial [Integer] Number of polls attempted initially before scaling kicks in. - # @return [AutoscalingPollerBehavior] An autoscaling poller behavior - def self.autoscaling(minimum: 1, maximum: 100, initial: 5) - AutoscalingPollerBehavior.new(minimum: minimum, maximum: maximum, initial: initial) - end - # @!visibility private def _to_bridge_options raise NotImplementedError, 'Subclasses must implement this method' end - end - - # A poller behavior that attempts to poll as long as a slot is available, up to the - # provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. - class SimpleMaximumPollerBehavior < PollerBehavior - # @return [Integer] Maximum number of concurrent poll requests. - attr_reader :maximum - - # @param maximum [Integer] Maximum number of concurrent poll requests. - def initialize(maximum: 5) - super() - @maximum = maximum - end - # @!visibility private - def _to_bridge_options - Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: maximum) - end - end - - # A poller behavior that automatically scales the number of pollers based on feedback - # from the server. A slot must be available before beginning polling. - class AutoscalingPollerBehavior < PollerBehavior - # @return [Integer] Minimum number of poll calls (assuming slots are available). - attr_reader :minimum - # @return [Integer] Maximum number of poll calls that will ever be open at once. - attr_reader :maximum - # @return [Integer] Number of polls attempted initially before scaling kicks in. - attr_reader :initial - - # @param minimum [Integer] Minimum number of poll calls (assuming slots are available). - # @param maximum [Integer] Maximum number of poll calls that will ever be open at once. - # @param initial [Integer] Number of polls attempted initially before scaling kicks in. - def initialize(minimum: 1, maximum: 100, initial: 5) - super() - @minimum = minimum - @maximum = maximum - @initial = initial + # A poller behavior that attempts to poll as long as a slot is available, up to the + # provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. + class SimpleMaximum < PollerBehavior + # @return [Integer] Maximum number of concurrent poll requests. + attr_reader :maximum + + # @param maximum [Integer] Maximum number of concurrent poll requests. + def initialize(maximum: 5) + super() + @maximum = maximum + end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::PollerBehaviorSimpleMaximum.new(simple_maximum: @maximum) + end end - # @!visibility private - def _to_bridge_options - Internal::Bridge::Worker::PollerBehaviorAutoscaling.new( - minimum: minimum, - maximum: maximum, - initial: initial - ) + # A poller behavior that automatically scales the number of pollers based on feedback + # from the server. A slot must be available before beginning polling. + class Autoscaling < PollerBehavior + # @return [Integer] Minimum number of poll calls (assuming slots are available). + attr_reader :minimum + # @return [Integer] Maximum number of poll calls that will ever be open at once. + attr_reader :maximum + # @return [Integer] Number of polls attempted initially before scaling kicks in. + attr_reader :initial + + # @param minimum [Integer] Minimum number of poll calls (assuming slots are available). + # @param maximum [Integer] Maximum number of poll calls that will ever be open at once. + # @param initial [Integer] Number of polls attempted initially before scaling kicks in. + def initialize(minimum: 1, maximum: 100, initial: 5) + super() + @minimum = minimum + @maximum = maximum + @initial = initial + end + + # @!visibility private + def _to_bridge_options + Internal::Bridge::Worker::PollerBehaviorAutoscaling.new( + minimum: @minimum, + maximum: @maximum, + initial: @initial + ) + end end end end diff --git a/temporalio/lib/temporalio/worker/workflow_replayer.rb b/temporalio/lib/temporalio/worker/workflow_replayer.rb index f8f52566..8560243c 100644 --- a/temporalio/lib/temporalio/worker/workflow_replayer.rb +++ b/temporalio/lib/temporalio/worker/workflow_replayer.rb @@ -206,9 +206,11 @@ def initialize( )._to_bridge_options, identity_override: options.identity, max_cached_workflows: 2, - workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(2)._to_bridge_options, + workflow_task_poller_behavior: + Temporalio::Worker::PollerBehavior::SimpleMaximum.new(2)._to_bridge_options, nonsticky_to_sticky_poll_ratio: 1.0, - activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.simple_maximum(1)._to_bridge_options, + activity_task_poller_behavior: + Temporalio::Worker::PollerBehavior::SimpleMaximum.new(1)._to_bridge_options, no_remote_activities: true, sticky_queue_schedule_to_start_timeout: 1.0, max_heartbeat_throttle_interval: 1.0, diff --git a/temporalio/sig/temporalio/worker.rbs b/temporalio/sig/temporalio/worker.rbs index c2191251..1c877fb9 100644 --- a/temporalio/sig/temporalio/worker.rbs +++ b/temporalio/sig/temporalio/worker.rbs @@ -111,8 +111,8 @@ module Temporalio ?workflow_failure_exception_types: Array[singleton(Exception)], ?workflow_payload_codec_thread_pool: ThreadPool?, ?unsafe_workflow_io_enabled: bool, - ?workflow_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior, - ?activity_task_poller_behavior: SimpleMaximumPollerBehavior | AutoscalingPollerBehavior, + ?workflow_task_poller_behavior: PollerBehavior, + ?activity_task_poller_behavior: PollerBehavior, ?deployment_options: Worker::DeploymentOptions, ?debug_mode: bool ) -> void diff --git a/temporalio/sig/temporalio/worker/poller_behavior.rbs b/temporalio/sig/temporalio/worker/poller_behavior.rbs index cbd7ca34..82e0b304 100644 --- a/temporalio/sig/temporalio/worker/poller_behavior.rbs +++ b/temporalio/sig/temporalio/worker/poller_behavior.rbs @@ -1,19 +1,17 @@ module Temporalio class Worker class PollerBehavior - def self.simple_maximum: (Integer) -> SimpleMaximumPollerBehavior - def self.autoscaling: (?minimum: Integer, ?maximum: Integer, ?initial: Integer) -> AutoscalingPollerBehavior def _to_bridge_options: -> untyped end - class SimpleMaximumPollerBehavior < PollerBehavior + class SimpleMaximum < PollerBehavior attr_reader maximum: Integer def initialize: (?maximum: Integer) -> void def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorSimpleMaximum end - class AutoscalingPollerBehavior < PollerBehavior + class Autoscaling < PollerBehavior attr_reader minimum: Integer attr_reader maximum: Integer attr_reader initial: Integer diff --git a/temporalio/test/worker_test.rb b/temporalio/test/worker_test.rb index 919eaaaf..88b82dcf 100644 --- a/temporalio/test/worker_test.rb +++ b/temporalio/test/worker_test.rb @@ -199,10 +199,10 @@ def test_can_run_with_autoscaling_poller_behavior task_queue: "tq-#{SecureRandom.uuid}", workflows: [WaitOnSignalWorkflow], activities: [SimpleActivity], - workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling( + workflow_task_poller_behavior: Temporalio::Worker::PollerBehavior::Autoscaling.new( initial: 2 ), - activity_task_poller_behavior: Temporalio::Worker::PollerBehavior.autoscaling( + activity_task_poller_behavior: Temporalio::Worker::PollerBehavior::Autoscaling.new( initial: 2 ) ) From 41bf2743005716f9c90c2c2206a61bc743da0d76 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 29 May 2025 14:20:33 -0700 Subject: [PATCH 4/5] Make versioning overrides structure nicer --- .../lib/temporalio/versioning_override.rb | 87 ++++++++----------- .../sig/temporalio/versioning_override.rbs | 7 +- .../test/worker_workflow_versioning_test.rb | 2 +- 3 files changed, 37 insertions(+), 59 deletions(-) diff --git a/temporalio/lib/temporalio/versioning_override.rb b/temporalio/lib/temporalio/versioning_override.rb index 430c9d43..719cad8e 100644 --- a/temporalio/lib/temporalio/versioning_override.rb +++ b/temporalio/lib/temporalio/versioning_override.rb @@ -6,69 +6,50 @@ module Temporalio # Base class for version overrides that can be provided in start workflow options. # Used to control the versioning behavior of workflows started with this override. # - # Use factory methods {.auto_upgrade} or {.pinned} to create instances. - # # WARNING: Experimental API. class VersioningOverride - # Creates an auto-upgrade versioning override - # The workflow will auto-upgrade to the current deployment version on the next workflow task. - # - # @return [AutoUpgradeVersioningOverride] An auto-upgrade versioning override - def self.auto_upgrade - AutoUpgradeVersioningOverride.new - end - - # Creates a pinned versioning override - # The workflow will be pinned to a specific deployment version. - # - # @param version [WorkerDeploymentVersion] The worker deployment version to pin the workflow to - # @return [PinnedVersioningOverride] A pinned versioning override - def self.pinned(version) - PinnedVersioningOverride.new(version) - end - # @!visibility private def _to_proto raise NotImplementedError, 'Subclasses must implement this method' end - end - - # Represents a versioning override to pin a workflow to a specific version - class PinnedVersioningOverride < VersioningOverride - # The worker deployment version to pin to - # @return [WorkerDeploymentVersion] - attr_reader :version - - # Create a new pinned versioning override - # - # @param version [WorkerDeploymentVersion] The worker deployment version to pin to - def initialize(version) - @version = version - super() - end - - # TODO: Remove deprecated field setting once removed from server - # @!visibility private - def _to_proto - Temporalio::Api::Workflow::V1::VersioningOverride.new( - behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED, - pinned_version: @version.to_canonical_string, - pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new( - version: @version._to_proto + # Represents a versioning override to pin a workflow to a specific version + class Pinned < VersioningOverride + # The worker deployment version to pin to + # @return [WorkerDeploymentVersion] + attr_reader :version + + # Create a new pinned versioning override + # + # @param version [WorkerDeploymentVersion] The worker deployment version to pin to + def initialize(version) + @version = version + super() + end + + # TODO: Remove deprecated field setting once removed from server + + # @!visibility private + def _to_proto + Temporalio::Api::Workflow::V1::VersioningOverride.new( + behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED, + pinned_version: @version.to_canonical_string, + pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new( + version: @version._to_proto + ) ) - ) + end end - end - # Represents a versioning override to auto-upgrade a workflow - class AutoUpgradeVersioningOverride < VersioningOverride - # @!visibility private - def _to_proto - Temporalio::Api::Workflow::V1::VersioningOverride.new( - behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE, - auto_upgrade: true - ) + # Represents a versioning override to auto-upgrade a workflow + class AutoUpgrade < VersioningOverride + # @!visibility private + def _to_proto + Temporalio::Api::Workflow::V1::VersioningOverride.new( + behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE, + auto_upgrade: true + ) + end end end end diff --git a/temporalio/sig/temporalio/versioning_override.rbs b/temporalio/sig/temporalio/versioning_override.rbs index 8f91f393..2620db78 100644 --- a/temporalio/sig/temporalio/versioning_override.rbs +++ b/temporalio/sig/temporalio/versioning_override.rbs @@ -1,19 +1,16 @@ module Temporalio class VersioningOverride - def self.auto_upgrade: -> AutoUpgradeVersioningOverride - def self.pinned: (WorkerDeploymentVersion version) -> PinnedVersioningOverride - def _to_proto: -> untyped end - class PinnedVersioningOverride < VersioningOverride + class Pinned < VersioningOverride attr_reader version: WorkerDeploymentVersion def initialize: (WorkerDeploymentVersion version) -> void def _to_proto: -> untyped end - class AutoUpgradeVersioningOverride < VersioningOverride + class AutoUpgrade < VersioningOverride def initialize: -> void def _to_proto: -> untyped end diff --git a/temporalio/test/worker_workflow_versioning_test.rb b/temporalio/test/worker_workflow_versioning_test.rb index d7520668..1640cf31 100644 --- a/temporalio/test/worker_workflow_versioning_test.rb +++ b/temporalio/test/worker_workflow_versioning_test.rb @@ -552,7 +552,7 @@ def test_workflows_can_use_versioning_override DeploymentVersioningWorkflowV1AutoUpgrade, id: "override-versioning-#{SecureRandom.uuid}", task_queue: task_queue, - versioning_override: Temporalio::VersioningOverride.pinned(worker_v1) + versioning_override: Temporalio::VersioningOverride::Pinned.new(worker_v1) ) # Send signal to finish From 9003ad211555b4f6a6dd28b73e0226133fe923b4 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 29 May 2025 14:29:18 -0700 Subject: [PATCH 5/5] Last review comments --- .../lib/temporalio/client/interceptor.rb | 1 - temporalio/lib/temporalio/worker.rb | 4 +-- .../lib/temporalio/worker/poller_behavior.rb | 2 +- .../sig/temporalio/versioning_override.rbs | 18 +++++----- .../sig/temporalio/worker/poller_behavior.rbs | 2 +- temporalio/test/worker_test.rb | 33 +++++++++---------- 6 files changed, 29 insertions(+), 31 deletions(-) diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index cab459db..0ed9bdad 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -37,7 +37,6 @@ def intercept_client(next_interceptor) :start_delay, :request_eager_start, :headers, - :priority, :versioning_override, :priority, :rpc_options diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index 7bf45864..3e42f31d 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -392,8 +392,8 @@ def initialize( workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, - workflow_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_workflow_task_polls), - activity_task_poller_behavior: SimpleMaximumPollerBehavior.new(maximum: max_concurrent_activity_task_polls), + workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), + activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase) ) raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty? diff --git a/temporalio/lib/temporalio/worker/poller_behavior.rb b/temporalio/lib/temporalio/worker/poller_behavior.rb index bba013b2..e4068ef2 100644 --- a/temporalio/lib/temporalio/worker/poller_behavior.rb +++ b/temporalio/lib/temporalio/worker/poller_behavior.rb @@ -16,7 +16,7 @@ class SimpleMaximum < PollerBehavior attr_reader :maximum # @param maximum [Integer] Maximum number of concurrent poll requests. - def initialize(maximum: 5) + def initialize(maximum) super() @maximum = maximum end diff --git a/temporalio/sig/temporalio/versioning_override.rbs b/temporalio/sig/temporalio/versioning_override.rbs index 2620db78..c4b13adb 100644 --- a/temporalio/sig/temporalio/versioning_override.rbs +++ b/temporalio/sig/temporalio/versioning_override.rbs @@ -1,17 +1,17 @@ module Temporalio class VersioningOverride def _to_proto: -> untyped - end - class Pinned < VersioningOverride - attr_reader version: WorkerDeploymentVersion + class Pinned < VersioningOverride + attr_reader version: WorkerDeploymentVersion - def initialize: (WorkerDeploymentVersion version) -> void - def _to_proto: -> untyped - end + def initialize: (WorkerDeploymentVersion version) -> void + def _to_proto: -> untyped + end - class AutoUpgrade < VersioningOverride - def initialize: -> void - def _to_proto: -> untyped + class AutoUpgrade < VersioningOverride + def initialize: -> void + def _to_proto: -> untyped + end end end diff --git a/temporalio/sig/temporalio/worker/poller_behavior.rbs b/temporalio/sig/temporalio/worker/poller_behavior.rbs index 82e0b304..b46ef35f 100644 --- a/temporalio/sig/temporalio/worker/poller_behavior.rbs +++ b/temporalio/sig/temporalio/worker/poller_behavior.rbs @@ -7,7 +7,7 @@ module Temporalio class SimpleMaximum < PollerBehavior attr_reader maximum: Integer - def initialize: (?maximum: Integer) -> void + def initialize: (Integer) -> void def _to_bridge_options: -> Internal::Bridge::Worker::PollerBehaviorSimpleMaximum end diff --git a/temporalio/test/worker_test.rb b/temporalio/test/worker_test.rb index 88b82dcf..1207ae36 100644 --- a/temporalio/test/worker_test.rb +++ b/temporalio/test/worker_test.rb @@ -207,23 +207,22 @@ def test_can_run_with_autoscaling_poller_behavior ) ) worker.run do - # Give pollers a beat to get started - sleep(0.3) - - dump = Net::HTTP.get(URI("http://#{prom_addr}/metrics")) - lines = dump.split("\n") - - matches = lines.select { |l| l.include?('temporal_num_pollers') } - activity_pollers = matches.select { |l| l.include?('activity_task') } - assert_equal 1, activity_pollers.size - assert activity_pollers[0].end_with?('2') - - workflow_pollers = matches.select { |l| l.include?('workflow_task') } - assert_equal 2, workflow_pollers.size - # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on - # initialization timing. - assert(workflow_pollers[0].end_with?('2') || workflow_pollers[0].end_with?('1')) - assert(workflow_pollers[1].end_with?('2') || workflow_pollers[1].end_with?('1')) + assert_eventually do + dump = Net::HTTP.get(URI("http://#{prom_addr}/metrics")) + lines = dump.split("\n") + + matches = lines.select { |l| l.include?('temporal_num_pollers') } + activity_pollers = matches.select { |l| l.include?('activity_task') } + assert_equal 1, activity_pollers.size + assert activity_pollers[0].end_with?('2') + + workflow_pollers = matches.select { |l| l.include?('workflow_task') } + assert_equal 2, workflow_pollers.size + # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on + # initialization timing. + assert(workflow_pollers[0].end_with?('2') || workflow_pollers[0].end_with?('1')) + assert(workflow_pollers[1].end_with?('2') || workflow_pollers[1].end_with?('1')) + end handles = Array.new(20) do env.client.start_workflow(