Skip to content

Commit

Permalink
Fix the wrong multi-topic has message available behavior (apache#13634)
Browse files Browse the repository at this point in the history
Fixes apache#13605

### Motivation

Currently, the multiTopicReader `hasMessageAvailable` might get the wrong result, we must check `numMessagesInQueue() > 0` again after finish all consumer `hasMessageAvaliableAsync` future, bacause some message might already in `MultiTopicsConsumerImpl#incomingMessages`. 

### Modifications

* Fix the wrong multi-topic has message available behavior.
* Use `reader.readNextAsync()` instead of block method `reader.readNext()`.
* Reduce the units test running time by changing `MultiTopicsReaderTest` to use `@BeforeClass`, `@AfterClass`.
  • Loading branch information
Demogorgon314 authored Jan 12, 2022
1 parent 1d9e7b5 commit e57dc8f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "flaky")
@Slf4j
@Test(groups = "flaky")
public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {

private static final String subscription = "reader-multi-topics-sub";

@BeforeMethod(alwaysRun = true)
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
Expand All @@ -87,7 +87,7 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace("my-property/my-ns", policies);
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -169,15 +169,15 @@ public void testHasMessageAvailableAsync() throws Exception {
private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
if (hasMessageAvailable) {
try {
Message<T> msg = reader.readNext();
reader.readNextAsync().whenComplete((msg, ex) -> {
if (ex != null) {
log.error("Read message failed.", ex);
latch.countDown();
return;
}
msgs.add(msg);
} catch (PulsarClientException e) {
log.error("Read message failed.", e);
latch.countDown();
return;
}
readMessageUseAsync(reader, msgs, latch);
readMessageUseAsync(reader, msgs, latch);
});
} else {
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
if (exception != null) {
completableFuture.completeExceptionally(exception);
} else {
completableFuture.complete(hasMessageAvailable.get());
completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0);
}
});
return completableFuture;
Expand Down

0 comments on commit e57dc8f

Please sign in to comment.