Skip to content

Commit

Permalink
[PIP 81] Part-1 Split createNewMetadataLedger into multiple methods f…
Browse files Browse the repository at this point in the history
…or reuse (apache#15425)

### Motivation
It is difficult to get CR due to so many modifications in PIP 81 apache#10729.
So I split this PR into multiple sub-PRs to facilitate CR

### Modifications
I split the logic in createNewMetadataLedger into multiple methods to facilitate subsequent reuse
1. Put the logic of creating Ledger into `doCreateNewMetadataLedger` separately
2. `switchToNewLedger` does not need to wrap another layer of callback, so remove redundant wrapping
3. When persisting data fails, we need to delete the created Ledger, this part of the logic is put into `deleteLedger`

### Verifying this change
This modification does not change any previous behavior, so no unit tests are required, but the previous unit tests must be ensured to pass
  • Loading branch information
315157973 authored May 6, 2022
1 parent 5c2700b commit ac6bd3c
Showing 1 changed file with 58 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -59,7 +60,6 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {

void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
if (newLedgerHandle == null) {
return;
}
MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
// Created the ledger, now write the last position content
persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
}
switchToNewLedger(newLedgerHandle, callback);
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);

deleteLedgerAsync(newLedgerHandle);
callback.operationFailed(exception);
}
});
}).whenComplete((result, e) -> {
ledger.mbean.endCursorLedgerCreateOp();
if (e != null) {
callback.operationFailed(createManagedLedgerException(e));
}
});
}

private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {

if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
future.complete(null);
return;
}

Expand All @@ -2544,63 +2579,35 @@ void createNewMetadataLedger(final VoidCallback callback) {
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
BKException.getMessage(rc));
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
}
// Created the ledger, now write the last position
// content
MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
persistPositionToLedger(lh, mdEntry, new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
}
switchToNewLedger(lh, new VoidCallback() {
@Override
public void operationComplete() {
callback.operationComplete();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
lh.getId());
}
}, null);
callback.operationFailed(exception);
}
});
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);

ledger.mbean.startCursorLedgerDeleteOp();
bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
ledger.mbean.endCursorLedgerDeleteOp();
}
}, null);
callback.operationFailed(exception);
}
});
future.complete(lh);
}));
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));

return future;
}

private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
ledger.mbean.startCursorLedgerDeleteOp();
CompletableFuture<Void> future = new CompletableFuture<>();
bookkeeper.asyncDeleteLedger(ledgerHandle.getId(), (int rc, Object ctx) -> {
future.complete(null);
ledger.mbean.endCursorLedgerDeleteOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
ledgerHandle.getId());
}
}, null);
return future;
}


private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
Expand Down Expand Up @@ -2786,7 +2793,9 @@ public void operationComplete(Void result, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e);
callback.operationFailed(e);
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
deleteLedgerAsync(lh).thenRun(() -> callback.operationFailed(e));
}
}, false);
}
Expand Down

0 comments on commit ac6bd3c

Please sign in to comment.