Skip to content

Commit

Permalink
Fix producer stucks on creating ledger timeout (apache#7319)
Browse files Browse the repository at this point in the history
* Fix producer stucks on creating ledger timeout

*Motivation*

The `ledgerCreated` flag is passed as ctx to the createLedger callback.
The callback already had the logic on handling `ledgerCreated` flag. But we set the flag to `false`
when timeout happens. It will trigger the following race condition:

a) The createComplete callback is triggered when timeout. But the pending add requests are not error'd out.
b) If the createComplete callback eventually completes, it will see `ledgerCreated` flag is set to true,
so it will cause `checkAndCompleteLedgerOpTask` returns false and exist too early without processing the
pending add requests.

This race condition only happens when creating ledger times out.

```
    public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
        }

        if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
            return;
        }
```

*Modification*

The timeout logic shouldn't modify the `ledgerCreated` context. It should let the callback itself to process
the `ledgerCreated` context.

* Change to use CAS
  • Loading branch information
sijie authored Jun 22, 2020
1 parent aeee10f commit a34f693
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3177,8 +3177,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
ledgerCreated.set(true);
cb.createComplete(BKException.Code.TimeoutException, null, null);
cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}
Expand All @@ -3194,14 +3193,15 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof AtomicBoolean) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
if (((AtomicBoolean) (ctx)).get()) {
if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
return false;
} else {
if (rc == BKException.Code.OK) {
log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
}
return true;
}
((AtomicBoolean) ctx).set(true);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
Expand Down Expand Up @@ -2326,16 +2327,18 @@ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicInteger response = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
response.set(rc);
latch.countDown();
}
AtomicReference<Object> ctxHolder = new AtomicReference<>();
ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
response.set(rc);
latch.countDown();
ctxHolder.set(ctx);
}, Collections.emptyMap());

latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
assertTrue(ctxHolder.get() instanceof AtomicBoolean);
AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
assertFalse(ledgerCreated.get());

ledger.close();
}
Expand Down

0 comments on commit a34f693

Please sign in to comment.