Skip to content

Commit

Permalink
Data is not deleted after expiration due to connected readers (apache…
Browse files Browse the repository at this point in the history
…#5621)

* Data is not deleted after expiration due to connected readers

*Problem*

A problem is observed when stress testing pulsar using [pulsar-flink](https://github.com/streamnative/pulsar-flink) -
No matter what TTL or retention setting is used, the data is never cleaned up. So the stress test ends up failing due
to disk filled up.

The root cause of the problem is described as below.

when a reader is opened using `MessageId.earliest`, a non-durable cursor with position (-1, -2) is added to the cursor heap.
The position `(-1, -2)` in the heap is never updated because non-durable cursors are never advanced when mark-deletions
happen. So the slowest cursor position is always `(-1, -2)`, thus causing no ledger can be deleted even they are expired
or over quota.

*Motivation*

Fix the problem to make sure Pulsar honor to TTL and retention settings.

*Modifications*

- Fix the `startPosition` when PersistentTopic opens a non-durable cursor on `MessageId.earliest`.
  So the `startPosition` is (-1, -1) not (-1, -2).

- Fix the `NonDurableCursorImpl` constructor to check if the position in the ledger of `MessageId.earliest`.
  If the provided position is in the `earliest` ledger, the mark-deleted position will be set to the previous
  position of first position.

- Fix the `NonDurableCursorImpl` to advance ledger cursor when mark-deletion happens on a non-durable cursor.

*Verify this change*

Unit tests are coming.
  • Loading branch information
sijie authored Nov 12, 2019
1 parent 0279431 commit 3e7cb68
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ private ByteBuf copyEntry(EntryImpl entry) {
public void invalidateEntries(final PositionImpl lastPosition) {
final PositionImpl firstPosition = PositionImpl.get(-1, 0);

if (firstPosition.compareTo(lastPosition) > 0) {
log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}",
firstPosition, lastPosition);
return;
}

Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false);
int entriesRemoved = removed.getLeft();
long sizeRemoved = removed.getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* care about ledgers to be deleted.
*
*/
class ManagedCursorContainer implements Iterable<ManagedCursor> {
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

private static class Item {
final ManagedCursor cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class ManagedCursorImpl implements ManagedCursor {

protected volatile PositionImpl markDeletePosition;
protected volatile PositionImpl readPosition;
private volatile MarkDeleteEntry lastMarkDeleteEntry;
protected volatile MarkDeleteEntry lastMarkDeleteEntry;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
Expand Down Expand Up @@ -178,7 +178,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
}
}

private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
protected final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,11 +829,9 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, null,
(PositionImpl) startCursorPosition);
return newNonDurableCursor(
startCursorPosition,
"non-durable-cursor-" + UUID.randomUUID());
}

@Override
Expand Down Expand Up @@ -863,12 +861,12 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
}

@Override
public Iterable<ManagedCursor> getCursors() {
public ManagedCursorContainer getCursors() {
return cursors;
}

@Override
public Iterable<ManagedCursor> getActiveCursors() {
public ManagedCursorContainer getActiveCursors() {
return activeCursors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
} else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
Expand Down Expand Up @@ -83,6 +83,12 @@ void recover(final VoidCallback callback) {
protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx) {
// Bypass persistence of mark-delete position and individually deleted messages info

MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
lastMarkDeleteEntry = mdEntry;
// it is important to advance cursor so the retention can kick in as expected.
ledger.updateCursor(NonDurableCursorImpl.this, mdEntry.newPosition);

callback.markDeleteComplete(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -80,7 +81,7 @@ void testZNodeBypassed() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");

ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.earliest);
assertTrue(Iterables.isEmpty(ledger.getCursors()));
assertFalse(Iterables.isEmpty(ledger.getCursors()));

c1.close();
ledger.close();
Expand Down Expand Up @@ -610,6 +611,50 @@ void testCursorWithNameIsCachable() throws Exception {
ledger.close();
}

@Test
public void testGetSlowestConsumer() throws Exception {
final String mlName = "test-get-slowest-consumer-ml";
final String c1 = "cursor1";
final String nc1 = "non-durable-cursor1";
final String ncEarliest = "non-durable-cursor-earliest";

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, new ManagedLedgerConfig());
Position p1 = ledger.addEntry(c1.getBytes(UTF_8));
log.info("write entry 1 : pos = {}", p1);
Position p2 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 2 : pos = {}", p2);
Position p3 = ledger.addEntry(nc1.getBytes(UTF_8));
log.info("write entry 3 : pos = {}", p3);

ManagedCursor cursor1 = ledger.openCursor(c1);
cursor1.seek(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());

ManagedCursor nonCursor1 = ledger.newNonDurableCursor(p2, nc1);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

PositionImpl earliestPos = new PositionImpl(-1, -2);

ManagedCursor nonCursorEarliest = ledger.newNonDurableCursor(earliestPos, ncEarliest);
PositionImpl expectedPos = new PositionImpl(((PositionImpl) p1).getLedgerId(), -1);
assertEquals(expectedPos, ledger.getCursors().getSlowestReaderPosition());

// move non-durable cursor should update the slowest reader position
nonCursorEarliest.markDelete(p1);
assertEquals(p1, ledger.getCursors().getSlowestReaderPosition());

nonCursorEarliest.markDelete(p2);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

nonCursorEarliest.markDelete(p3);
assertEquals(p2, ledger.getCursors().getSlowestReaderPosition());

nonCursor1.markDelete(p3);
assertEquals(p3, ledger.getCursors().getSlowestReaderPosition());

ledger.close();
}

@Test(expectedExceptions = NullPointerException.class)
void testCursorWithNameIsNotNull() throws Exception {
final String p1CursorName = "entry-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,8 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri

long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
if (msgId instanceof BatchMessageIdImpl) {
if (ledgerId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerSer
throws BrokerServiceException.NamingException, ManagedLedgerException {
super(topic, ledger, brokerService);
this.txnCursor = new TransactionCursorImpl();
this.retentionCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
this.retentionCursor = ledger.newNonDurableCursor(
PositionImpl.earliest, "txn-buffer-retention");
}

@Override
Expand Down

0 comments on commit 3e7cb68

Please sign in to comment.