Skip to content

Commit

Permalink
If mark-delete operation fails, mark the cursor as "dirty" (apache#14256
Browse files Browse the repository at this point in the history
)
  • Loading branch information
merlimat authored Feb 13, 2022
1 parent d5c1271 commit 8928c34
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,7 @@ public void operationComplete() {

@Override
public void operationFailed(ManagedLedgerException exception) {
isDirty = true;
log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
ManagedCursorImpl.this, mdEntry.newPosition);
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3507,6 +3507,57 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
});
}



@Test
public void testFlushCursorAfterError() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setThrottleMarkDelete(1.0);

ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
factoryConfig.setCursorPositionFlushSeconds(1);

@Cleanup("shutdown")
ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
ManagedCursor c1 = ledger1.openCursor("c");
List<Position> positions = new ArrayList<>();

for (int i = 0; i < 20; i++) {
positions.add(ledger1.addEntry(new byte[1024]));
}

// Simulate BK write error
bkc.failNow(BKException.Code.NotEnoughBookiesException);
metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));

try {
c1.markDelete(positions.get(positions.size() - 1));
fail("should have failed");
} catch (ManagedLedgerException e) {
// Expected
}

metadataStore.unsetAlwaysFail();

// In memory position is updated
assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));

Awaitility.await()
// Give chance to the flush to be automatically triggered.
// NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
.pollDelay(Duration.ofMillis(2000))
.untilAsserted(() -> {
// Abruptly re-open the managed ledger without graceful close
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
});
}

@Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
Expand Down

0 comments on commit 8928c34

Please sign in to comment.