Skip to content

Commit

Permalink
Add ScheduleClientInterceptor APIs (fixes temporalio#2048)
Browse files Browse the repository at this point in the history
Signed-off-by: Dan O'Reilly <[email protected]>
  • Loading branch information
dano committed May 2, 2024
1 parent ed211fa commit fec0af2
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

import com.uber.m3.tally.Scope;
import io.temporal.common.interceptors.ScheduleClientCallsInterceptor;
import io.temporal.common.interceptors.ScheduleClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.RootScheduleClientInvoker;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.Nullable;

Expand All @@ -39,6 +41,7 @@ final class ScheduleClientImpl implements ScheduleClient {
private final GenericWorkflowClient genericClient;
private final Scope metricsScope;
private final ScheduleClientCallsInterceptor scheduleClientCallsInvoker;
private final List<ScheduleClientInterceptor> interceptors;

/**
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
Expand All @@ -65,7 +68,18 @@ public static ScheduleClient newInstance(
.getMetricsScope()
.tagged(MetricsTag.defaultTags(options.getNamespace()));
this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope);
this.scheduleClientCallsInvoker = new RootScheduleClientInvoker(genericClient, options);
this.interceptors = options.getInterceptors();
this.scheduleClientCallsInvoker = initializeClientInvoker();
}

private ScheduleClientCallsInterceptor initializeClientInvoker() {
ScheduleClientCallsInterceptor scheduleClientInvoker =
new RootScheduleClientInvoker(genericClient, options);
for (ScheduleClientInterceptor clientInterceptor : interceptors) {
scheduleClientInvoker =
clientInterceptor.scheduleClientCallsInterceptor(scheduleClientInvoker);
}
return scheduleClientInvoker;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.common.interceptors.ScheduleClientInterceptor;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -56,11 +57,14 @@ public static final class Builder {
private static final String DEFAULT_NAMESPACE = "default";
private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS =
Collections.emptyList();
private static final List<ScheduleClientInterceptor> EMPTY_INTERCEPTORS =
Collections.emptyList();

private String namespace;
private DataConverter dataConverter;
private String identity;
private List<ContextPropagator> contextPropagators;
private List<ScheduleClientInterceptor> interceptors;

private Builder() {}

Expand All @@ -72,6 +76,7 @@ private Builder(ScheduleClientOptions options) {
dataConverter = options.dataConverter;
identity = options.identity;
contextPropagators = options.contextPropagators;
interceptors = options.interceptors;
}

/** Set the namespace this client will operate on. */
Expand Down Expand Up @@ -106,30 +111,44 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
return this;
}

/**
* Set the interceptors for this client.
*
* @param interceptors specifies the list of interceptors to use with the client.
*/
public Builder setInterceptors(List<ScheduleClientInterceptor> interceptors) {
this.interceptors = interceptors;
return this;
}

public ScheduleClientOptions build() {
String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity;
return new ScheduleClientOptions(
namespace == null ? DEFAULT_NAMESPACE : namespace,
dataConverter == null ? GlobalDataConverter.get() : dataConverter,
name,
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators);
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
interceptors == null ? EMPTY_INTERCEPTORS : interceptors);
}
}

private final String namespace;
private final DataConverter dataConverter;
private final String identity;
private final List<ContextPropagator> contextPropagators;
private final List<ScheduleClientInterceptor> interceptors;

private ScheduleClientOptions(
String namespace,
DataConverter dataConverter,
String identity,
List<ContextPropagator> contextPropagators) {
List<ContextPropagator> contextPropagators,
List<ScheduleClientInterceptor> interceptors) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.identity = identity;
this.contextPropagators = contextPropagators;
this.interceptors = interceptors;
}

/**
Expand Down Expand Up @@ -167,4 +186,13 @@ public String getIdentity() {
public List<ContextPropagator> getContextPropagators() {
return contextPropagators;
}

/**
* Get the interceptors of this client
*
* @return The list of interceptors to use with the client.
*/
public List<ScheduleClientInterceptor> getInterceptors() {
return interceptors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.common.interceptors;

import io.temporal.client.schedules.ScheduleClient;
import io.temporal.client.schedules.ScheduleHandle;
import io.temporal.common.Experimental;

/**
* Intercepts calls to the {@link ScheduleClient} and {@link ScheduleHandle} related to the
* lifecycle of a Schedule.
*/
@Experimental
public interface ScheduleClientInterceptor {

/**
* Called once during creation of ScheduleClient to create a chain of ScheduleClient Interceptors
*
* @param next next schedule client interceptor in the chain of interceptors
* @return new interceptor that should decorate calls to {@code next}
*/
ScheduleClientCallsInterceptor scheduleClientCallsInterceptor(
ScheduleClientCallsInterceptor next);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.common.interceptors;

import io.temporal.common.Experimental;

/** Convenience base class for ScheduleClientInterceptor implementations. */
@Experimental
public class ScheduleClientInterceptorBase implements ScheduleClientInterceptor {

@Override
public ScheduleClientCallsInterceptor scheduleClientCallsInterceptor(
ScheduleClientCallsInterceptor next) {
return next;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

package io.temporal.client.schedules;

import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import io.temporal.api.enums.v1.ScheduleOverlapPolicy;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.interceptors.ScheduleClientInterceptor;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
Expand All @@ -47,13 +49,14 @@ public class ScheduleTest {
.setWorkflowTypes(ScheduleTest.QuickWorkflowImpl.class)
.build();

private ScheduleClient createScheduleClient() {
private ScheduleClient createScheduleClient(ScheduleClientInterceptor... interceptors) {
return new ScheduleClientImpl(
testWorkflowRule.getWorkflowServiceStubs(),
ScheduleClientOptions.newBuilder()
.setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace())
.setIdentity(testWorkflowRule.getWorkflowClient().getOptions().getIdentity())
.setDataConverter(testWorkflowRule.getWorkflowClient().getOptions().getDataConverter())
.setInterceptors(Arrays.asList(interceptors))
.build());
}

Expand Down Expand Up @@ -544,6 +547,40 @@ public void listSchedules() {
});
}

@Test
public void testInterceptors() {
TracingScheduleInterceptor.FilteredTrace ft = new TracingScheduleInterceptor.FilteredTrace();
String scheduleId = "my-schedule";
TracingScheduleInterceptor interceptor = new TracingScheduleInterceptor(ft);
interceptor.setExpected(
"createSchedule: " + scheduleId,
"listSchedules",
"describeSchedule: " + scheduleId,
"pauseSchedule: " + scheduleId,
"unpauseSchedule: " + scheduleId,
"triggerSchedule: " + scheduleId,
"backfillSchedule: " + scheduleId,
"describeSchedule: " + scheduleId, // Updating a schedule implicitly calls describe.
"updateSchedule: " + scheduleId,
"deleteSchedule: " + scheduleId);
ScheduleClient client = createScheduleClient(interceptor);
ScheduleHandle handle =
client.createSchedule(
scheduleId, createTestSchedule().build(), ScheduleOptions.newBuilder().build());
try {
assertTrue(client.listSchedules().anyMatch(s -> s.getScheduleId().equals(scheduleId)));
handle.describe();
handle.pause();
handle.unpause();
handle.trigger();
handle.backfill(Arrays.asList(new ScheduleBackfill(Instant.now(), Instant.now())));
handle.update(input -> new ScheduleUpdate(createTestSchedule().build()));
} finally {
handle.delete();
}
interceptor.assertExpected();
}

public static class QuickWorkflowImpl implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String arg) {
Expand Down
Loading

0 comments on commit fec0af2

Please sign in to comment.