A worker that listens to a StepFunctions activity and executes a provided function using the inputs from the activity task.
The StepFunctions Activity Worker encapsulates all communication with the StepFunctions API, so you don't have to manage task heartbeats, task tokens, or success and failure reporting; all you have to worry about is executing the task.
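To give a feel for what the worker takes off your hands, here is a minimal sketch of a single poll-and-report cycle. It is not this library's implementation: the helper name run_one_task is hypothetical, and the sketch only assumes a client object exposing the boto3 Step Functions methods get_activity_task, send_task_success, and send_task_failure. Heartbeats (send_task_heartbeat), which a real worker would send periodically while the task runs, are left out for brevity.

```python
import json


def run_one_task(client, activity_arn, task_function):
    """Poll once for a task, run it, and report the outcome.

    Hypothetical helper for illustration only. `client` is assumed to
    expose the boto3 Step Functions client methods used below.
    Returns True if a task was handled, False if the poll came back empty.
    """
    response = client.get_activity_task(activityArn=activity_arn)
    task_token = response.get("taskToken")
    if not task_token:
        # The long poll ended without a task being scheduled.
        return False

    try:
        # The task input arrives as a JSON string.
        task_input = json.loads(response["input"])
        output = task_function(**task_input)
    except Exception as error:
        # Report the failure so the state machine can react to it.
        client.send_task_failure(
            taskToken=task_token,
            error=type(error).__name__,
            cause=str(error),
        )
    else:
        # The task output must be sent back as a JSON string.
        client.send_task_success(taskToken=task_token, output=json.dumps(output))
    return True
```

With ActivityWorker you write only the task function; the polling, token handling, and reporting shown here are handled for you.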
Install from PyPI:
pip install stepfunctions_activity_worker
from stepfunctions_activity_worker import ActivityWorker


def my_task(**task_input):
    """Perform the task based on this task's input."""
    # Perform your task here!
    return {"result": "done!"}


if __name__ == "__main__":
    activity_arn = "PLACE YOUR ACTIVITY ARN HERE"
    worker = ActivityWorker(activity_arn, my_task)
    worker.listen()
The ActivityWorker class, if not provided with a client argument on instantiation, will create a properly configured client from your default session.
However, if you are providing an already instantiated client to the ActivityWorker class, make sure it is properly configured to make StepFunctions API calls!
The GetActivityTask API call blocks for up to 60 seconds, which matches the botocore.config.Config default read_timeout. This means that if the API response for GetActivityTask is not punctual (which it often isn't), the client will time out, issue unnecessary retry requests, and eventually bubble up an HTTP exception. Give your client a read_timeout longer than 60 seconds to avoid this.
import boto3
import botocore.config
from stepfunctions_activity_worker import ActivityWorker


def my_task(**task_input):
    """Perform the task based on this task's input."""
    # Perform your task here!
    return {"result": "done!"}


# Allow more time than the 60-second GetActivityTask long poll.
config = botocore.config.Config(
    read_timeout=70,
    # Insert other custom configuration here
)
stepfunctions = boto3.client('stepfunctions', config=config)

activity_arn = "PLACE YOUR ACTIVITY ARN HERE"
activity_worker = ActivityWorker(activity_arn, my_task, client=stepfunctions)
activity_worker.listen()
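If clients are built elsewhere in your code base, a small guard can catch a too-short read timeout before the worker starts polling. The sketch below is an illustration, not part of this library: the helper name read_timeout_is_sufficient and the margin_seconds parameter are hypothetical. It relies only on the read_timeout attribute that boto3 clients expose through client.meta.config.

```python
LONG_POLL_SECONDS = 60  # GetActivityTask long-poll duration described above


def read_timeout_is_sufficient(client, margin_seconds=5):
    """Return True if the client's read timeout outlasts the long poll.

    Hypothetical helper; `margin_seconds` is an assumed safety margin,
    not a value prescribed by this library.
    """
    read_timeout = client.meta.config.read_timeout
    return read_timeout >= LONG_POLL_SECONDS + margin_seconds
```

A client created with read_timeout=70, as in the example above, passes this check, while one left at the default of 60 does not.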