Skip to content

Commit

Permalink
Add support for async update (temporalio#1766)
Browse files Browse the repository at this point in the history
Add support for async update to test server and client
  • Loading branch information
Quinn-With-Two-Ns authored May 19, 2023
1 parent 16dc271 commit 1be0cee
Show file tree
Hide file tree
Showing 22 changed files with 1,159 additions and 245 deletions.
5 changes: 4 additions & 1 deletion docker/buildkite/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ system.enableEagerWorkflowStart:
constraints: {}
frontend.enableUpdateWorkflowExecution:
- value: true
constraints: {}
constraints: {}
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
- value: true
constraints: { }
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.Experimental;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Experimental
final class UpdateHandleImpl<T> implements UpdateHandle<T> {
final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {

private final String id;
private final WorkflowExecution execution;
private final CompletableFuture<T> future;
private final T result;

UpdateHandleImpl(String id, WorkflowExecution execution, CompletableFuture<T> future) {
CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
this.id = id;
this.execution = execution;
this.future = future;
this.result = result;
}

@Override
Expand All @@ -49,6 +50,11 @@ public String getId() {

@Override
public CompletableFuture<T> getResultAsync() {
return future;
return CompletableFuture.completedFuture(result);
}

@Override
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
return CompletableFuture.completedFuture(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Experimental
final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {

private final WorkflowClientCallsInterceptor workflowClientInvoker;
private final String workflowType;
private final String updateName;
private final String id;
private final WorkflowExecution execution;
private final Class<T> resultClass;
private final Type resultType;

LazyUpdateHandleImpl(
WorkflowClientCallsInterceptor workflowClientInvoker,
String workflowType,
String updateName,
String id,
WorkflowExecution execution,
Class<T> resultClass,
Type resultType) {
this.workflowClientInvoker = workflowClientInvoker;
this.workflowType = workflowType;
this.updateName = updateName;
this.id = id;
this.execution = execution;
this.resultClass = resultClass;
this.resultType = resultType;
}

@Override
public WorkflowExecution getExecution() {
return execution;
}

@Override
public String getId() {
return id;
}

@Override
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput output =
workflowClientInvoker.pollWorkflowUpdate(
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
execution, updateName, id, resultClass, resultType, timeout, unit));

return output
.getResult()
.exceptionally(
failure -> {
if (failure instanceof CompletionException) {
// unwrap the CompletionException
failure = ((Throwable) failure).getCause();
}
failure = CheckedExceptionWrapper.unwrap((Throwable) failure);
if (failure instanceof Error) {
throw (Error) failure;
}
if (failure instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) failure;
if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
// Currently no way to tell if the NOT_FOUND was because the workflow ID
// does not exist or because the update ID does not exist.
throw sre;
}
} else if (failure instanceof WorkflowException) {
throw (WorkflowException) failure;
} else if (failure instanceof TimeoutException) {
throw new CompletionException((TimeoutException) failure);
}
throw new WorkflowServiceException(execution, workflowType, (Throwable) failure);
});
}

@Override
public CompletableFuture<T> getResultAsync() {
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
13 changes: 12 additions & 1 deletion temporal-sdk/src/main/java/io/temporal/client/UpdateHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.Experimental;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* UpdateHandle is a handle to an update workflow execution request that can be used to get the
Expand All @@ -45,10 +46,20 @@ public interface UpdateHandle<T> {
String getId();

/**
* Returns a {@link CompletableFuture} with the update workflow execution request result
* Returns a {@link CompletableFuture} with the update workflow execution request result,
* potentially waiting for the update to complete.
*
* @return future completed with the result of the update or an exception
*/
CompletableFuture<T> getResultAsync();

/**
* Returns a {@link CompletableFuture} with the update workflow execution request result,
* potentially waiting for the update to complete.
*
* @param timeout maximum time to wait and perform the background long polling
* @param unit unit of timeout
* @return future completed with the result of the update or an exception
*/
CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit);
}
Loading

0 comments on commit 1be0cee

Please sign in to comment.