The following is an example activity worker written in Ruby. This provides an implementation based on best practices that can be used as a reference for your activity worker. The code implements a consumer-producer pattern with a configurable number of threads for pollers and activity workers. The poller threads are constantly long polling the activity task. Once an activity task is retrieved, it is passed through a bounded blocking queue for the activity thread to pick it up.
-
For more information on the AWS SDK for Ruby, see the AWS SDK for Ruby API Reference.
-
To download this code and related resources, see step-functions-ruby-activity-worker on GitHub.com.
The following Roby code is the main entry point and example usage for the Ruby activity worker that follows.
require_relative '../lib/step_functions/activity'
credentials = Aws::SharedCredentials.new
region = 'us-west-2'
activity_arn = 'ACTIVITY_ARN'
activity = StepFunctions::Activity.new(
credentials: credentials,
region: region,
activity_arn: activity_arn,
workers_count: 1,
pollers_count: 1,
heartbeat_delay: 30
)
# The start method takes as argument the block that is the actual logic of your custom activity.
activity.start do |input|
{ result: :SUCCESS, echo: input['value'] }
The above code includes defaults you can change to reference your activity, and to adapt it to your specific implementation. This code takes as input the actual implementation logic, allows you to reference your specific activity and credentials, and allows you to configure the number of threads and heartbeat delay. For more information and to download the code, see Step Functions Ruby Activity Worker.
Item | Description |
---|---|
require_relative |
Relative path to the activity worker code below. |
region |
AWS region of your activity. |
workers_count |
The number of threads for your activity worker. For most implementations, between 10 and 20 threads should be sufficient. The longer the activity takes to process, the more threads it may need. As an estimate, multiply the number of process activities per second by the 99th percentile activity processing latency, in seconds. |
pollers_count |
The number of threads for your pollers. Between 10 and 20 threads should be sufficient for most implementations. |
heartbeat_delay |
The delay in seconds between heartbeats. |
input | Implementation logic of your activity. |
The following is the Ruby activity worker used by the example above and referenced with ../lib/step_functions/activity
.
require 'set'
require 'json'
require 'thread'
require 'logger'
require 'aws-sdk'
module Validate
def self.positive(value)
raise ArgumentError, 'Argument has to positive' if value <= 0
value
end
def self.required(value)
raise ArgumentError, 'Argument is required' if value.nil?
value
end
end
module StepFunctions
class RetryError < StandardError
def initialize(message)
super(message)
end
end
def self.with_retries(options = {}, &block)
retries = 0
base_delay_seconds = options[:base_delay_seconds] || 2
max_retries = options[:max_retries] || 3
begin
block.call
rescue => e
puts e
if retries < max_retries
retries += 1
sleep base_delay_seconds**retries
retry
end
raise RetryError, 'All retries of operation had failed'
end
end
class Activity
def initialize(options = {})
@states = Aws::States::Client.new(
credentials: Validate.required(options[:credentials]),
region: Validate.required(options[:region]),
http_read_timeout: Validate.positive(options[:http_read_timeout] || 60)
)
@activity_arn = Validate.required(options[:activity_arn])
@heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60)
@queue_max = Validate.positive(options[:queue_max] || 5)
@pollers_count = Validate.positive(options[:pollers_count] || 1)
@workers_count = Validate.positive(options[:workers_count] || 1)
@max_retry = Validate.positive(options[:workers_count] || 3)
@logger = Logger.new(STDOUT)
end
def start(&block)
@sink = SizedQueue.new(@queue_max)
@activities = Set.new
start_heartbeat_worker(@activities)
start_workers(@activities, block, @sink)
start_pollers(@activities, @sink)
wait
end
def queue_size
return 0 if @sink.nil?
@sink.size
end
def activities_count
return 0 if @activities.nil?
@activities.size
end
private
def start_pollers(activities, sink)
@pollers = Array.new(@pollers_count) do
PollerWorker.new(
states: @states,
activity_arn: @activity_arn,
sink: sink,
activities: activities,
max_retry: @max_retry
)
end
@pollers.each(&:start)
end
def start_workers(activities, block, sink)
@workers = Array.new(@workers_count) do
ActivityWorker.new(
states: @states,
block: block,
sink: sink,
activities: activities,
max_retry: @max_retry
)
end
@workers.each(&:start)
end
def start_heartbeat_worker(activities)
@heartbeat_worker = HeartbeatWorker.new(
states: @states,
activities: activities,
heartbeat_delay: @heartbeat_delay,
max_retry: @max_retry
)
@heartbeat_worker.start
end
def wait
sleep
rescue Interrupt
shutdown
ensure
Thread.current.exit
end
def shutdown
stop_workers(@pollers)
wait_workers(@pollers)
wait_activities_drained
stop_workers(@workers)
wait_activities_completed
shutdown_workers(@workers)
shutdown_worker(@heartbeat_worker)
end
def shutdown_workers(workers)
workers.each do |worker|
shutdown_worker(worker)
end
end
def shutdown_worker(worker)
worker.kill
end
def wait_workers(workers)
workers.each(&:wait)
end
def wait_activities_drained
wait_condition { @sink.empty? }
end
def wait_activities_completed
wait_condition { @activities.empty? }
end
def wait_condition(&block)
loop do
break if block.call
sleep(1)
end
end
def stop_workers(workers)
workers.each(&:stop)
end
class Worker
def initialize
@logger = Logger.new(STDOUT)
@running = false
end
def run
raise 'Method run hasn\'t been implemented'
end
def process
loop do
begin
break unless @running
run
rescue => e
puts e
@logger.error('Unexpected error had occurred')
@logger.error(e)
end
end
end
def start
return unless @thread.nil?
@running = true
@thread = Thread.new do
process
end
end
def stop
@running = false
end
def kill
return if @thread.nil?
@thread.kill
@thread = nil
end
def wait
@thread.join
end
end
class PollerWorker < Worker
def initialize(options = {})
@states = options[:states]
@activity_arn = options[:activity_arn]
@sink = options[:sink]
@activities = options[:activities]
@max_retry = options[:max_retry]
@logger = Logger.new(STDOUT)
end
def run
activity_task = StepFunctions.with_retries(max_retry: @max_retry) do
begin
@states.get_activity_task(activity_arn: @activity_arn)
rescue => e
@logger.error('Failed to retrieve activity task')
@logger.error(e)
end
end
return if activity_task.nil? || activity_task.task_token.nil?
@activities.add(activity_task.task_token)
@sink.push(activity_task)
end
end
class ActivityWorker < Worker
def initialize(options = {})
@states = options[:states]
@block = options[:block]
@sink = options[:sink]
@activities = options[:activities]
@max_retry = options[:max_retry]
@logger = Logger.new(STDOUT)
end
def run
activity_task = @sink.pop
result = @block.call(JSON.parse(activity_task.input))
send_task_success(activity_task, result)
rescue => e
send_task_failure(activity_task, e)
ensure
@activities.delete(activity_task.task_token) unless activity_task.nil?
end
def send_task_success(activity_task, result)
StepFunctions.with_retries(max_retry: @max_retry) do
begin
@states.send_task_success(
task_token: activity_task.task_token,
output: JSON.dump(result)
)
rescue => e
@logger.error('Failed to send task success')
@logger.error(e)
end
end
end
def send_task_failure(activity_task, error)
StepFunctions.with_retries do
begin
@states.send_task_failure(
task_token: activity_task.task_token,
cause: error.message
)
rescue => e
@logger.error('Failed to send task failure')
@logger.error(e)
end
end
end
end
class HeartbeatWorker < Worker
def initialize(options = {})
@states = options[:states]
@activities = options[:activities]
@heartbeat_delay = options[:heartbeat_delay]
@max_retry = options[:max_retry]
@logger = Logger.new(STDOUT)
end
def run
sleep(@heartbeat_delay)
@activities.each do |token|
send_heartbeat(token)
end
end
def send_heartbeat(token)
StepFunctions.with_retries(max_retry: @max_retry) do
begin
@states.send_task_heartbeat(token)
rescue => e
@logger.error('Failed to send heartbeat for activity')
@logger.error(e)
end
end
rescue => e
@logger.error('Failed to send heartbeat for activity')
@logger.error(e)
end
end
end
end