Skip to content

Commit

Permalink
fix consume failure when BatchReceivePolicy#maxNumBytes < message size (
Browse files Browse the repository at this point in the history
apache#14139)

(cherry picked from commit 88fc844)
  • Loading branch information
Jason918 authored and zymap committed Feb 11, 2022
1 parent 6089daa commit 0a635c5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,28 @@ public void verifyBatchSizeIsEqualToPolicyConfiguration() throws Exception {
receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, batchReceivePolicy, messagesToSend / muxNumMessages);
}

@Test
public void verifyNumBytesSmallerThanMessageSize() throws Exception {
final int messagesToSend = 500;

final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(10).build();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s2")
.batchReceivePolicy(batchReceivePolicy)
.subscribe();

sendMessagesAsyncAndWait(producer, messagesToSend);
CountDownLatch latch = new CountDownLatch(messagesToSend+1);
receiveAsync(consumer, messagesToSend, latch);
latch.await();
}


private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer,
BatchReceivePolicy batchReceivePolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) {
}

protected boolean canAdd(Message<T> message) {
if (currentNumberOfMessages == 0) {
// It's ok to add at least one message into a batch.
return true;
}
if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) {
return false;
}
Expand Down

0 comments on commit 0a635c5

Please sign in to comment.