Skip to content

Commit

Permalink
Add WorkflowUpdateRPCTimeoutOrCancelledError (temporalio#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jun 20, 2024
1 parent 4f646c2 commit 2331aa4
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
# Build and test the project
build-lint-test:
strategy:
fail-fast: true
fail-fast: false
matrix:
python: ["3.8", "3.12"]
os: [ubuntu-latest, ubuntu-arm, macos-intel, macos-arm, windows-latest]
Expand Down
82 changes: 68 additions & 14 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,10 @@ async def execute_update(
rpc_timeout: Optional RPC deadline to set for the RPC call.
Raises:
WorkflowUpdateFailedError: If the update failed
WorkflowUpdateFailedError: If the update failed.
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: There was some issue sending the update to the workflow.
"""
handle = await self._start_update(
Expand Down Expand Up @@ -1968,6 +1971,9 @@ async def start_update(
rpc_timeout: Optional RPC deadline to set for the RPC call.
Raises:
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: There was some issue sending the update to the workflow.
"""
return await self._start_update(
Expand Down Expand Up @@ -4305,7 +4311,10 @@ async def result(
it will be retried until the result is available.
Raises:
WorkflowUpdateFailedError: If the update failed
WorkflowUpdateFailedError: If the update failed.
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: Update result could not be fetched for some other reason.
"""
# Poll until outcome reached
Expand Down Expand Up @@ -4357,15 +4366,28 @@ async def _poll_until_outcome(

# Continue polling as long as we have no outcome
while True:
res = await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
if res.HasField("outcome"):
self._known_outcome = res.outcome
return
try:
res = (
await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
)
if res.HasField("outcome"):
self._known_outcome = res.outcome
return
except RPCError as err:
if (
err.status == RPCStatusCode.DEADLINE_EXCEEDED
or err.status == RPCStatusCode.CANCELLED
):
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
else:
raise
except asyncio.CancelledError as err:
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err


class WorkflowUpdateStage(IntEnum):
Expand Down Expand Up @@ -4456,6 +4478,24 @@ def cause(self) -> BaseException:
return self.__cause__


class RPCTimeoutOrCancelledError(temporalio.exceptions.TemporalError):
"""Error that occurs on some client calls that timeout or get cancelled."""

pass


class WorkflowUpdateRPCTimeoutOrCancelledError(RPCTimeoutOrCancelledError):
"""Error that occurs when update RPC call times out or is cancelled.
Note, this is not related to any general concept of timing out or cancelling
a running update, this is only related to the client call itself.
"""

def __init__(self) -> None:
"""Create workflow update timeout or cancelled error."""
super().__init__("Timeout or cancellation waiting for update")


class AsyncActivityCancelledError(temporalio.exceptions.TemporalError):
"""Error that occurs when async activity attempted heartbeat but was cancelled."""

Expand Down Expand Up @@ -5261,9 +5301,23 @@ async def start_workflow_update(
# the user cannot specify sooner than ACCEPTED)
resp: temporalio.api.workflowservice.v1.UpdateWorkflowExecutionResponse
while True:
resp = await self._client.workflow_service.update_workflow_execution(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
try:
resp = await self._client.workflow_service.update_workflow_execution(
req,
retry=True,
metadata=input.rpc_metadata,
timeout=input.rpc_timeout,
)
except RPCError as err:
if (
err.status == RPCStatusCode.DEADLINE_EXCEEDED
or err.status == RPCStatusCode.CANCELLED
):
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
else:
raise
except asyncio.CancelledError as err:
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
if (
resp.stage >= req.wait_policy.lifecycle_stage
or resp.stage
Expand Down
23 changes: 23 additions & 0 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from datetime import timedelta
from typing import Awaitable, Callable, Optional, Sequence, Type, TypeVar

from temporalio.api.common.v1 import WorkflowExecution
from temporalio.api.enums.v1 import IndexedValueType
from temporalio.api.operatorservice.v1 import (
AddSearchAttributesRequest,
ListSearchAttributesRequest,
)
from temporalio.api.update.v1 import UpdateRef
from temporalio.api.workflowservice.v1 import PollWorkflowExecutionUpdateRequest
from temporalio.client import BuildIdOpAddNewDefault, Client
from temporalio.common import SearchAttributeKey
from temporalio.service import RPCError, RPCStatusCode
Expand Down Expand Up @@ -105,3 +108,23 @@ def find_free_port() -> int:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]


async def workflow_update_exists(
client: Client, workflow_id: str, update_id: str
) -> bool:
try:
await client.workflow_service.poll_workflow_execution_update(
PollWorkflowExecutionUpdateRequest(
namespace=client.namespace,
update_ref=UpdateRef(
workflow_execution=WorkflowExecution(workflow_id=workflow_id),
update_id=update_id,
),
)
)
return True
except RPCError as err:
if err.status != RPCStatusCode.NOT_FOUND:
raise
return False
128 changes: 108 additions & 20 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from temporalio.api.update.v1 import UpdateRef
from temporalio.api.workflowservice.v1 import (
GetWorkflowExecutionHistoryRequest,
PollWorkflowExecutionUpdateRequest,
ResetStickyTaskQueueRequest,
)
from temporalio.bridge.proto.workflow_activation import WorkflowActivation
Expand All @@ -57,6 +56,7 @@
WorkflowQueryFailedError,
WorkflowUpdateFailedError,
WorkflowUpdateHandle,
WorkflowUpdateRPCTimeoutOrCancelledError,
WorkflowUpdateStage,
)
from temporalio.common import (
Expand Down Expand Up @@ -107,6 +107,7 @@
ensure_search_attributes_present,
find_free_port,
new_worker,
workflow_update_exists,
)
from tests.helpers.external_coroutine import wait_on_timer
from tests.helpers.external_stack_trace import (
Expand Down Expand Up @@ -4364,25 +4365,8 @@ async def test_workflow_update_before_worker_start(
task_queue=task_queue,
)

async def update_exists() -> bool:
try:
await client.workflow_service.poll_workflow_execution_update(
PollWorkflowExecutionUpdateRequest(
namespace=client.namespace,
update_ref=UpdateRef(
workflow_execution=WorkflowExecution(workflow_id=handle.id),
update_id="my-update",
),
)
)
return True
except RPCError as err:
if err.status != RPCStatusCode.NOT_FOUND:
raise
return False

# Confirm update not there
assert not await update_exists()
assert not await workflow_update_exists(client, handle.id, "my-update")

# Execute update in background
update_task = asyncio.create_task(
Expand All @@ -4392,7 +4376,9 @@ async def update_exists() -> bool:
)

# Wait until update exists
await assert_eq_eventually(True, update_exists)
await assert_eq_eventually(
True, lambda: workflow_update_exists(client, handle.id, "my-update")
)

# Start no-cache worker on the task queue
async with new_worker(
Expand Down Expand Up @@ -4466,6 +4452,108 @@ async def test_workflow_update_separate_handle(
assert "workflow-done" == await handle.result()


@workflow.defn
class UpdateTimeoutOrCancelWorkflow:
@workflow.run
async def run(self) -> None:
await workflow.wait_condition(lambda: False)

@workflow.update
async def do_update(self, sleep: float) -> None:
await asyncio.sleep(sleep)


async def test_workflow_update_timeout_or_cancel(
client: Client, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1903"
)

# Confirm start timeout via short timeout on update w/ no worker running
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue="does-not-exist",
)
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
rpc_timeout=timedelta(milliseconds=1),
)

# Confirm start cancel via cancel on update w/ no worker running
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue="does-not-exist",
)
task = asyncio.create_task(
handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
id="my-update",
)
)
# Have to wait for update to exist before cancelling to capture
await assert_eq_eventually(
True, lambda: workflow_update_exists(client, handle.id, "my-update")
)
task.cancel()
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await task

# Start worker
async with new_worker(client, UpdateTimeoutOrCancelWorkflow) as worker:
# Start the workflow
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Start an update
update_handle = await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
# Timeout a poll call
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await update_handle.result(rpc_timeout=timedelta(milliseconds=1))

# Cancel a poll call
update_handle = await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
result_task = asyncio.create_task(update_handle.result())
# Unfortunately there is not a way for us to confirm this is actually
# pending the server call and if you cancel early you get an asyncio
# cancelled error because it never even reached the gRPC client. We
# considered sleeping, but that makes for flaky tests. So what we are
# going to do is patch the poll call to notify us when it was called.
called = asyncio.Event()
unpatched_call = client.workflow_service.poll_workflow_execution_update

async def patched_call(*args, **kwargs):
called.set()
return await unpatched_call(*args, **kwargs)

client.workflow_service.poll_workflow_execution_update = patched_call # type: ignore
try:
await called.wait()
finally:
client.workflow_service.poll_workflow_execution_update = unpatched_call
result_task.cancel()
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await result_task


@workflow.defn
class TimeoutSupportWorkflow:
@workflow.run
Expand Down

0 comments on commit 2331aa4

Please sign in to comment.