diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index 5522dd7e495a2..1473f28075316 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -403,6 +403,28 @@ public void verifyBatchSizeIsEqualToPolicyConfiguration() throws Exception { receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, batchReceivePolicy, messagesToSend / muxNumMessages); } + @Test + public void verifyNumBytesSmallerThanMessageSize() throws Exception { + final int messagesToSend = 500; + + final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID(); + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(10).build(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("s2") + .batchReceivePolicy(batchReceivePolicy) + .subscribe(); + + sendMessagesAsyncAndWait(producer, messagesToSend); + CountDownLatch latch = new CountDownLatch(messagesToSend+1); + receiveAsync(consumer, messagesToSend, latch); + latch.await(); + } + private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer consumer, BatchReceivePolicy batchReceivePolicy, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index 4ff23eb46f5b6..bb0335b1f158b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -45,6 +45,10 @@ protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { } protected boolean canAdd(Message message) { + if (currentNumberOfMessages == 0) { + // It's ok to add at least one message into a batch. + return true; + } if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) { return false; }