Skip to content

Commit

Permalink
[Java Client] Optimize batch flush scheduling (apache#14185)
Browse files Browse the repository at this point in the history
* Java client: only schedule batch when there are pending messages

* Update variable name; handle reconnect case

* Remove code duplication

* Fix method name

* Improve method name; add tests; fix bugs raised by tests

* Remove comment that is now incorrect

* Chain batch flush task when task triggers delivery

* Prevent early batch flush by storing lastBatchSendNanos

* Refactor getPendingQueueSize

* Remove unintentionally committed comments

* Remove premature optimization to cancle and reschedule batchFlushTask

* Ensure buffered batchMessageContainer messages can fail due to send timeout

* Fix comment to match new design

* Fix batch message counting log line

* Guard against null batchMessageContainer

* Fix send timeout logic to fire when batching is disabled

Master Issue: apache#11100

### Motivation

As observed in apache#11100, the Java client producer consumes cpu even when doing nothing. This is especially true when using many producers. Instead, we can make it so that the batch timer is only scheduled when it has messages to deliver or just fired and delivered messages.

If there are concerns about this optimization, I will need to take some time to complete benchmarks.

### Modifications

* Skip message batch delivery if the producer is not in `Ready` state. As a consequence, ensure that messages pending in the `batchMessageContainer` are still eligible to fail for `sendTimeout`.
* If there is no current batch flush task, schedule a single flush task when a message is added to a batch message container (assuming the message does not also trigger the delivery of the batch and the producer is in READY state).
* Schedule another batch flush task if and only if the batch flush task itself was responsible for sending messages.
* Keep track of `lastBatchSendNanoTime`, and only flush a batch message container if the `BatchingMaxPublishDelayMicros` time has passed since the last send time.
* Note that the timer task is only ever updated within `synchronized (this)` block, so we are guaranteed to have a consistent view free of race conditions. (There is one race condition, and that is that an existing batch timer might get canceled while it is starting. This is of little consequence since it'll result in either no delivery or a small batch.)

### Verifying this change

There are existing tests that cover batch message delivery. Specifically, the `BatchMessageTest` in the `pulsar-broker` module covers these changes.

### Does this pull request potentially affect one of the following parts:

This is a backwards compatible change.

### Documentation

- [x] `no-need-doc`
  • Loading branch information
michaeljmarshall authored Mar 24, 2022
1 parent 0fe921f commit b21e548
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -328,7 +329,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType, B
for (int i = 0; i <= numMsgs; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
LOG.info("received msg - {}", Arrays.toString(msg.getData()));
LOG.info("received msg size: {}", msg.getData().length);
consumer.acknowledge(msg);
}
Thread.sleep(100);
Expand Down Expand Up @@ -421,7 +422,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder

rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// we expect 10 messages in the backlog since we sent 10 messages with the batch size set to 5.
// we would expect 2 messages in the backlog since we sent 10 messages with the batch size set to 5.
// However, we are using synchronous send and so each message will go as an individual message
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 10);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Expand All @@ -438,6 +439,59 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder
producer.close();
}

@Test(dataProvider = "containerBuilder")
public void testSimpleBatchProducerWithStoppingAndStartingBroker(BatcherBuilder builder) throws Exception {
// Send enough messages to trigger one batch by size and then have a remaining message in the batch container
int numMsgs = 3;
int numMsgsInBatch = 2;
final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-" + UUID.randomUUID();
final String subscriptionName = "syncsub-1";

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
.batchingMaxMessages(numMsgsInBatch)
.enableBatching(true)
.batcherBuilder(builder)
.create();

stopBroker();

List<CompletableFuture<MessageId>> messages = new ArrayList<>();
for (int i = 0; i < numMsgs; i++) {
byte[] message = ("my-message-" + i).getBytes();
messages.add(producer.sendAsync(message));
}

startBroker();

// Fail if any one message fails to get acknowledged
FutureUtil.waitForAll(messages).get(30, TimeUnit.SECONDS);

Awaitility.await().timeout(30, TimeUnit.SECONDS)
.until(() -> pulsar.getBrokerService().getTopicReference(topicName).isPresent());

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

rolloverPerIntervalStats();
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();

for (int i = 0; i < numMsgs; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + i;
Assert.assertEquals(receivedMessage, expectedMessage,
"Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
}
consumer.close();
producer.close();
}

@Test(dataProvider = "containerBuilder")
public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) throws Exception {
int numMsgs = 2000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private final ConnectionHandler connectionHandler;

private ScheduledFuture<?> batchTimerTask;
// A batch flush task is scheduled when one of the following is true:
// - A message is added to a message batch without also triggering a flush for that batch.
// - A batch flush task executes with messages in the batchMessageContainer, thus actually triggering messages.
// - A message was sent more recently than the configured BatchingMaxPublishDelayMicros. In this case, the task is
// scheduled to run BatchingMaxPublishDelayMicros after the most recent send time.
// The goal is to optimize batch density while also ensuring that a producer never waits longer than the configured
// batchingMaxPublishDelayMicros to send a batch.
// Only update from within synchronized block on this producer.
private ScheduledFuture<?> batchFlushTask;
// The time, in nanos, of the last batch send. This field ensures that we don't deliver batches via the
// batchFlushTask before the batchingMaxPublishDelayMicros duration has passed.
private long lastBatchSendNanoTime;

private Optional<Long> topicEpoch = Optional.empty();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
Expand Down Expand Up @@ -630,7 +641,9 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
batchMessageAndSend();
batchMessageAndSend(false);
} else {
maybeScheduleBatchFlushTask();
}
}
isLastSequenceIdPotentialDuplicated = false;
Expand Down Expand Up @@ -820,7 +833,7 @@ private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBu
msg.getUncompressedSize());
}
try {
batchMessageAndSend();
batchMessageAndSend(false);
batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
} finally {
Expand Down Expand Up @@ -1647,29 +1660,6 @@ public void connectionOpened(final ClientCnx cnx) {
this.msgIdGenerator = lastSequenceId + 1;
}

if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
batchTimerTask = cnx.ctx().executor()
.scheduleWithFixedDelay(catchingAndLoggingThrowables(() -> {
if (log.isTraceEnabled()) {
log.trace(
"[{}] [{}] Batching the messages from the batch container from "
+ "timer thread",
topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not
// schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
}
}), 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
resendMessages(cnx, epoch);
}
}).exceptionally((e) -> {
Expand Down Expand Up @@ -1796,12 +1786,6 @@ private void closeProducerTasks() {
sendTimeout = null;
}

ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
if (batchTimerTask != null) {
batchTimerTask.cancel(false);
this.batchTimerTask = null;
}

if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
keyGeneratorTask.cancel(false);
}
Expand Down Expand Up @@ -1912,24 +1896,37 @@ public void run(Timeout timeout) throws Exception {
}

OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg == null) {
if (firstMsg == null && (batchMessageContainer == null || batchMessageContainer.isEmpty())) {
// If there are no pending messages, reset the timeout to the configured value.
timeToWaitMs = conf.getSendTimeoutMs();
} else {
long createdAt;
if (firstMsg != null) {
createdAt = firstMsg.createdAt;
} else {
// Because we don't flush batch messages while disconnected, we consider them "createdAt" when
// they would have otherwise been flushed.
createdAt = lastBatchSendNanoTime
+ TimeUnit.MICROSECONDS.toNanos(conf.getBatchingMaxPublishDelayMicros());
}
// If there is at least one message, calculate the diff between the message timeout and the elapsed
// time since first message was created.
long diff = conf.getSendTimeoutMs()
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstMsg.createdAt);
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdAt);
if (diff <= 0) {
// The diff is less than or equal to zero, meaning that the message has been timed out.
// Set the callback to timeout on every message, then clear the pending queue.
log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName,
pendingMessages.messagesCount());
getPendingQueueSize());
String msg = format("The producer %s can not send message to the topic %s within given timeout",
producerName, topic);
if (firstMsg != null) {
PulsarClientException te = new PulsarClientException.TimeoutException(msg, firstMsg.sequenceId);
failPendingMessages(cnx(), te);
} else {
failPendingBatchMessages(new PulsarClientException.TimeoutException(msg));
}

PulsarClientException te = new PulsarClientException.TimeoutException(
format("The producer %s can not send message to the topic %s within given timeout",
producerName, topic), firstMsg.sequenceId);
failPendingMessages(cnx(), te);
// Since the pending queue is cleared now, set timer to expire after configured value.
timeToWaitMs = conf.getSendTimeoutMs();
} else {
Expand Down Expand Up @@ -2005,7 +2002,7 @@ private void failPendingBatchMessages(PulsarClientException ex) {
public CompletableFuture<Void> flushAsync() {
synchronized (ProducerImpl.this) {
if (isBatchMessagingEnabled()) {
batchMessageAndSend();
batchMessageAndSend(false);
}
CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture;
if (!(lastSendFuture == this.lastSendFutureWrapper.lastSendFuture)) {
Expand All @@ -2020,19 +2017,57 @@ public CompletableFuture<Void> flushAsync() {
protected void triggerFlush() {
if (isBatchMessagingEnabled()) {
synchronized (ProducerImpl.this) {
batchMessageAndSend();
batchMessageAndSend(false);
}
}
}

// must acquire semaphore before calling
private void maybeScheduleBatchFlushTask() {
if (this.batchFlushTask != null || getState() != State.Ready) {
return;
}
scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
}

// must acquire semaphore before calling
private void scheduleBatchFlushTask(long batchingDelayMicros) {
ClientCnx cnx = cnx();
if (cnx != null) {
this.batchFlushTask = cnx.ctx().executor().schedule(catchingAndLoggingThrowables(this::batchFlushTask),
batchingDelayMicros, TimeUnit.MICROSECONDS);
}
}

private synchronized void batchFlushTask() {
if (log.isTraceEnabled()) {
log.trace("[{}] [{}] Batching the messages from the batch container from flush thread",
topic, producerName);
}
this.batchFlushTask = null;
// If we're not ready, don't schedule another flush and don't try to send.
if (getState() != State.Ready) {
return;
}
// If a batch was sent more recently than the BatchingMaxPublishDelayMicros, schedule another flush to run just
// at BatchingMaxPublishDelayMicros after the last send.
long microsSinceLastSend = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - lastBatchSendNanoTime);
if (microsSinceLastSend < conf.getBatchingMaxPublishDelayMicros()) {
scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros() - microsSinceLastSend);
return;
}
batchMessageAndSend(true);
}

// must acquire semaphore before enqueuing
private void batchMessageAndSend() {
private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
if (log.isTraceEnabled()) {
log.trace("[{}] [{}] Batching the messages from the batch container with {} messages", topic, producerName,
batchMessageContainer.getNumMessagesInBatch());
}
if (!batchMessageContainer.isEmpty()) {
try {
lastBatchSendNanoTime = System.nanoTime();
List<OpSendMsg> opSendMsgs;
if (batchMessageContainer.isMultiBatches()) {
opSendMsgs = batchMessageContainer.createOpSendMsgs();
Expand All @@ -2048,6 +2083,10 @@ private void batchMessageAndSend() {
} catch (Throwable t) {
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
} finally {
if (shouldScheduleNextBatchFlush) {
maybeScheduleBatchFlushTask();
}
}
}
}
Expand All @@ -2058,7 +2097,7 @@ protected void processOpSendMsg(OpSendMsg op) {
}
try {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
batchMessageAndSend(false);
}
if (isMessageSizeExceeded(op)) {
return;
Expand Down Expand Up @@ -2153,6 +2192,11 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
cnx.channel().close();
return;
}
// If any messages were enqueued while the producer was not Ready, we would have skipped
// scheduling the batch flush task. Schedule it now, if there are messages in the batch container.
if (isBatchMessagingEnabled() && !batchMessageContainer.isEmpty()) {
maybeScheduleBatchFlushTask();
}
if (pendingRegisteringOp != null) {
tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback, expectedEpoch);
}
Expand Down Expand Up @@ -2193,6 +2237,11 @@ public String getConnectedSince() {
}

public int getPendingQueueSize() {
if (isBatchMessagingEnabled()) {
synchronized (this) {
return pendingMessages.messagesCount() + batchMessageContainer.getNumMessagesInBatch();
}
}
return pendingMessages.messagesCount();
}

Expand Down

0 comments on commit b21e548

Please sign in to comment.