Skip to content

Commit

Permalink
Async the method onManagedLedgerLastLedgerInitialize for ManagedLedge…
Browse files Browse the repository at this point in the history
…rInterceptor (apache#10706)

* Async the method onManagedLedgerLastLedgerInitialize for ManagedLedgerInterceptor

* Addressed comment.
  • Loading branch information
codelipenghui authored May 26, 2021
1 parent 6bc9e74 commit f355272
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ public static class ManagedLedgerInterceptException extends ManagedLedgerExcepti
public ManagedLedgerInterceptException(String msg) {
super(msg);
}

public ManagedLedgerInterceptException(Throwable e) {
super(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,16 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
.setTimestamp(clock.millis()).build();
ledgers.put(id, info);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh);
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
.thenRun(() -> initializeBookKeeper(callback))
.exceptionally(ex -> {
callback.initializeFailed(
new ManagedLedgerInterceptException(ex.getCause()));
return null;
});
} else {
initializeBookKeeper(callback);
}
initializeBookKeeper(callback);
} else if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger not found: {}", name, ledgers.lastKey());
ledgers.remove(ledgers.lastKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.bookkeeper.mledger.impl.OpAddEntry;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Interceptor for ManagedLedger.
Expand Down Expand Up @@ -51,7 +52,7 @@ public interface ManagedLedgerInterceptor {
* @param name name of ManagedLedger
* @param ledgerHandle a LedgerHandle.
*/
void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle);
CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle);

/**
* @param propertiesMap map of properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
Expand Down Expand Up @@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
}

@Override
public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
LedgerEntries ledgerEntries = null;
try {
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
ledgerEntries =
lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed());
for (LedgerEntry entry : ledgerEntries) {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
((AppendIndexMetadataInterceptor) interceptor)
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
CompletableFuture<Void> promise = new CompletableFuture<>();
boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream()
.anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor);
if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) {
lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
if (ex != null) {
log.error("[{}] Read last entry error.", name, ex);
promise.completeExceptionally(ex);
} else {
if (entries != null) {
try {
LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
if (ledgerEntry != null) {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
((AppendIndexMetadataInterceptor) interceptor)
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
}
}
}
}
entries.close();
promise.complete(null);
} catch (Exception e) {
log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
name, e);
promise.completeExceptionally(e);
}
} else {
promise.complete(null);
}

}
}
} catch (org.apache.bookkeeper.client.api.BKException | InterruptedException e) {
log.error("[{}] Read last entry error.", name, e);
} finally {
if (ledgerEntries != null) {
ledgerEntries.close();
}
});
} else {
promise.complete(null);
}
return promise;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
Expand All @@ -47,8 +48,8 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa
}

@Override
public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {

public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down

0 comments on commit f355272

Please sign in to comment.