Skip to content

Commit

Permalink
Fix compactor skips data from last compacted Ledger (apache#12429)
Browse files Browse the repository at this point in the history
## Motivation

The PR is fixing the compacted data lost during the data compaction.
We see a few events deletion but the compacted events obviously dropped a lot.

![image](https://user-images.githubusercontent.com/12592133/138008777-00eb7c0b-358e-4291-bfd4-f4b27cbedbf4.png)

After investigating more details about the issue, only the first read operation reads the data from
the compacted ledger, since the second read operation, the broker start read data from the original
topic.

```
2021-10-19T23:09:30,021+0800 [broker-topic-workers-OrderedScheduler-7-0] INFO  org.apache.pulsar.compaction.CompactedTopicImpl - =====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from compacted Ledger = cursor position: -1:-1, Horizon: 16:-1, isFirstRead: true
2021-10-19T23:09:30,049+0800 [broker-topic-workers-OrderedScheduler-7-0] INFO  org.apache.pulsar.compaction.CompactedTopicImpl - =====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from original Ledger = cursor position: 16:0, Horizon: 16:-1, isFirstRead: false
```

## Modifications

The compaction task depends on the last snapshot and the incremental
entries to build the new snapshot. So for the compaction cursor, we
need to force seek the read position to ensure the compactor can read
the complete last snapshot because the compactor will read the data
before the compaction cursor mark delete position.

## Verifying this change

New test added for checking the compacted data will not lost.
  • Loading branch information
codelipenghui authored Oct 21, 2021
1 parent e358e92 commit 1830f90
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,11 @@ void markDelete(Position position, Map<String, Long> properties)
* @param newReadPosition
* the position where to move the cursor
*/
void seek(Position newReadPosition);
default void seek(Position newReadPosition) {
seek(newReadPosition, false);
}

void seek(Position newReadPosition, boolean force);

/**
* Clear the cursor backlog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2163,18 +2163,16 @@ public void rewind() {
}

@Override
public void seek(Position newReadPositionInt) {
public void seek(Position newReadPositionInt, boolean force) {
checkArgument(newReadPositionInt instanceof PositionImpl);
PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;

lock.writeLock().lock();
try {
if (newReadPosition.compareTo(markDeletePosition) <= 0) {
if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
// Make sure the newReadPosition comes after the mark delete position
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}

PositionImpl oldReadPosition = readPosition;
readPosition = newReadPosition;
} finally {
lock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void rewind() {
}

@Override
public void seek(Position newReadPosition) {
public void seek(Position newReadPosition, boolean force) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;

import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ComparisonChain;
Expand Down Expand Up @@ -122,7 +123,13 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext());
// The compaction task depends on the last snapshot and the incremental
// entries to build the new snapshot. So for the compaction cursor, we
// need to force seek the read position to ensure the compactor can read
// the complete last snapshot because of the compactor will read the data
// before the compaction cursor mark delete position
cursor.seek(lastEntry.getPosition().getNext(),
cursor.getName().equals(COMPACTION_SUBSCRIPTION));
callback.readEntriesComplete(entries, consumer);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;

import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;

Expand Down Expand Up @@ -437,4 +438,48 @@ public void testLastMessageIdForCompactedLedger() throws Exception {
reader.readNext();
Assert.assertFalse(reader.hasMessageAvailable());
}

@Test
public void testDoNotLossTheLastCompactedLedgerData() throws Exception {
String topic = "persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" +
UUID.randomUUID();
final int numMessages = 2000;
final int keys = 200;
final String msg = "Test";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.blockIfQueueFull(true)
.maxPendingMessages(numMessages)
.enableBatching(false)
.create();
CompletableFuture<MessageId> lastMessage = null;
for (int i = 0; i < numMessages; ++i) {
lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync();
}
producer.flush();
lastMessage.join();
admin.topics().triggerCompaction(topic);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
Assert.assertEquals(stats.compactedLedger.entries, keys);
Assert.assertEquals(admin.topics().getStats(topic)
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
});
admin.topics().unload(topic);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
Assert.assertEquals(stats.ledgers.size(), 1);
Assert.assertEquals(admin.topics().getStats(topic)
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
});
admin.topics().unload(topic);
// Send one more key to and then to trigger the compaction
producer.newMessage().key(keys + "").value(msg).send();
admin.topics().triggerCompaction(topic);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
Assert.assertEquals(stats.compactedLedger.entries, keys + 1);
});
}
}

0 comments on commit 1830f90

Please sign in to comment.