Skip to content

Commit

Permalink
add pulsar-perf new feature: one subscription has more than one consu…
Browse files Browse the repository at this point in the history
…mers (apache#8837)

### Motivation


*The official performance test tool pulsar-perf now supports only one consumer per subscription, not multiple consumers per subscription.*

### Modifications

- A new parameter `numSubscriptions` was added, which specifies the number of subscriptions per Topic
- Change the definition of `numConsumers`: before: number of consumers per topic; After: the number of consumers per subscription
- add new feature: one subscription has more than one consumers
- add new parameter: `receiver-queue-size-across-partitions`, which means *Max total size of the receiver queue across partitions*
  • Loading branch information
Gjiangtao authored Dec 7, 2020
1 parent d418e02 commit 78c8e32
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ static class Arguments {
@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
public int numTopics = 1;

@Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per topic)")
@Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive")
public int numConsumers = 1;

@Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
public int numSubscriptions = 1;

@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
public String subscriberName = "sub";

Expand All @@ -99,6 +102,9 @@ static class Arguments {
@Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue")
public int receiverQueueSize = 1000;

@Parameter(names = { "-p", "--receiver-queue-size-across-partitions" }, description = "Max total size of the receiver queue across partitions")
public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;

@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;

Expand Down Expand Up @@ -188,6 +194,12 @@ public static void main(String[] args) throws Exception {
System.exit(-1);
}

if (arguments.subscriptionType == SubscriptionType.Exclusive && arguments.numConsumers > 1) {
System.out.println("Only one consumer is allowed when subscriptionType is Exclusive");
jc.usage();
System.exit(-1);
}

if (arguments.confFile != null) {
Properties prop = new Properties(System.getProperties());
prop.load(new FileInputStream(arguments.confFile));
Expand Down Expand Up @@ -306,6 +318,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
.messageListener(listener) //
.receiverQueueSize(arguments.receiverQueueSize) //
.maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions)
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(arguments.subscriptionType)
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
Expand All @@ -328,26 +341,28 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
for (int i = 0; i < arguments.numTopics; i++) {
final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName
: TopicName.get(String.format("%s-%d", prefixTopicName, i));
log.info("Adding {} consumers on topic {}", arguments.numConsumers, topicName);
log.info("Adding {} consumers per subscription on topic {}", arguments.numConsumers, topicName);

for (int j = 0; j < arguments.numConsumers; j++) {
for (int j = 0; j < arguments.numSubscriptions; j++) {
String subscriberName;
if (arguments.numConsumers > 1) {
if (arguments.numSubscriptions > 1) {
subscriberName = String.format("%s-%d", arguments.subscriberName, j);
} else {
subscriberName = arguments.subscriberName;
}

futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
.subscribeAsync());
for (int k = 0; k < arguments.numConsumers; k++) {
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
.subscribeAsync());
}
}
}

for (Future<Consumer<byte[]>> future : futures) {
future.get();
}

log.info("Start receiving from {} consumers on {} topics", arguments.numConsumers,
log.info("Start receiving from {} consumers per subscription on {} topics", arguments.numConsumers,
arguments.numTopics);

long start = System.nanoTime();
Expand Down

0 comments on commit 78c8e32

Please sign in to comment.