Skip to content

Commit

Permalink
fix compaction entry read exception (apache#11175)
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 authored Jul 2, 2021
1 parent a85fb1c commit c716495
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRe
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
callback.readEntriesComplete(Collections.emptyList(), ctx);
return CompletableFuture.completedFuture(null);
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
Expand Down Expand Up @@ -305,4 +307,57 @@ public void testCompactWithEmptyMessage(boolean batchEnabled) throws Exception {

producer.close();
}

@Test(timeOut = 30000)
public void testReadMessageFromCompactedLedger() throws Exception {
final String key = "1";
String msg = "test compaction msg";
final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 1);
final int numMessages = 10;

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
for (int i = 0; i < numMessages; ++i) {
producer.newMessage().key(key).value(msg).send();
}

admin.topics().triggerCompaction(topic);
boolean succeed = retryStrategically((test) -> {
try {
return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);

Assert.assertTrue(succeed);


final String newKey = "2";
String newMsg = "test compaction msg v2";
for (int i = 0; i < numMessages; ++i) {
producer.newMessage().key(newKey).value(newMsg).send();
}

Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();

int compactedMsgCount = 0;
int nonCompactedMsgCount = 0;
while (reader.hasMessageAvailable()) {
Message<String> message = reader.readNext();
if (key.equals(message.getKey()) && msg.equals(message.getValue())) {
compactedMsgCount++;
} else if (newKey.equals(message.getKey()) && newMsg.equals(message.getValue())) {
nonCompactedMsgCount++;
}
}

Assert.assertEquals(compactedMsgCount, 1);
Assert.assertEquals(nonCompactedMsgCount, numMessages);
}
}

0 comments on commit c716495

Please sign in to comment.