Skip to content

Commit

Permalink
Add subscribe initial position for consumer cli. (apache#6442)
Browse files Browse the repository at this point in the history
### Motivation

In some case, users expect to consume messages from beginning similar to the option `--from-beginning` of kafka consumer CLI. 

### Modifications

Add `--subscription-position` for `pulsar-client` and `pulsar-perf`.
  • Loading branch information
murong00 authored Mar 8, 2020
1 parent 5c2c058 commit 9fc5185
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
Expand Down Expand Up @@ -78,9 +79,12 @@ public class CmdConsume {
@Parameter(description = "TopicName", required = true)
private List<String> mainOptions = new ArrayList<String>();

@Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type: Exclusive, Shared, Failover.")
@Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type.")
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

@Parameter(names = { "-p", "--subscription-position" }, description = "Subscription position.")
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

@Parameter(names = { "-s", "--subscription-name" }, required = true, description = "Subscription name.")
private String subscriptionName;

Expand All @@ -95,7 +99,7 @@ public class CmdConsume {
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;

@Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern")
@Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern")
private boolean isRegex = false;

private ClientBuilder clientBuilder;
Expand Down Expand Up @@ -182,7 +186,8 @@ private int consume(String topic) {
PulsarClient client = clientBuilder.build();
ConsumerBuilder<byte[]> builder = client.newConsumer()
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType);
.subscriptionType(subscriptionType)
.subscriptionInitialPosition(subscriptionInitialPosition);

if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,9 +87,12 @@ static class Arguments {
@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
public String subscriberName = "sub";

@Parameter(names = { "-st", "--subscription-type" }, description = "Subscriber name prefix")
@Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;

@Parameter(names = { "-sp", "--subscription-position" }, description = "Subscription position")
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

@Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message consumer (rate in msg/s)")
public double rate = 0;

Expand Down Expand Up @@ -257,6 +261,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
.receiverQueueSize(arguments.receiverQueueSize) //
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(arguments.subscriptionType)
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
.replicateSubscriptionState(arguments.replicatedSubscription);

if (arguments.encKeyName != null) {
Expand Down
5 changes: 4 additions & 1 deletion site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,10 @@ Options
|`--hex`|Display binary messages in hexadecimal format.|false|
|`-n`, `--num-messages`|Number of messages to consume, 0 means to consume forever.|1|
|`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 means to consume messages as fast as possible|0.0|
|`--regex`|Indicate the topic name is a regex pattern|false|
|`-s`, `--subscription-name`|Subscription name||
|`-t`, `--subscription-type`|The type of the subscription. Possible values: Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-p`, `--subscription-position`|The position of the subscription. Possible values: Latest, Earliest.|Latest|



Expand Down Expand Up @@ -426,7 +428,8 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are Exclusive, Shared, Failover.|Exclusive|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`--trust-cert-file`|Path for the trusted TLS certificate file||


Expand Down

0 comments on commit 9fc5185

Please sign in to comment.