Skip to content

Commit

Permalink
Fix the problem that batchMessageId is converted to messageIdImpl (ap…
Browse files Browse the repository at this point in the history
…ache#8779)

Fixes apache#8712

### Motivation
TopicReaderTest.testHasMessageAvailable is flaky
Cause Analysis:
When there is only one message in the batch, calling Broker's `GetLastMessageId` will determine that this is not a batch message, and then return a `MessageIdImpl`. (See ServerCnx line 1553)
In the same scenario, the client will return a `BatchMessageId` after sending a message. (See ProducerImpl line 1151)

This will happen:
MessageIdImpl: `3:31:-1`
BatchMessageId: `3:31:-1:0`

When calling `reader.hasMessageAvailable()`, the two ids will be compared, like this: `lastMessageIdInBroker.compareTo(messageId)`
Although it is the same messageId, the results will not be equal.

### Modifications
When we call `admin.topics().getLastMessageId`, even if there is only one message in the batch, it is considered to be BatchMessageId. Client also acts like this.
Therefore, we keep consistent everywhere.
Even if there is only one message in the batch, we think it is `BatchMessageId`
  • Loading branch information
315157973 authored Dec 3, 2020
1 parent e7c7e2e commit 85f3ff4
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1550,15 +1550,15 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return batchSize;
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
});

batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
} else {
int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -982,6 +980,76 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive)
producer.close();
}

@Test(timeOut = 20000)
public void testHasMessageAvailableWithBatch() throws Exception {
final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
final int numOfMessage = 10;

Producer<byte[]> producer = pulsarClient.newProducer()
.enableBatching(true)
.batchingMaxMessages(10)
.batchingMaxPublishDelay(2,TimeUnit.SECONDS)
.topic(topicName).create();

//For batch-messages with single message, the type of client messageId should be the same as that of broker
MessageId messageId = producer.send("msg".getBytes());
assertTrue(messageId instanceof MessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageId lastMsgId = reader.getConsumer().getLastMessageId();
assertTrue(lastMsgId instanceof BatchMessageIdImpl);
assertTrue(messageId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId, messageId);
reader.close();

CountDownLatch latch = new CountDownLatch(numOfMessage);
List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
Assert.fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}
producer.flush();
latch.await();
producer.close();

//For batch-message with multi messages, the type of client messageId should be the same as that of broker
for (MessageId id : allIds) {
reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
if (id instanceof BatchMessageIdImpl) {
MessageId lastMessageId = reader.getConsumer().getLastMessageId();
assertTrue(lastMessageId instanceof BatchMessageIdImpl);
log.info("id {} instance of BatchMessageIdImpl",id);
} else {
assertTrue(id instanceof MessageIdImpl);
MessageId lastMessageId = reader.getConsumer().getLastMessageId();
assertTrue(lastMessageId instanceof MessageIdImpl);
log.info("id {} instance of MessageIdImpl",id);
}
reader.close();
}
//For non-batch message, the type of client messageId should be the same as that of broker
producer = pulsarClient.newProducer()
.enableBatching(false).topic(topicName).create();
messageId = producer.send("non-batch".getBytes());
assertFalse(messageId instanceof BatchMessageIdImpl);
assertTrue(messageId instanceof MessageIdImpl);
reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).create();
MessageId lastMessageId = reader.getConsumer().getLastMessageId();
assertFalse(lastMessageId instanceof BatchMessageIdImpl);
assertTrue(lastMessageId instanceof MessageIdImpl);
assertEquals(lastMessageId, messageId);
producer.close();
reader.close();
}

@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
Expand Down

0 comments on commit 85f3ff4

Please sign in to comment.