Skip to content

Commit

Permalink
[pulsar-functions] Pass SubscriptionPosition from FunctionDetails
Browse files Browse the repository at this point in the history
… to `FunctionConfig` / `SinkConfig` (apache#11831)

* pass SubscriptionPosition from FunctionDetails to config

* address comment

* reduce code duplication

* set subscriptionPosition in FunctionConfig, SinkConfig with init value

* fix default values

* fix CI

* revert init data

* fix CI

* fix CI

* fix CI
  • Loading branch information
freeznet authored Sep 15, 2021
1 parent 524b103 commit bfd6542
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,6 @@ public enum Runtime {
// Whether the pulsar admin client exposed to function context, default is disabled.
private Boolean exposePulsarAdminClientEnabled;

private SubscriptionInitialPosition subscriptionPosition;
@Builder.Default
private SubscriptionInitialPosition subscriptionPosition = SubscriptionInitialPosition.Latest;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class SinkConfig {
private String name;
private String className;
private String sourceSubscriptionName;
private SubscriptionInitialPosition sourceSubscriptionPosition;
@Builder.Default
private SubscriptionInitialPosition sourceSubscriptionPosition = SubscriptionInitialPosition.Latest;

private Collection<String> inputs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.pulsar.functions.instance;

import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -675,14 +676,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));

switch (sourceSpec.getSubscriptionPosition()) {
case EARLIEST:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
break;
default:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
break;
}
pulsarSourceConfig.setSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition())
);

Preconditions.checkNotNull(contextImpl.getSubscriptionType());
pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -501,4 +502,13 @@ public static boolean isFunctionCodeBuiltin(org.apache.pulsar.functions.proto.Fu

return false;
}

public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
org.apache.pulsar.functions.proto.Function.SubscriptionPosition subscriptionPosition) {
if (org.apache.pulsar.functions.proto.Function.SubscriptionPosition.EARLIEST.equals(subscriptionPosition)) {
return SubscriptionInitialPosition.Earliest;
} else {
return SubscriptionInitialPosition.Latest;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;

@Slf4j
public class FunctionConfigUtils {
Expand Down Expand Up @@ -163,15 +164,16 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
}

// Set subscription position
Function.SubscriptionPosition subPosition;
if (functionConfig.getSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
subPosition = Function.SubscriptionPosition.EARLIEST;
} else {
subPosition = Function.SubscriptionPosition.LATEST;
if (functionConfig.getSubscriptionPosition() != null) {
Function.SubscriptionPosition subPosition = null;
if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
subPosition = Function.SubscriptionPosition.EARLIEST;
} else {
subPosition = Function.SubscriptionPosition.LATEST;
}
sourceSpecBuilder.setSubscriptionPosition(subPosition);
}

sourceSpecBuilder.setSubscriptionPosition(subPosition);

if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
Expand Down Expand Up @@ -373,6 +375,11 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)

functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
functionConfig.setAutoAck(functionDetails.getAutoAck());

// Set subscription position
functionConfig.setSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));

if (functionDetails.getSource().getTimeoutMs() != 0) {
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;

Expand Down Expand Up @@ -286,6 +287,11 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
}
sinkConfig.setAutoAck(functionDetails.getAutoAck());

// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));

if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
}
Expand Down

0 comments on commit bfd6542

Please sign in to comment.