Skip to content

Commit

Permalink
[java-client] Introduce batchingMaxBytes setting in pulsar producer (
Browse files Browse the repository at this point in the history
…apache#5045)

* [client] Introduce `batchingMaxBytes` setting in pulsar producer

*Motivation*

Message sizes can vary widely between applications. Using the number of messages to estimate
the resources used for batching therefore leads to unpredictable resource usage.

This pull request introduces a new setting, `batchingMaxBytes`, in the Pulsar producer.
It allows applications to plan the resource usage for batching in a better way.

* Address review comments

* Use MaxMessageSize

* Use MaxMessageSize as the cap for the buffer size

* Address review comments

* Fix tests

* Fix PerformanceProducer
  • Loading branch information
sijie authored and codelipenghui committed Nov 15, 2019
1 parent 320cebe commit dedfb2b
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,55 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio
producer.close();
}

/**
 * Verifies that a producer configured with a byte-based batch limit (and message-count batching
 * disabled via {@code batchingMaxMessages(0)}) groups messages by accumulated payload size, and that
 * every message is still delivered to the consumer in the order it was sent.
 */
@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchBytes(CompressionType compressionType, BatcherBuilder builder) throws Exception {
    int numMsgs = 50;
    int numBytesInBatch = 600;
    // Topic prefix matches this test's name (it previously carried the "FixedBatchSize" test's name).
    final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchBytes-" + UUID.randomUUID();
    final String subscriptionName = "sub-1" + compressionType.toString();

    // Create the subscription up front and close the consumer so published entries stay in the backlog.
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .subscribe();
    consumer.close();

    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .compressionType(compressionType)
            .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
            .batchingMaxMessages(0)
            .batchingMaxBytes(numBytesInBatch)
            .enableBatching(true)
            .batcherBuilder(builder)
            .create();

    List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
    for (int i = 0; i < numMsgs; i++) {
        byte[] message = ("my-message-" + i).getBytes();
        sendFutureList.add(producer.sendAsync(message));
    }
    FutureUtil.waitForAll(sendFutureList).get();

    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

    rolloverPerIntervalStats();
    assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
    // The 50 payloads total 640 bytes (10 x 12 bytes for "my-message-0".."my-message-9" plus
    // 40 x 13 bytes for "my-message-10".."my-message-49"), which exceeds the 600-byte batch limit,
    // so they cannot fit into a single batch and we expect 2 entries in the backlog. The publish
    // delay is set high (5 seconds) so that it is not what splits the first batch.
    assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2);
    consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();

    for (int i = 0; i < numMsgs; i++) {
        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        String receivedMessage = new String(msg.getData());
        String expectedMessage = "my-message-" + i;
        Assert.assertEquals(receivedMessage, expectedMessage,
                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
    }
    consumer.close();
    producer.close();
}

@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ public interface ProducerBuilder<T> extends Cloneable {
* @param timeUnit
* the time unit of the {@code batchDelay}
* @return the producer builder instance
* @see #batchingMaxMessages(int)
* @see #batchingMaxBytes(int)
*/
ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit);

Expand All @@ -354,13 +356,29 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>All messages in batch will be published as a single batch message. The consumer will be delivered individual
* messages in the batch in the same order they were enqueued.
*
* @see #batchingMaxPublishDelay(long, TimeUnit)
* @param batchMessagesMaxMessagesPerBatch
* maximum number of messages in a batch
* @return the producer builder instance
* @see #batchingMaxPublishDelay(long, TimeUnit)
* @see #batchingMaxBytes(int)
*/
ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);

/**
 * Set the maximum number of bytes permitted in a batch. <i>default: 128KB</i>
 *
 * <p>If set to a value greater than 0, messages will be queued until this threshold is reached
 * or other batching conditions are met. If set to a value less than or equal to 0, no explicit
 * byte threshold is applied and the batch message container bounds the batch by the client's
 * maximum message size instead.
 *
 * <p>All messages in a batch will be published as a single batched message. The consumer will be delivered
 * individual messages in the batch in the same order they were enqueued.
 *
 * @param batchingMaxBytes maximum number of bytes in a batch; a non-positive value disables the
 *                         explicit byte threshold
 * @return the producer builder instance
 * @see #batchingMaxPublishDelay(long, TimeUnit)
 * @see #batchingMaxMessages(int)
 */
ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes);

/**
* Set the batcher builder {@link BatcherBuilder} of the producer. Producer will use the batcher builder to
* build a batch message container. This is only used when batching is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected ProducerImpl producer;

protected int maxNumMessagesInBatch;
protected int maxBytesInBatch;
protected int numMessagesInBatch = 0;
protected long currentBatchSizeBytes = 0;

protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;

// This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
// allocate a new buffer that can hold the entire batch without needing costly reallocations
Expand All @@ -50,8 +50,16 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
/**
 * Checks whether {@code msg} can be appended to the current batch without exceeding the
 * configured batch limits.
 *
 * <p>The byte limit is {@code maxBytesInBatch} when it is positive; otherwise the limit falls back
 * to {@link ClientCnx#getMaxMessageSize()}. The message-count limit is only enforced when
 * {@code maxNumMessagesInBatch} is positive.
 *
 * @param msg the message about to be added to the batch
 * @return {@code true} if the message fits within both the byte limit and the message-count limit
 */
@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
    int messageSize = msg.getDataBuffer().readableBytes();
    // A non-positive maxBytesInBatch means no explicit byte limit was configured, so the batch is
    // bounded by the maximum message size instead.
    int byteLimit = maxBytesInBatch > 0 ? maxBytesInBatch : ClientCnx.getMaxMessageSize();
    boolean withinByteLimit = (messageSize + currentBatchSizeBytes) <= byteLimit;
    // A non-positive maxNumMessagesInBatch disables the message-count limit.
    boolean withinCountLimit = maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch;
    return withinByteLimit && withinCountLimit;
}

/**
 * Reports whether the current batch has reached any of its limits.
 *
 * <p>The byte limit is {@code maxBytesInBatch} when positive, otherwise
 * {@link ClientCnx#getMaxMessageSize()}; the message-count limit applies only when
 * {@code maxNumMessagesInBatch} is positive.
 *
 * @return {@code true} if the accumulated bytes or the number of messages has hit its limit
 */
protected boolean isBatchFull() {
    if (maxBytesInBatch > 0) {
        // Explicit byte limit configured.
        if (currentBatchSizeBytes >= maxBytesInBatch) {
            return true;
        }
    } else if (currentBatchSizeBytes >= ClientCnx.getMaxMessageSize()) {
        // No explicit byte limit: cap the batch at the maximum message size.
        return true;
    }
    // Bytes are within bounds; the batch is full only if the message-count limit is active and reached.
    return maxNumMessagesInBatch > 0 && numMessagesInBatch >= maxNumMessagesInBatch;
}

@Override
Expand Down Expand Up @@ -83,5 +91,6 @@ public void setProducer(ProducerImpl<?> producer) {
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
this.maxBytesInBatch = producer.getConfiguration().getBatchingMaxBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
*
* @param msg message will add to the batch message container
* @param callback message send callback
* @return true if the batch is full, otherwise false
*/
void add(MessageImpl<?> msg, SendCallback callback);
boolean add(MessageImpl<?> msg, SendCallback callback);

/**
* Check the batch message container have enough space for the message want to add.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
protected SendCallback firstCallback;

@Override
public void add(MessageImpl<?> msg, SendCallback callback) {
public boolean add(MessageImpl<?> msg, SendCallback callback) {

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
Expand All @@ -72,7 +72,7 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
.buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
}

if (previousCallback != null) {
Expand All @@ -81,12 +81,15 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
previousCallback = callback;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
messages.add(msg);

if (lowestSequenceId == -1L) {
lowestSequenceId = msg.getSequenceId();
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
producer.lastSequenceIdPushed = msg.getSequenceId();

return isBatchFull();
}

private ByteBuf getCompressedBatchMetadataAndPayload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
private Map<String, KeyedBatch> batches = new HashMap<>();

@Override
public void add(MessageImpl<?> msg, SendCallback callback) {
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
numMessagesInBatch);
Expand All @@ -75,6 +75,7 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
} else {
part.addMsg(msg, callback);
}
return isBatchFull();
}

@Override
Expand Down Expand Up @@ -220,7 +221,7 @@ private void addMsg(MessageImpl<?> msg, SendCallback callback) {
messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey()));
}
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
.buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
firstCallback = callback;
}
if (previousCallback != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ public ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBat
return this;
}

/**
 * Records the maximum number of bytes per batch on the producer configuration.
 *
 * @param batchingMaxBytes maximum number of bytes in a batch, passed to the configuration as-is
 * @return this builder, to allow call chaining
 */
@Override
public ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes) {
conf.setBatchingMaxBytes(batchingMaxBytes);
return this;
}

@Override
public ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
conf.setBatcherBuilder(batcherBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private volatile Timeout sendTimeout = null;
private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final int maxNumMessagesInBatch;
private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);

Expand Down Expand Up @@ -175,15 +174,13 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration

this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
if (conf.isBatchingEnabled()) {
this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
BatcherBuilder containerBuilder = conf.getBatcherBuilder();
if (containerBuilder == null) {
containerBuilder = BatcherBuilder.DEFAULT;
}
this.batchMessageContainer = (BatchMessageContainerBase)containerBuilder.build();
this.batchMessageContainer.setProducer(this);
} else {
this.maxNumMessagesInBatch = 1;
this.batchMessageContainer = null;
}
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
Expand Down Expand Up @@ -401,11 +398,10 @@ public void sendAsync(Message<?> message, SendCallback callback) {
} else {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
batchMessageContainer.add(msg, callback);
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (batchMessageContainer.getNumMessagesInBatch() == maxNumMessagesInBatch
|| batchMessageContainer.getCurrentBatchSize() >= BatchMessageContainerImpl.MAX_MESSAGE_BATCH_SIZE_BYTES) {
if (isBatchFull) {
batchMessageAndSend();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {

private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
private int batchingMaxBytes = 128 * 1024; // 128KB (keep the maximum consistent as previous versions)
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
Expand Down Expand Up @@ -129,10 +130,13 @@ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPa
}

/**
 * Sets the maximum number of messages permitted in a batch.
 *
 * <p>No range check is applied: a value less than or equal to 0 is accepted and means the
 * message-count limit is not enforced by the batch message container, leaving other batching
 * conditions (such as the byte limit or the publish delay) to decide when a batch is sent.
 *
 * @param batchingMaxMessages maximum number of messages in a batch; non-positive disables the limit
 */
public void setBatchingMaxMessages(int batchingMaxMessages) {
    this.batchingMaxMessages = batchingMaxMessages;
}

/**
 * Sets the maximum number of bytes permitted in a batch.
 *
 * <p>The value is stored without validation, so zero and negative values are accepted.
 *
 * @param batchingMaxBytes maximum number of bytes in a batch
 */
public void setBatchingMaxBytes(int batchingMaxBytes) {
this.batchingMaxBytes = batchingMaxBytes;
}

public void setSendTimeoutMs(int sendTimeout, TimeUnit timeUnit) {
checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
this.sendTimeoutMs = timeUnit.toMillis(sendTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,6 @@ public void testProducerBuilderImplWhenBatchingMaxPublishDelayPropertyIsNegative
producerBuilderImpl.batchingMaxPublishDelay(-1, TimeUnit.MILLISECONDS);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testProducerBuilderImplWhenBatchingMaxMessagesPropertyIsNegative() {
producerBuilderImpl.batchingMaxMessages(-1);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testProducerBuilderImplWhenSendTimeoutPropertyIsNegative() {
producerBuilderImpl.sendTimeout(-1, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,15 @@ static class Arguments {
"--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)")
public double batchTimeMillis = 1.0;

@Parameter(names = { "-bn",
"--batch-max-msgs" }, description = "Number of max messages in batch.")
public int batchMaxMsgs = DEFAULT_BATCHING_MAX_MESSAGES;
@Parameter(names = {
"-bm", "--batch-max-messages"
}, description = "Maximum number of messages per batch")
public int batchMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;

@Parameter(names = {
"-bb", "--batch-max-bytes"
}, description = "Maximum number of bytes per batch")
public int batchMaxBytes = 4 * 1024 * 1024;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
Expand Down Expand Up @@ -415,14 +421,17 @@ private static void runProducer(Arguments arguments,
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);

if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMsgs == 0) {
if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true);
}
if (arguments.batchMaxMsgs > 0) {
producerBuilder.batchingMaxMessages(arguments.batchMaxMsgs);
if (arguments.batchMaxMessages > 0) {
producerBuilder.batchingMaxMessages(arguments.batchMaxMessages);
}
if (arguments.batchMaxBytes > 0) {
producerBuilder.batchingMaxBytes(arguments.batchMaxBytes);
}

// Block if queue is full else we will start seeing errors in sendAsync
Expand Down

0 comments on commit dedfb2b

Please sign in to comment.