Return the last position of the compacted data while the original data has been deleted. (apache#12161)

Currently, for the get-last-message-ID request, the broker returns -1:-1 if all of the original data has been deleted.

```
09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx
09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely
09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted
09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0
09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999
```
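For illustration, here is a minimal reader-side sketch of how the problem surfaces. The service URL, topic name, and class name are placeholders, and the topic is assumed to be in the state described above (all original data deleted, only compacted data left):

```
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

public class LastMessageIdRepro {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL and topic; the topic only has compacted data left.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            Reader<String> reader = client.newReader(Schema.STRING)
                    .topic("persistent://my-property/my-ns/xxx")
                    .readCompacted(true)
                    .startMessageId(MessageId.earliest)
                    .create();
            // hasMessageAvailable() asks the broker for the last message ID.
            // Before this fix the broker answers -1:-1 (or fails with a
            // NonRecoverableLedgerException once the original ledger is gone),
            // so the reader never surfaces the compacted message.
            System.out.println("hasMessageAvailable = " + reader.hasMessageAvailable());
            reader.close();
        }
    }
}
```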

After the rollover task, the topic internal stats will be:

```
{
    "entriesAddedCounter": 0,
    "numberOfEntries": 0,
    "totalSize": 0,
    "currentLedgerEntries": 0,
    "currentLedgerSize": 0,
    "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z",
    "waitingCursorsCount": 29,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "44946:0",
    "state": "LedgerOpened",
    "ledgers": [
        {
            "ledgerId": 66425,
            "entries": 0,
            "size": 0,
            "offloaded": false,
            "underReplicated": false
        }
    ],
    "cursors": {
        "__compaction": {
            "markDeletePosition": "44946:0",
            "readPosition": "44946:1",
            "waitingReadOp": false,
            "pendingReadOps": 0,
            "messagesConsumedCounter": 0,
            "cursorLedger": -1,
            "cursorLedgerLastEntry": -1,
            "individuallyDeletedMessages": "[]",
            "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z",
            "state": "NoLedger",
            "numberOfEntriesSinceFirstNotAckedMessage": 1,
            "totalNonContiguousDeletedMessagesRange": 0,
            "subscriptionHavePendingRead": false,
            "subscriptionHavePendingReplayRead": false,
            "properties": {
                "CompactedTopicLedger": 64365
            }
        }
    },
    "schemaLedgers": [],
    "compactedLedger": {
        "ledgerId": 64365,
        "entries": 1,
        "size": 4024,
        "offloaded": false,
        "underReplicated": false
    }
}
```
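For reference, the same internal stats can also be read programmatically with the admin client, which makes it easy to compare lastConfirmedEntry against the remaining ledgers and the compacted ledger. A sketch, with a placeholder admin URL, topic name, and class name:

```
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class InspectInternalStats {
    public static void main(String[] args) throws Exception {
        // Placeholder admin endpoint and topic name.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            PersistentTopicInternalStats stats =
                    admin.topics().getInternalStats("persistent://my-property/my-ns/xxx");
            // lastConfirmedEntry still points at the deleted ledger (44946:0),
            // while the only remaining ledger (66425) is empty and the data
            // lives in the compacted ledger (64365).
            System.out.println("lastConfirmedEntry = " + stats.lastConfirmedEntry);
            System.out.println("compactedLedger    = " + stats.compactedLedger.ledgerId);
        }
    }
}
```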

At this time, when a reader calls hasMessageAvailable(), the client gets the last message ID from the broker, and a NonRecoverableLedgerException is thrown on the broker side because ledger 44946 has been deleted.

```
12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy
12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server
```

The problem is that we are not checking whether the topic has compacted data. If the topic has compacted data but runs into the situation above, we should return the last message ID of the compacted ledger to the client.
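Conceptually, the fallback looks like the sketch below: read the last entry of the compacted ledger via the new readLastEntryOfCompactedLedger() method and use its position, keeping the old -1:-1 answer only when there is no compacted data either. The class name is hypothetical and PositionImpl merely stands in for the ledger-id/entry-id pair returned to the client; the actual change is in the ServerCnx get-last-message-id handling shown in the diff below:

```
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.compaction.CompactedTopic;

class CompactedLastPositionSketch {
    // Simplified fallback: if the compacted ledger has a last entry, answer with
    // its position; otherwise keep the previous -1:-1 behaviour. The real code
    // also carries the partition index, batch index and mark-delete position.
    static CompletableFuture<PositionImpl> lastPosition(CompactedTopic compactedTopic) {
        return compactedTopic.readLastEntryOfCompactedLedger().thenApply(entry -> {
            if (entry == null) {
                return PositionImpl.get(-1, -1);
            }
            try {
                return PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
            } finally {
                entry.release();
            }
        });
    }
}
```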

Added a test for the new changes.
codelipenghui authored Sep 24, 2021
1 parent e2997e8 commit 86e720f
Showing 4 changed files with 97 additions and 6 deletions.
@@ -1716,12 +1716,33 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
// in this case, the ledgers been removed except the current ledger
// and current ledger without any data
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-1, -1, partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
if (entry != null) {
// in this case, all the data has been compacted, so return the last position
// in the compacted ledger to the client
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int largestBatchIndex = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() - 1 : -1;
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
entry.release();
} else {
// in this case, the ledgers been removed except the current ledger
// and current ledger without any data
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-1, -1, partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
}
}).exceptionally(ex -> {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to read last entry of the compacted Ledger "
+ ex.getCause().getMessage()));
return null;
});
} else {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
@@ -20,6 +20,7 @@

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
@@ -31,4 +32,5 @@ void asyncReadEntriesOrWait(ManagedCursor cursor,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
CompletableFuture<Entry> readLastEntryOfCompactedLedger();
}
@@ -281,6 +281,18 @@ public Optional<CompactedTopicContext> getCompactedTopicContext() throws Executi
return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
}

@Override
public CompletableFuture<Entry> readLastEntryOfCompactedLedger() {
if (compactionHorizon == null) {
return CompletableFuture.completedFuture(null);
}
return compactedTopicContext.thenCompose(context ->
readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
.thenCompose(entries -> entries.size() > 0
? CompletableFuture.completedFuture(entries.get(0))
: CompletableFuture.completedFuture(null)));
}

private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
@@ -40,10 +40,13 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
@@ -56,8 +59,10 @@
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -360,4 +365,55 @@ public void testReadMessageFromCompactedLedger() throws Exception {
Assert.assertEquals(compactedMsgCount, 1);
Assert.assertEquals(nonCompactedMsgCount, numMessages);
}

@Test
public void testLastMessageIdForCompactedLedger() throws Exception {
String topic = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + UUID.randomUUID();
final String key = "1";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
final int numMessages = 10;
final String msg = "test compaction msg";
for (int i = 0; i < numMessages; ++i) {
producer.newMessage().key(key).value(msg).send();
}
admin.topics().triggerCompaction(topic);
boolean succeed = retryStrategically((test) -> {
try {
return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);

Assert.assertTrue(succeed);

PersistentTopicInternalStats stats0 = admin.topics().getInternalStats(topic);
admin.topics().unload(topic);
PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic);
// Make sure the ledger rollover has triggered.
Assert.assertTrue(stats0.currentLedgerSize != stats1.currentLedgerSize);

Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
Assert.assertTrue(topicRef.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();

Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0);
Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1);
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
});

Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();

Assert.assertTrue(reader.hasMessageAvailable());
Assert.assertEquals(reader.readNext().getValue(), msg);
Assert.assertFalse(reader.hasMessageAvailable());
}
}
