Skip to content

Commit

Permalink
Fix reader skipped remaining compacted data during the topic unloadin…
Browse files Browse the repository at this point in the history
…g. (apache#13629)

### Motivation

To fix the reader skipping remaining compacted data while the topic has been unloaded.
apache#11287 fixed the data skipped issue while the reader first time to read the messages
with the earliest position. But if the reader has consumed some messages from the
compacted ledger but not all, the start position will not be `earliest`, the broker
will rewind the cursor for the reader to the next valid position of the original topic.
So the remaining messages in the compacted ledger will be skipped.

Here are the logs from the broker:

```
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://xxx/product-full-prod/5126 - dedup is disabled
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://xxx/product-full-prod/5126][xxx] Creating non-durable subscription at msg id 181759:14:-1:-1
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl - [xxx/product-full-prod/persistent/5126] Created non-durable cursor read-position=221199:0 mark-delete-position=181759:13
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx/product-full-prod/persistent/5126] Opened new cursor: NonDurableCursorImpl{ledger=xxx/product-full-prod/persistent/5126, ackPos=181759:13, readPos=221199:0}
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [xxx/product-full-prod/persistent/5126-xxx] Rewind from 221199:0 to 221199:0
```

There some many compacted messages after `181759:13`, but the broker will not dispatch them to the reader.
The issue also can be reproduced by the unit test that was added in this PR.

### Modification

If the cursor with `readCompacted = true`, just rewind to the next message of the mark delete position,
so that the reader can continue to read the data from the compacted ledger.

### Verification

A new test added for testing the reader can get all the compacted messages and non-compacted messages from the topic during the topic unloading.
  • Loading branch information
codelipenghui authored Jan 7, 2022
1 parent fd3ba55 commit 07f131f
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public interface ManagedLedger {
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
boolean isReadCompacted) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,11 +1033,12 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma

@Override
public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException {
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest);
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition)
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition,
boolean isReadCompacted)
throws ManagedLedgerException {
Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
Expand All @@ -1052,7 +1053,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
}

NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition, initialPosition);
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
cursor.setActive();

log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@

public class NonDurableCursorImpl extends ManagedCursorImpl {

private volatile boolean readCompacted;
private final boolean readCompacted;

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) {
super(bookkeeper, config, ledger, cursorName);
this.readCompacted = isReadCompacted;

// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
Expand Down Expand Up @@ -67,7 +68,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {

private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = ledger.getNextValidPosition(mdPosition);
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
Expand Down Expand Up @@ -118,14 +119,22 @@ public void asyncDeleteCursor(final String consumerName, final DeleteCursorCallb
callback.deleteCursorComplete(ctx);
}

public void setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
}

public boolean isReadCompacted() {
return readCompacted;
}

@Override
public void rewind() {
// For reading the compacted data,
// we couldn't reset the read position to the next valid position of the original topic.
// Otherwise, the remaining data in the compacted ledger will be skipped.
if (!readCompacted) {
super.rewind();
} else {
readPosition = markDeletePosition.getNext();
}
}

@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -181,9 +180,6 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
consumer.notifyActiveConsumerChange(currentActiveConsumer);
}
}
if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec);
startMessageRollbackDurationSec, readCompacted);

int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
Expand Down Expand Up @@ -921,7 +921,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
}

private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) {
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
boolean isReadCompacted) {
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);

CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -954,7 +955,8 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition);
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
isReadCompacted);
} catch (ManagedLedgerException e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,4 +734,59 @@ public void testHasMessageAvailableWithNullValueMessage() throws Exception {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}

public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" +
UUID.randomUUID();
final int numMessages = 1000;
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.blockIfQueueFull(true)
.enableBatching(false)
.create();
CompletableFuture<MessageId> lastMessage = null;
for (int i = 0; i < numMessages; ++i) {
lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync();
}
producer.flush();
lastMessage.join();
admin.topics().triggerCompaction(topic);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
Assert.assertEquals(stats.compactedLedger.entries, numMessages);
Assert.assertEquals(admin.topics().getStats(topic)
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
});
// Unload the topic to make sure the original ledger been deleted.
admin.topics().unload(topic);
// Produce more messages to the original topic
for (int i = 0; i < numMessages; ++i) {
lastMessage = producer.newMessage().key(i + numMessages + "").value(String.format("msg [%d]", i + numMessages)).sendAsync();
}
producer.flush();
lastMessage.join();
// For now the topic has 1000 messages in the compacted ledger and 1000 messages in the original topic.
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageIdInclusive()
.startMessageId(MessageId.earliest)
.readCompacted(true)
.create();

// Unloading the topic during reading the data to make sure the reader will not miss any messages.
for (int i = 0; i < numMessages / 2; ++i) {
Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i));
}
admin.topics().unload(topic);
for (int i = 0; i < numMessages / 2; ++i) {
Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages / 2));
}
admin.topics().unload(topic);
for (int i = 0; i < numMessages; ++i) {
Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public ManagedCursor newNonDurableCursor(Position startPosition, String subscrip

@Override
public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
CommandSubscribe.InitialPosition initialPosition) throws
ManagedLedgerException {
CommandSubscribe.InitialPosition initialPosition,
boolean isReadCompacted) throws ManagedLedgerException {
return null;
}

Expand Down

0 comments on commit 07f131f

Please sign in to comment.