Skip to content

Commit

Permalink
Allow ability to specify sub position in functions (apache#7891)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran <[email protected]>

Fixes apache#5552
Fixes apache#6531

### Motivation

Allowability to specify sub position in Pulsar Functions

### Modifications

- add `retainEarliestPosition ` params
- add test case
  • Loading branch information
wolfstudy authored Sep 11, 2020
1 parent 248ee49 commit 809b249
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
Expand Down Expand Up @@ -266,6 +267,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected Boolean forwardSourceMessageProperty = true;
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
protected String subsName;
@Parameter(names = "--subs-position", description = "Pulsar source subscription position if user wants to consume messages from the specified location")
protected SubscriptionInitialPosition subsPosition;
@Parameter(names = "--parallelism", description = "The parallelism factor of a Pulsar Function (i.e. the number of function instances to run)")
protected Integer parallelism;
@Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)")
Expand Down Expand Up @@ -417,6 +420,10 @@ void processArguments() throws Exception {
functionConfig.setSubName(subsName);
}

if (null != subsPosition) {
functionConfig.setSubscriptionPosition(subsPosition);
}

if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>() {}.getType();
Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

/**
* Configuration of Pulsar Function.
Expand Down Expand Up @@ -124,4 +125,6 @@ public enum Runtime {
// Max pending async requests per instance to avoid large number of concurrent requests.
// Only used in AsyncFunction. Default: 1000.
private Integer maxPendingAsyncRequests;

private SubscriptionInitialPosition subscriptionPosition;
}
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message FunctionDetails {
string builtin = 20;
bool retainOrdering = 21;
bool retainKeyOrdering = 22;
SubscriptionPosition subscriptionPosition = 23;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.*;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -147,10 +148,21 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
}
sourceSpecBuilder.setSubscriptionType(subType);

// Set subscription name
if (isNotBlank(functionConfig.getSubName())) {
sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
}

// Set subscription position
Function.SubscriptionPosition subPosition;
if (functionConfig.getSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
subPosition = Function.SubscriptionPosition.EARLIEST;
} else {
subPosition = Function.SubscriptionPosition.LATEST;
}

sourceSpecBuilder.setSubscriptionPosition(subPosition);

if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.functions.utils;

import com.google.gson.Gson;

import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.functions.*;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -440,6 +442,7 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
functionConfig.setRetainKeyOrdering(false);
functionConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
functionConfig.setForwardSourceMessageProperty(false);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
Expand Down

0 comments on commit 809b249

Please sign in to comment.