Skip to content

Commit

Permalink
Fixed error deleted a named non-durable cursor (apache#4144)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Apr 27, 2019
1 parent 479f067 commit e4c4bf1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ enum PositionBound {
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;

private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;

/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
Expand Down Expand Up @@ -753,6 +753,10 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
if (cursor == null) {
callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
cursors.removeCursor(consumerName);
callback.deleteCursorComplete(ctx);
return;
}

// First remove the consumer form the MetaStore. If this operation succeeds and the next one (removing the
Expand Down Expand Up @@ -1202,7 +1206,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}

}

private void closeAllCursors(CloseCallback callback, final Object ctx) {
Expand Down Expand Up @@ -1725,7 +1729,7 @@ public void readFailed(ManagedLedgerException exception, Object ctx) {
recycle();
}
}

private boolean checkCallbackCompleted(Object ctx) {
// if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
// assigned to different request
Expand Down Expand Up @@ -3134,7 +3138,7 @@ private void checkReadTimeout() {
lastReadCallback = null;
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -622,5 +622,16 @@ void testCursorWithNameIsNotNull() throws Exception {
}
}

@Test
void deleteNonDurableCursorWithName() throws Exception {
ManagedLedger ledger = factory.open("deleteManagedLedgerWithNonDurableCursor");

ManagedCursor c = ledger.newNonDurableCursor(PositionImpl.earliest, "custom-name");
assertEquals(Iterables.size(ledger.getCursors()), 1);

ledger.deleteCursor(c.getName());
assertEquals(Iterables.size(ledger.getCursors()), 0);
}

private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class);
}

0 comments on commit e4c4bf1

Please sign in to comment.