Skip to content

Commit

Permalink
Fixed readers backlog stats after data is skipped (apache#7236)
Browse files Browse the repository at this point in the history
### Motivation

The metrics for the reader backlog keep increasing when data is dropped because the reader cursor only moves on the next read attempt.
Instead we should proactively move the cursor forward on the first valid ledger.
  • Loading branch information
merlimat authored Jun 16, 2020
1 parent bc0718b commit 6b9c90f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -85,6 +85,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
Expand Down Expand Up @@ -2067,6 +2068,8 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
return;
}

advanceNonDurableCursors(ledgersToDelete);

// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
ledgerCache.remove(ls.getLedgerId());
Expand Down Expand Up @@ -2125,6 +2128,37 @@ public void operationFailed(MetaStoreException e) {
}
}

/**
* Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
* entries and the stats are reported correctly.
*/
private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
if (ledgersToDelete.isEmpty()) {
return;
}

long firstNonDeletedLedger = ledgers
.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);

cursors.forEach(cursor -> {
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name,
exception.getMessage());
}
}, null);
}
});
}

/**
* Delete this ManagedLedger completely from the system.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@

import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -678,6 +681,48 @@ public void testGetSlowestConsumer() throws Exception {
ledger.close();
}

@Test
public void testBacklogStatsWhenDroppingData() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
ManagedCursor c1 = ledger.openCursor("c1");
ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);

assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);

List<Position> positions = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
}

assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);

c1.markDelete(positions.get(4));
assertEquals(c1.getNumberOfEntries(), 5);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);

// Since the durable cursor has moved, the data will be trimmed
CompletableFuture<Void> promise = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise);
promise.join();

assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);

c1.close();
ledger.deleteCursor(c1.getName());
promise = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise);
promise.join();

assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);

ledger.close();
}

@Test(expectedExceptions = NullPointerException.class)
void testCursorWithNameIsNotNull() throws Exception {
final String p1CursorName = "entry-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testBacklogQuotaWithReader() throws Exception {
// non-durable mes should still
assertEquals(stats.subscriptions.size(), 1);
long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
assertEquals(nonDurableSubscriptionBacklog, numMsgs,
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;

try {
Expand Down

0 comments on commit 6b9c90f

Please sign in to comment.