Skip to content

Commit

Permalink
[Issue 8340] [pulsar-testclient] Fix to support to specify topics and…
Browse files Browse the repository at this point in the history
… subscriptions (apache#9716)

### Motivation

Fixes apache#8340

### Modifications

- Fix option `main parameter` for CLI `pulsar-perf produce/consume/read` to specify multi topics.
- Add option `--subscriptions` for CLI `pulsar-perf` consume to specify multi subscriptions.
- Deprecate and hide the option `--subscriber-name`.
- Modify the corresponding doc.
  • Loading branch information
murong00 authored Mar 3, 2021
1 parent 38a1f1d commit e137f26
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.FileInputStream;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -84,9 +83,13 @@ static class Arguments {
@Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
public int numSubscriptions = 1;

@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
@Deprecated
@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
public String subscriberName = "sub";

@Parameter(names = { "-ss", "--subscriptions" }, description = "A list of subscriptions to consume on (e.g. sub1,sub2)")
public List<String> subscriptions = Collections.singletonList("sub");

@Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;

Expand All @@ -105,7 +108,7 @@ static class Arguments {
@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;

@Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis")
@Parameter(names = { "--acks-delay-millis" }, description = "Acknowledgements grouping delay in millis")
public int acknowledgmentsGroupingDelayMillis = 100;

@Parameter(names = { "-c",
Expand Down Expand Up @@ -185,8 +188,8 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}

if (arguments.topic.size() != 1) {
System.out.println("Only one topic name is allowed");
if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
System.out.println("The size of topics list should be equal to --num-topics");
jc.usage();
System.exit(-1);
}
Expand All @@ -197,6 +200,14 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}

if (arguments.subscriptionType != SubscriptionType.Exclusive &&
arguments.subscriptions != null &&
arguments.subscriptions.size() != arguments.numConsumers) {
System.out.println("The size of subscriptions list should be equal to --num-consumers when subscriptionType isn't Exclusive");
jc.usage();
System.exit(-1);
}

if (arguments.confFile != null) {
Properties prop = new Properties(System.getProperties());
prop.load(new FileInputStream(arguments.confFile));
Expand Down Expand Up @@ -238,8 +249,6 @@ public static void main(String[] args) throws Exception {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments));

final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));

final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
Expand Down Expand Up @@ -314,18 +323,12 @@ public static void main(String[] args) throws Exception {
}

for (int i = 0; i < arguments.numTopics; i++) {
final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
: TopicName.get(String.format("%s-%d", prefixTopicName, i));
final TopicName topicName = TopicName.get(arguments.topic.get(i));

log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName);

for (int j = 0; j < arguments.numSubscriptions; j++) {
String subscriberName;
if (arguments.numSubscriptions > 1) {
subscriberName = String.format("%s-%d", arguments.subscriberName, j);
} else {
subscriberName = arguments.subscriberName;
}

String subscriberName = arguments.subscriptions.get(j);
for (int k = 0; k < arguments.numConsumers; k++) {
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
.subscribeAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -238,8 +237,8 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}

if (arguments.topics.size() != 1) {
System.out.println("Only one topic name is allowed");
if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) {
System.out.println("The size of topics list should be equal to --num-topic");
jc.usage();
System.exit(-1);
}
Expand Down Expand Up @@ -401,7 +400,6 @@ private static void runProducer(Arguments arguments,
PulsarClient client = null;
try {
// Now processing command line arguments
String prefixTopicName = arguments.topics.get(0);
List<Future<Producer<byte[]>>> futures = Lists.newArrayList();

ClientBuilder clientBuilder = PulsarClient.builder() //
Expand Down Expand Up @@ -455,7 +453,7 @@ private static void runProducer(Arguments arguments,
}

for (int i = 0; i < arguments.numTopics; i++) {
String topic = (arguments.numTopics == 1) ? prefixTopicName : String.format("%s-%d", prefixTopicName, i);
String topic = arguments.topics.get(i);
log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);

for (int j = 0; j < arguments.numProducers; j++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}

if (arguments.topic.size() != 1) {
System.out.println("Only one topic name is allowed");
if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
System.out.println("The size of topics list should be equal to --num-topics");
jc.usage();
System.exit(-1);
}
Expand Down Expand Up @@ -189,8 +189,6 @@ public static void main(String[] args) throws Exception {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance reader with config: {}", w.writeValueAsString(arguments));

final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));

final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
Expand Down Expand Up @@ -249,8 +247,7 @@ public static void main(String[] args) throws Exception {
.startMessageId(startMessageId);

for (int i = 0; i < arguments.numTopics; i++) {
final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
: TopicName.get(String.format("%s-%d", prefixTopicName, i));
final TopicName topicName = TopicName.get(arguments.topic.get(i));

futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
}
Expand Down
8 changes: 4 additions & 4 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,19 +429,19 @@ Options
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`--listener-name`|Listener name for the broker||
|`--acks-delay-millis`|Acknowlegments grouping delay in millis|100|
|`--acks-delay-millis`|Acknowledgements grouping delay in millis|100|
|`-k`, `--encryption-key-name`|The private key name to decrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
|`-h`, `--help`|Help message|false|
|`--conf-file`|Configuration file||
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
|`-t`, `--num-topic`|The number of topics|1|
|`-t`, `--num-topics`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message consumer (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-u`, `--service-url`|Pulsar service URL||
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
Expand Down Expand Up @@ -504,7 +504,7 @@ Options
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-t`, `--num-topic`|The number of topics|1|
|`-t`, `--num-topics`|The number of topics|1|
|`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-u`, `--service-url`|Pulsar service URL||
Expand Down

0 comments on commit e137f26

Please sign in to comment.