Skip to content

Commit

Permalink
API CHANGE: WorkflowStarter is now workflow_starter, to match genera …
Browse files Browse the repository at this point in the history
…context manager naming practices
  • Loading branch information
darjus committed Jun 11, 2016
1 parent b7ee608 commit e92ad71
Show file tree
Hide file tree
Showing 19 changed files with 116 additions and 102 deletions.
2 changes: 1 addition & 1 deletion botoflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
from .activity_retrying import retry_on_exception
from .options import workflow_options, activity_options
from .workflow_definition import WorkflowDefinition
from .workflow_starter import WorkflowStarter
from .workflow_starter import workflow_starter
from . import workflow_types as types
2 changes: 1 addition & 1 deletion botoflow/context/start_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class StartWorkflowContext(ContextBase):
.. py:attribute:: workflow_starter
:rtype: botoflow.workflow_starter.WorkflowStarter
:rtype: botoflow.workflow_starter.workflow_starter
"""

Expand Down
2 changes: 1 addition & 1 deletion botoflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def execute(version,
"""This decorator indicates the entry point of the workflow.
The entry point of the workflow can be invoked remotely by your application
using :py:class`~botoflow.workflow_starter.WorkflowStarter` or direct API call from ``boto3`` or
using :py:class`~botoflow.workflow_starter.workflow_starter` or direct API call from ``boto3`` or
`AWS CLI <http://docs.aws.amazon.com/cli/latest/reference/swf/start-workflow-execution.html>`_.
:param str version: Required version of the workflow type. Maximum length
Expand Down
2 changes: 1 addition & 1 deletion botoflow/logging_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


class BotoflowFilter(logging.Filter):
"""You can use this filter with Python's `logging` module to filter out
"""You can use this filter with Python's :py:mod:`logging` module to filter out
botoflow logs that are being replayed by the decider.
For example::
Expand Down
75 changes: 43 additions & 32 deletions botoflow/workflow_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

__all__ = ('WorkflowDefinition', )

import six
from copy import copy

import six
from botoflow import get_context, async, return_
from botoflow.context import DecisionContext
from botoflow.exceptions import CancelledError

__all__ = ('WorkflowDefinition', )


class _WorkflowDefinitionMeta(type):

Expand Down Expand Up @@ -83,18 +82,18 @@ def _extract_workflows_and_signals(dct):


class WorkflowDefinition(six.with_metaclass(_WorkflowDefinitionMeta, object)):
"""Every workflow implementation needs to be a subclass of this class.
"""Every workflow implementation must be a subclass of :py:class:`~botoflow.workflow_definition.WorkflowDefinition`.
Usually there should be no need to instantiate the class manually, as
instead, the @execute method is called to start the workflow (you can think
of ths as having factory class methods).
instead, the :py:func:`@execute <botoflow.decorators.execute>` method is called to start the workflow
*(you can think of ths as having factory class methods in a sense)*.
Here's an example workflow implementation that has an @execute decorated
method and a @signal:
Here's an example workflow implementation that has an :py:func:`@execute <botoflow.decorators.execute>` decorated
method and a :py:func:`@signal <botoflow.decorators.signal>`:
.. code-block:: python
from botoflow import execute, Return, WorkflowDefinition
from botoflow import execute, return_, WorkflowDefinition
from botoflow.constants import MINUTES
from my_activities import MyActivities
Expand All @@ -108,27 +107,33 @@ def start_my_workflow(self, some_input):
result = yield MyActivities.activity1(some_input)
# return the result from the workflow
# in Python3 you can "return result" normally instead
return_(result)
@signal() # has to have () parentheses
def signal1(self, signal_input):
self.signal_input = signal_input
As with the @async decorated methods, returning values from the workflow is
a little bit inconvenient on Python 2 as instead of using the familiar
return keyword, the return value is "raised" like this: `raise
Return("Value")`.
.. note::
As with the :py:func:`@async <botoflow.core.decorators.async>` decorated methods, returning values from the
workflow is a little bit inconvenient on Python 2 as it does not allow returning from generator functions (see
:pep:`0380`). Instead of using the familiar ``return`` keyword, the return value is *raised* like this: ``raise
Return("Value")``.
"""

def __init__(self, workflow_execution):
"""
:param ~botoflow.workflow_execution.WorkflowExecution workflow_execution: Associated workflow execution.
"""
self.workflow_execution = workflow_execution
self.workflow_state = ""
self._workflow_result = None

@property
def workflow_execution(self):
"""Will contain the
:py:class:`botoflow.workflow_execution.WorkflowExecution` named tuple
:py:class:`~botoflow.workflow_execution.WorkflowExecution` named tuple
of the currently running workflow.
An example of the workflow_execution after starting a workflow:
Expand All @@ -141,12 +146,15 @@ def workflow_execution(self):
# start the workflow with a random workflow_id
with wf_worker:
instance = OneActivityWorkflow.execute(arg1=1, arg2=2)
print instance.workflow_execution
# prints:
# WorkflowExecution(
# workflow_id='73faf493fece67fefb1142739611c391a03bc23b',
# run_id='12Eg0ETHpm17rSWssUZKqAvEZVd5Ap0RELs8kE7U6igB8=')
instance = ExampleWorkflow.execute(arg1=1, arg2=2)
print(instance.workflow_execution)
Prints something like the following:
.. code-block:: python
WorkflowExecution(workflow_id='73faf493fece67fefb1142739611c391a03bc23b',
run_id='12Eg0ETHpm17rSWssUZKqAvEZVd5Ap0RELs8kE7U6igB8=')
"""
return self.__workflow_execution
Expand All @@ -160,13 +168,16 @@ def workflow_state(self):
"""This property is used to retrieve current workflow state.
The property is expected to perform read only access of the workflow
implementation object and is invoked synchronously which disallows
use of any asynchronous operations (like calling it with `yield`).
use of any asynchronous operations (like calling it with ``yield``).
The latest state reported by the workflow execution is returned to
you through visibility calls to the Amazon SWF service and in the
Amazon SWF console.
Example of setting the state between `yield` s:
To learn more about workflow execution state, please check out the official documentation on
`SWF Workflow History <http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dev-about-workflow-history.html>`_.
Example of setting the state between ``yield`` s:
.. code-block:: python
Expand All @@ -190,11 +201,10 @@ def workflow_result(self):
workflow execution.
The main use-case is when you have subworkflows, which results you'd
like to `yield` on and still be able to call signals on that
like to ``yield`` on and still be able to call signals on that
sub-workflow.
:returns: `botoflow.core.future.Future`, or None if the workflow has
not been started.
:returns: `~botoflow.core.future.Future`, or None if the workflow has not been started.
"""
return self._workflow_result

Expand All @@ -210,11 +220,10 @@ def cancel(self, details=""):
:param details: of request; is recorded in SWF history
:type details: str
:return: cancel Future that is empty until the message was succesfully delivered
to target execution. Does not indicate whether the target execution accepted
the request or not.
:rtype: awsflow.core.Future
:raises CancelledError: if workflow to cancel is of current context
:return: cancel :py:class:`~botoflow.core.future.Future` that is empty until the message was succesfully
delivered to target execution. Does not indicate whether the target execution accepted the request or not.
:rtype: botoflow.core.future.Future
:raises botoflow.core.exceptions.CancelledError: if workflow to cancel is of current context
"""
context = self._get_decision_context(self.cancel.__name__)
if self.workflow_execution == context._workflow_execution:
Expand All @@ -223,7 +232,9 @@ def cancel(self, details=""):

@async
def cancellation_handler(self):
"""Override to take cleanup actions before workflow cancels"""
"""Override this to take cleanup actions before workflow execution cancels.
"""
return_(None)

def _get_decision_context(self, calling_func):
Expand Down
24 changes: 17 additions & 7 deletions botoflow/workflow_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
from .context import StartWorkflowContext, get_context, set_context
from .utils import random_sha1_hash
from .swf_exceptions import swf_exception_wrapper
from .exceptions import (
WorkflowFailedError, WorkflowTimedOutError, WorkflowTerminatedError)
from .exceptions import WorkflowFailedError, WorkflowTimedOutError, WorkflowTerminatedError

log = logging.getLogger(__name__)


class WorkflowStarter(object):
class workflow_starter(object):
"""Use this context manager to start a new workflow execution
Example:
Expand All @@ -40,16 +39,16 @@ class WorkflowStarter(object):
# start the workflow using botocore session and ExampleWorkflow class
# with a random workflow_id
with WorkflowStarter(session, "us-east-1", "SOMEDOMAIN", "DEFAULT_TASKLIST"):
instance = OneActivityWorkflow.execute(arg1=1, arg2=2)
print instance.workflow_execution.workflow_id
with workflow_starter(session, "us-east-1", "SOMEDOMAIN", "DEFAULT_TASKLIST"):
instance = ExampleWorkflow.execute(arg1=1, arg2=2)
print(instance.workflow_execution.workflow_id)
# will print the workflow execution ID
"""

def __init__(self, session, aws_region, domain, default_task_list):
"""
:param session: BotoCore session.
:param session: Botocore session.
:type session: botocore.session.Session
:param aws_region:
:type aws_region: str
Expand Down Expand Up @@ -78,6 +77,17 @@ def __exit__(self, exc_type, value, traceback):
set_context(self._other_context)

def wait_for_completion(self, workflow_instance, poll_sleep_time, attempt_count=None):
"""This convenience method will block until the workflow completes or ``attempt_count`` is reached.
.. code-block:: python
with workflow_starter(session, "us-east-1", "SOMEDOMAIN", "DEFAULT_TASKLIST") as starter:
instance = ExampleWorkflow.execute(arg1=1, arg2=2)
# check every two minutes if the workflow completed and since we have not set
# the attempt_count, we will poll for as long as the workflow is running
result = starter.wait_for_completion(instance, poll_sleep_time, 2*MINUTES)
"""
workflow_execution = workflow_instance.workflow_execution
data_converter = workflow_instance._data_converter

Expand Down
6 changes: 3 additions & 3 deletions docs/source/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ You can start an instance of this workflow using the following code snippet:
session = botocore.session.get_session()
with WorkflowStarter(session, aws_region='us-east-1',
with workflow_starter(session, aws_region='us-east-1',
domain='domain1', task_list='tasklist1'):
HelloWorldWorkflow.hello_world() # starts the workflow
Here we use :py:func:`~botocore.session.get_session` from botocore
*(for authentication as well as low-level communication with SWF
service)* and pass it to our
:py:class:`~botoflow.workflow_starter.WorkflowStarter`. Then we call
:py:class:`~botoflow.workflow_starter.workflow_starter`. Then we call
``HelloWorldWorkflow.hello_world()`` in the
:py:class:`~botoflow.workflow_starter.WorkflowStarter` context to
:py:class:`~botoflow.workflow_starter.workflow_starter` context to
start a new workflow execution.


Expand Down
6 changes: 1 addition & 5 deletions docs/source/reference/workflow_definition.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
Workflow Definition
===================
workflow_definition
===================

botoflow.workflow_definition
----------------------------

.. automodule:: botoflow.workflow_definition
:show-inheritance:
Expand Down
5 changes: 1 addition & 4 deletions docs/source/reference/workflow_starter.rst
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
================
workflow_starter
Workflow Starter
================

botoflow.workflow_starter
------------------------

.. automodule:: botoflow.workflow_starter
:members:
:undoc-members:
4 changes: 2 additions & 2 deletions test/integration/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from threading import Thread

from botoflow import (WorkflowDefinition, execute, return_, WorkflowWorker, ActivityWorker,
WorkflowStarter, async, workflow_time, workflow_options)
workflow_starter, async, workflow_time, workflow_options)
from botoflow.workflow_execution import WorkflowExecution
from botoflow.core import CancelledError
from botoflow.exceptions import RequestCancelExternalWorkflowExecutionFailedError
Expand Down Expand Up @@ -391,7 +391,7 @@ def execute(self, target_wf_id, target_run_id):
[source_wf, target_wf])

target_execution = self.start_workflow(target_wf)
with WorkflowStarter(self.session, self.region, self.domain, 'source_task_list'):
with workflow_starter(self.session, self.region, self.domain, 'source_task_list'):
instance = source_wf.execute(*target_execution)
source_execution = instance.workflow_execution

Expand Down
8 changes: 4 additions & 4 deletions test/integration/test_child_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging

from botoflow import (WorkflowDefinition, execute, return_, WorkflowWorker,
ActivityWorker, WorkflowStarter, workflow_options)
ActivityWorker, workflow_starter, workflow_options)
from botoflow.exceptions import ChildWorkflowTimedOutError, ChildWorkflowFailedError
from various_activities import BunchOfActivities
from botoflow.logging_filters import BotoflowFilter
Expand Down Expand Up @@ -74,7 +74,7 @@ def test_two_workflows(self):
MasterWorkflow, ChildWorkflow)
act_worker = ActivityWorker(
self.session, self.region, self.domain, self.task_list, BunchOfActivities())
with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = MasterWorkflow.execute(arg1=1, arg2=2)
self.workflow_execution = instance.workflow_execution

Expand All @@ -98,7 +98,7 @@ def test_child_workflow_timed_out(self):
wf_worker = WorkflowWorker(
self.session, self.region, self.domain, self.task_list,
TimingOutMasterWorkflow, TimingOutChildWorkflow)
with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = TimingOutMasterWorkflow.execute()
self.workflow_execution = instance.workflow_execution

Expand All @@ -124,7 +124,7 @@ def test_raising_child_workflows(self):
self.session, self.region, self.domain, child_tasklist,
RaisingChildWorkflow)

with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = MasterWorkflowWithException.execute(child_tasklist)
self.workflow_execution = instance.workflow_execution

Expand Down
4 changes: 2 additions & 2 deletions test/integration/test_generic_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest

from botoflow import (WorkflowDefinition, execute, return_, ThreadedActivityExecutor, GenericWorkflowWorker, ActivityWorker,
WorkflowStarter)
workflow_starter)

from botoflow.utils import extract_workflows_dict
from utils import SWFMixIn
Expand Down Expand Up @@ -46,7 +46,7 @@ def execute(self, arg1, arg2):
act_worker = ThreadedActivityExecutor(ActivityWorker(
self.session, self.region, self.domain, self.task_list, BunchOfActivities()))

with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = OneActivityWorkflow.execute(arg1=1, arg2=2)
self.workflow_execution = instance.workflow_execution

Expand Down
6 changes: 3 additions & 3 deletions test/integration/test_manual_activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from botoflow import (WorkflowDefinition, execute, return_,
ThreadedActivityExecutor, WorkflowWorker, ActivityWorker,
WorkflowStarter)
workflow_starter)

from botoflow.manual_activity_completion_client import ManualActivityCompletionClient
from utils import SWFMixIn
Expand All @@ -32,7 +32,7 @@ def execute(self, template):
act_executor = ThreadedActivityExecutor(ActivityWorker(
self.session, self.region, self.domain, self.task_list, ManualActivities()))

with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = OneManualActivityWorkflow.execute(template='instructions.tmpl')
self.workflow_execution = instance.workflow_execution

Expand Down Expand Up @@ -84,7 +84,7 @@ def execute(self, template):
self.session, self.region, self.domain, self.task_list,
BunchOfActivities(), ManualActivities())

with WorkflowStarter(self.session, self.region, self.domain, self.task_list):
with workflow_starter(self.session, self.region, self.domain, self.task_list):
instance = OneManualOneAutomaticActivityWorkflow.execute(template='instructions.tmpl')
self.workflow_execution = instance.workflow_execution

Expand Down
Loading

0 comments on commit e92ad71

Please sign in to comment.