Skip to content

Commit

Permalink
Revert "[Fix][Tiered Storage] Eagerly Delete Offloaded Segments On To…
Browse files Browse the repository at this point in the history
…pic Deletion (apache#15914)" (apache#17889)

This reverts commit 9026d19.
  • Loading branch information
eolivelli authored Sep 29, 2022
1 parent 6cba1f6 commit f0b6348
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 666 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,6 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void delete(String name) throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
* @param name
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
Expand All @@ -164,16 +154,6 @@ void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
*/
void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
* @param name
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx);

/**
* Releases all the resources maintained by the ManagedLedgerFactory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -72,15 +71,13 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -805,18 +802,12 @@ public void operationFailed(MetaStoreException e) {

@Override
public void delete(String name) throws InterruptedException, ManagedLedgerException {
delete(name, CompletableFuture.completedFuture(null));
}

@Override
public void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException {
class Result {
ManagedLedgerException e = null;
}
final Result r = new Result();
final CountDownLatch latch = new CountDownLatch(1);
asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() {
asyncDelete(name, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
latch.countDown();
Expand All @@ -838,16 +829,10 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) {
asyncDelete(name, CompletableFuture.completedFuture(null), callback, ctx);
}

@Override
public void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
CompletableFuture<ManagedLedgerImpl> future = ledgers.get(name);
if (future == null) {
// Managed ledger does not exist and we're not currently trying to open it
deleteManagedLedger(name, mlConfigFuture, callback, ctx);
deleteManagedLedger(name, callback, ctx);
} else {
future.thenAccept(ml -> {
// If it's open, delete in the normal way
Expand All @@ -862,8 +847,7 @@ public void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlCo
/**
* Delete all managed ledger resources and metadata.
*/
void deleteManagedLedger(String managedLedgerName, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) {
// Read the managed ledger metadata from store
asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() {
@Override
Expand All @@ -875,7 +859,7 @@ public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
.map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue()))
.collect(Collectors.toList());
Futures.waitForAll(futures).thenRun(() -> {
deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx);
deleteManagedLedgerData(bkc, managedLedgerName, info, callback, ctx);
}).exceptionally(ex -> {
callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx);
return null;
Expand All @@ -890,80 +874,22 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
ledgerInfosFuture = new CompletableFuture<>();
store.getManagedLedgerInfo(managedLedgerName, false, null,
new MetaStoreCallback<>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> infos = new HashMap<>();
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
infos.put(ls.getLedgerId(), ls);
}
ledgerInfosFuture.complete(infos);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("Failed to get managed ledger info for {}", managedLedgerName, e);
ledgerInfosFuture.completeExceptionally(e);
}
});

DeleteLedgerCallback callback, Object ctx) {
Futures.waitForAll(info.ledgers.stream()
.map(li -> {
final CompletableFuture<Void> res;
if (li.isOffloaded) {
res = mlConfigFuture
.thenCombine(ledgerInfosFuture, Pair::of)
.thenCompose(pair -> {
ManagedLedgerConfig mlConfig = pair.getLeft();
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos = pair.getRight();

if (mlConfig == null || ledgerInfos == null) {
return CompletableFuture.completedFuture(null);
}

MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId);

if (ls.getOffloadContext().hasUidMsb()) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
String driverName = OffloadUtils.getOffloadDriverName(ls,
mlConfig.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);

UUID uuid = new UUID(ls.getOffloadContext().getUidMsb(),
ls.getOffloadContext().getUidLsb());
return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
"Deletion", managedLedgerName, scheduledExecutor);
}

return CompletableFuture.completedFuture(null);
});
} else {
res = CompletableFuture.completedFuture(null);
}
return res.thenCompose(__ -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
.filter(li -> !li.isOffloaded)
.map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
return result;
}));
})
throw new CompletionException(ex);
}
return result;
}))
.collect(Collectors.toList()))
.thenRun(() -> {
// Delete the metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2450,7 +2450,7 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
if (!factory.isMetadataServiceAvailable()) {
// Defer trimming of ledger if we cannot connect to metadata service
promise.completeExceptionally(new MetaStoreException("Metadata service is not available"));
promise.complete(null);
return;
}

Expand Down Expand Up @@ -2722,30 +2722,11 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) {

@Override
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {

// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
setFenced();
cancelScheduledTasks();

// Truncate to ensure the offloaded data is not orphaned.
// Also ensures the BK ledgers are deleted and not just scheduled for deletion
CompletableFuture<Void> truncateFuture = asyncTruncate();
truncateFuture.whenComplete((ignore, exc) -> {
if (exc != null) {
log.error("[{}] Error truncating ledger for deletion", name, exc);
callback.deleteLedgerFailed(exc instanceof ManagedLedgerException
? (ManagedLedgerException) exc : new ManagedLedgerException(exc),
ctx);
} else {
asyncDeleteInternal(callback, ctx);
}
});

}

private void asyncDeleteInternal(final DeleteLedgerCallback callback, final Object ctx) {

List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
if (cursors.isEmpty()) {
// No cursors to delete, proceed with next step
Expand Down Expand Up @@ -2803,9 +2784,10 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {

if (info.getOffloadContext().hasUidMsb()) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
cleanupOffloaded(ledgerId, uuid,
OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
"Trimming", name, scheduledExecutor);
"Trimming");
}
}

Expand Down Expand Up @@ -2860,7 +2842,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
default:
// Handle error
log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(),
BKException.getMessage(rc) + " code " + rc);
BKException.getMessage(rc));
int toDelete = ledgersToDelete.get();
if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) {
// Trigger callback only once
Expand Down Expand Up @@ -3049,17 +3031,18 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
// it is possible to get a BadVersion or other exception after retrying.
// So we don't clean up the data if it has metadata operation exception.
log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, "
+ "the offloaded data will not be cleaned up",
name, ledgerId, exception);
+ "the offloaded data will not be cleaned up",
name, ledgerId, exception);
return;
} else {
log.error("[{}] Failed to offload data for the ledgerId {}, "
+ "clean up the offloaded data",
name, ledgerId, exception);
+ "clean up the offloaded data",
name, ledgerId, exception);
}
OffloadUtils.cleanupOffloaded(
ledgerId, uuid, config,
driverMetadata,
"Metastore failure", name, scheduledExecutor);
cleanupOffloaded(
ledgerId, uuid,
driverName, driverMetadata,
"Metastore failure");
}
});
})
Expand Down Expand Up @@ -3178,15 +3161,14 @@ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUI
oldInfo.getOffloadContext().getUidLsb());
log.info("[{}] Found previous offload attempt for ledger {}, uuid {}"
+ ", cleaning up", name, ledgerId, uuid);
OffloadUtils.cleanupOffloaded(
cleanupOffloaded(
ledgerId,
oldUuid,
config,
OffloadUtils.getOffloadDriverName(oldInfo,
config.getLedgerOffloader().getOffloadDriverName()),
OffloadUtils.getOffloadDriverMetadata(oldInfo,
config.getLedgerOffloader().getOffloadDriverMetadata()),
"Previous failed offload",
name,
scheduledExecutor);
"Previous failed offload");
}
LedgerInfo.Builder builder = oldInfo.toBuilder();
builder.getOffloadContextBuilder()
Expand Down Expand Up @@ -3248,6 +3230,28 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
});
}

private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName, /*
* TODO: use driver name to
* identify offloader
*/
Map<String, String> offloadDriverMetadata, String cleanupReason) {
log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.",
name, ledgerId, uuid.toString(), cleanupReason);
Map<String, String> metadataMap = new HashMap();
metadataMap.putAll(offloadDriverMetadata);
metadataMap.put("ManagedLedgerName", name);

Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10),
Retries.NonFatalPredicate,
() -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap),
scheduledExecutor, name).whenComplete((ignored, exception) -> {
if (exception != null) {
log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})",
name, ledgerId, cleanupReason, exception);
}
});
}

/**
* Get the number of entries between a contiguous range of two positions.
*
Expand Down Expand Up @@ -3756,7 +3760,7 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
} else {
return new ManagedLedgerException(BKException.getMessage(bkErrorCode) + " error code: " + bkErrorCode);
return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
}

Expand Down
Loading

0 comments on commit f0b6348

Please sign in to comment.