Skip to content

Commit

Permalink
Add subscribe position param for consumer of sink (apache#5532)
Browse files Browse the repository at this point in the history
* Add subscribe position param for consumer of sink

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored Nov 4, 2019
1 parent 7275d53 commit 39af477
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
Expand Down Expand Up @@ -264,6 +265,9 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
protected String subsName;

@Parameter(names = "--subs-position", description = "Pulsar source subscription position if user wants to consume messages from the specified location")
protected SubscriptionInitialPosition subsPosition;

@Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
protected String DEPRECATED_customSerdeInputString;
@Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)")
Expand Down Expand Up @@ -378,6 +382,10 @@ void processArguments() throws Exception {
sinkConfig.setSourceSubscriptionName(subsName);
}

if (null != subsPosition) {
sinkConfig.setSourceSubscriptionPosition(subsPosition);
}

if (null != topicsPattern) {
sinkConfig.setTopicsPattern(topicsPattern);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
Expand All @@ -51,6 +52,7 @@ public class SinkConfig {
private String name;
private String className;
private String sourceSubscriptionName;
private SubscriptionInitialPosition sourceSubscriptionPosition;

private Collection<String> inputs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.protocol.schema.LatestVersion;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
Expand Down Expand Up @@ -679,6 +681,15 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));

switch (sourceSpec.getSubscriptionPosition()) {
case EARLIEST:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
break;
default:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
break;
}

switch (sourceSpec.getSubscriptionType()) {
case FAILOVER:
pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema());

ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
// consume message even if can't decrypt and deliver it along with encryption-ctx
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(pulsarSourceConfig.getSubscriptionName())
.subscriptionInitialPosition(pulsarSourceConfig.getSubscriptionPosition())
.subscriptionType(pulsarSourceConfig.getSubscriptionType())
.messageListener(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import lombok.Data;

import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.functions.ConsumerConfig;
Expand All @@ -37,6 +38,7 @@ public class PulsarSourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
SubscriptionType subscriptionType;
private String subscriptionName;
private SubscriptionInitialPosition subscriptionPosition;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Integer maxMessageRetries = -1;
private String deadLetterTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -80,6 +82,7 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionInitialPosition(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
Expand All @@ -96,6 +99,8 @@ private static PulsarSourceConfig getPulsarConfigs() {
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
pulsarConfig.setTopicSchema(consumerConfigs);
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
pulsarConfig.setSubscriptionType(SubscriptionType.Shared);
return pulsarConfig;
}

Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ enum SubscriptionType {
FAILOVER = 1;
}

enum SubscriptionPosition {
LATEST = 0;
EARLIEST = 1;
}

message Resources {
double cpu = 1;
int64 ram = 2;
Expand Down Expand Up @@ -112,6 +117,7 @@ message SourceSpec {
string builtin = 8;
string subscriptionName = 9;
bool cleanupSubscription = 11;
SubscriptionPosition subscriptionPosition = 12;
}

message SinkSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
Expand Down Expand Up @@ -170,6 +171,12 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
sourceSpecBuilder.setCleanupSubscription(true);
}

if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.LATEST);
}

functionDetailsBuilder.setSource(sourceSpecBuilder);

// set up sink spec
Expand Down

0 comments on commit 39af477

Please sign in to comment.