Skip to content

Commit

Permalink
[pulsar-flink] Add subscription initial position (apache#4129)
Browse files Browse the repository at this point in the history
### Motivation

Allow user to specify the initial position for consumer source builder.

### Modifications

Add initial position for PulsarConsumerSource.
  • Loading branch information
codelipenghui authored and sijie committed Apr 26, 2019
1 parent b3596c4 commit 4b378a2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
Expand Down Expand Up @@ -71,6 +72,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI

private final long acknowledgementBatchSize;
private long batchCount;
private final SubscriptionInitialPosition initialPosition;

private transient volatile boolean isRunning;

Expand All @@ -83,6 +85,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
this.deserializer = builder.deserializationSchema;
this.subscriptionName = builder.subscriptionName;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
this.initialPosition = builder.initialPosition;
}

@Override
Expand Down Expand Up @@ -203,12 +206,14 @@ Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientExceptio
return client.newConsumer().topicsPattern(topicsPattern)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionInitialPosition(initialPosition)
.subscribe();
} else {
return client.newConsumer()
.topics(Lists.newArrayList(topicNames))
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionInitialPosition(initialPosition)
.subscribe();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import java.util.Arrays;
import java.util.List;
Expand All @@ -51,6 +52,7 @@ public class PulsarSourceBuilder<T> {
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Latest;

private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
Expand Down Expand Up @@ -153,6 +155,18 @@ public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
return this;
}

/**
* Sets the subscription initial position for the topic consumer. Default is {@link SubscriptionInitialPosition#Latest}
*
* @param initialPosition the subscription initial position.
* @return this builder
*/
public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null");
this.initialPosition = initialPosition;
return this;
}

/**
* Sets the number of messages to receive before acknowledging. This defaults to 100. This
* value is only used when checkpointing is disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -47,6 +48,7 @@ public void testBuild() {
.serviceUrl("testServiceUrl")
.topic("testTopic")
.subscriptionName("testSubscriptionName")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.build();
Assert.assertNotNull(sourceFunction);
}
Expand Down Expand Up @@ -112,6 +114,11 @@ public void testSubscriptionNameWithBlank() {
pulsarSourceBuilder.subscriptionName(" ");
}

@Test(expectedExceptions = NullPointerException.class)
public void testSubscriptionInitialPosition() {
pulsarSourceBuilder.subscriptionInitialPosition(null);
}

private class TestDeserializationSchema<T> implements DeserializationSchema<T> {

@Override
Expand Down

0 comments on commit 4b378a2

Please sign in to comment.