diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index e6ba8887aedb6..ccd5a50dfd474 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1724,7 +1724,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { int bs = metadata.getNumMessagesInBatch(); int largestBatchIndex = bs > 0 ? bs - 1 : -1; ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - entry.getLedgerId(), entry.getLedgerId(), partitionIndex, largestBatchIndex, + entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); entry.release(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index b70f49536e507..4dc7bf3f51fdb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -413,7 +414,10 @@ public void testLastMessageIdForCompactedLedger() throws Exception { .create(); Assert.assertTrue(reader.hasMessageAvailable()); - Assert.assertEquals(msg, reader.readNext().getValue()); + Message received = reader.readNext(); + Assert.assertEquals(msg, received.getValue()); + MessageId messageId = ((ReaderImpl) reader).getConsumer().getLastMessageId(); + Assert.assertEquals(messageId, received.getMessageId()); Assert.assertFalse(reader.hasMessageAvailable()); } }