Skip to content

Commit

Permalink
Fix: set subscription-type based on message ordering (apache#2259)
Browse files Browse the repository at this point in the history
* Fix: set subscription-type based on message ordering

* set failover sub on EFFECTIVELY_ONCE processing guarantee
  • Loading branch information
rdhabalia authored Aug 2, 2018
1 parent d2f6dd9 commit c85dc46
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.WindowConfig;
import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
Expand Down Expand Up @@ -240,6 +241,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--userConfig", description = "User-defined config key/values")
protected String userConfigString;
@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order")
protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)")
protected Integer parallelism;
@Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)")
Expand Down Expand Up @@ -315,6 +318,9 @@ void processArguments() throws Exception {
if (null != processingGuarantees) {
functionConfig.setProcessingGuarantees(processingGuarantees);
}

functionConfig.setRetainOrdering(retainOrdering);

if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
Expand Down Expand Up @@ -567,24 +573,13 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
}

// Set subscription type based on processing semantics
if (functionConfig.getProcessingGuarantees() != null) {
switch (functionConfig.getProcessingGuarantees()) {
case ATMOST_ONCE:
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
break;
case ATLEAST_ONCE:
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
break;
case EFFECTIVELY_ONCE:
sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER);
break;
default:
throw new RuntimeException("Unknown processing guarantee: "
+ functionConfig.getProcessingGuarantees().name());
}
}

// Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
SubscriptionType subType = (functionConfig.isRetainOrdering()
|| ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
? SubscriptionType.FAILOVER
: SubscriptionType.SHARED;
sourceSpecBuilder.setSubscriptionType(subType);

if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
Expand Down Expand Up @@ -468,7 +469,6 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep
// set source spec
// source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
if (sinkConfig.getTopicToSerdeClassName() != null) {
sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
}
Expand All @@ -483,9 +483,12 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}

sourceSpecBuilder.setSubscriptionType(
sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER : SubscriptionType.SHARED);

SubscriptionType subType = (sinkConfig.isRetainOrdering()
|| ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
? SubscriptionType.FAILOVER
: SubscriptionType.SHARED;
sourceSpecBuilder.setSubscriptionType(subType);

functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public enum Runtime {
@isValidTopicName
private String logTopic;
private ProcessingGuarantees processingGuarantees;
private boolean retainOrdering;
private Map<String, Object> userConfig;
private Runtime runtime;
private boolean autoAck;
Expand Down

0 comments on commit c85dc46

Please sign in to comment.