Skip to content

Commit

Permalink
hasMessageAvailable should return false when Reader read from lates…
Browse files Browse the repository at this point in the history
…t messages (apache#5023)

Fixes apache#4912

*Motivation*

When the reader read from the latest message, `hasMessageAvailable` will timeout or hang on.

*Modifications*

Make `lastDequeuedMessage` start position should keep with `startMessageId`.
  • Loading branch information
zymap authored and sijie committed Nov 19, 2019
1 parent 568df47 commit bfcfdf6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,40 @@ public void testReadMessageWithoutBatching() throws Exception {
testReadMessages(topic, false);
}

@Test
public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-inclusive";
Set<String> keys = publishMessages(topic, 10, false);

Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.startMessageIdInclusive().readerName(subscription).create();

Assert.assertTrue(reader.hasMessageAvailable());
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
Assert.assertFalse(reader.hasMessageAvailable());
}

@Test
public void testReadMessageWithBatching() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
testReadMessages(topic, true);
}

@Test
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
Set<String> keys = publishMessages(topic, 10, true);

Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.startMessageIdInclusive().readerName(subscription).create();

while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
}
Assert.assertTrue(keys.isEmpty());
Assert.assertFalse(reader.hasMessageAvailable());
}

private void testReadMessages(String topic, boolean enableBatch) throws Exception {
int numKeys = 10;

Expand All @@ -125,6 +153,10 @@ private void testReadMessages(String topic, boolean enableBatch) throws Exceptio
Assert.assertTrue(keys.remove(message.getKey()));
}
Assert.assertTrue(keys.isEmpty());

Reader<byte[]> readLatest = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.readerName(subscription + "latest").create();
Assert.assertFalse(readLatest.hasMessageAvailable());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId;
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
Expand Down Expand Up @@ -1450,6 +1451,12 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
}

public boolean hasMessageAvailable() throws PulsarClientException {
// we need to seek to the last position then the last message can be received when the resetIncludeHead
// specified.
if (lastDequeuedMessage == MessageId.latest && resetIncludeHead) {
lastDequeuedMessage = getLastMessageId();
seek(lastDequeuedMessage);
}
try {
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessage)) {
return true;
Expand Down

0 comments on commit bfcfdf6

Please sign in to comment.