Skip to content

Commit

Permalink
[pulsar-perf] Add batching max message into perf-producer (apache#5440)
Browse files Browse the repository at this point in the history
* [pulsar-perf] Add batching max message into perf-producer

* add default values
  • Loading branch information
rdhabalia authored Oct 24, 2019
1 parent e97ea92 commit 0f57c60
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ public class ProducerConfigurationData implements Serializable, Cloneable {

private static final long serialVersionUID = 1L;

public static final int DEFAULT_BATCHING_MAX_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;

private String topicName = null;
private String producerName = null;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = 1000;
private int maxPendingMessagesAcrossPartitions = 50000;
private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
private int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
private MessageRoutingMode messageRoutingMode = null;
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;

Expand All @@ -66,7 +70,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private MessageRouter customMessageRouter = null;

private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
private int batchingMaxMessages = 1000;
private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -126,10 +129,10 @@ static class Arguments {
public String authParams;

@Parameter(names = { "-o", "--max-outstanding" }, description = "Max number of outstanding messages")
public int maxOutstanding = 1000;
public int maxOutstanding = DEFAULT_MAX_PENDING_MESSAGES;

@Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding messages across partitions")
public int maxPendingMessagesAcrossPartitions = 50000;
public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;

@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP connections to a single broker")
Expand All @@ -156,6 +159,10 @@ static class Arguments {
@Parameter(names = { "-b",
"--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)")
public double batchTimeMillis = 1.0;

@Parameter(names = { "-bn",
"--batch-max-msgs" }, description = "Number of max messages in batch.")
public int batchMaxMsgs = DEFAULT_BATCHING_MAX_MESSAGES;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
Expand Down Expand Up @@ -401,12 +408,14 @@ private static void runProducer(Arguments arguments,
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);

if (arguments.batchTimeMillis == 0.0) {
if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMsgs == 0) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS)
.enableBatching(true);
producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true);
}
if (arguments.batchMaxMsgs > 0) {
producerBuilder.batchingMaxMessages(arguments.batchMaxMsgs);
}

// Block if queue is full else we will start seeing errors in sendAsync
Expand Down

0 comments on commit 0f57c60

Please sign in to comment.