Skip to content

Commit

Permalink
[Issue 6283][tiered-storage] Offload policies per namespace (apache#6422)
Browse files Browse the repository at this point in the history

Fixes apache#6283 

### Modifications

Define and use a custom deletionLag and threshold for offload policies per namespace.
All of this is required for apache#6354.
  • Loading branch information
KannarFr authored Mar 28, 2020
1 parent 181e5e7 commit 347d385
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
private long offloadAutoTriggerSizeThresholdBytes = -1;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
private long addEntryTimeoutSeconds = 120;
Expand Down Expand Up @@ -408,48 +406,6 @@ public long getRetentionSizeInMB() {
return retentionSizeInMB;
}

/**
* Sets the grace period between offloading a ledger and deleting its bookkeeper copy.
*
* <p>When a ledger is offloaded from bookkeeper storage to longterm storage, the bookkeeper ledger
* is not deleted immediately. Instead we wait for a grace period before deleting from bookkeeper.
* The given value is converted to milliseconds with {@code unit.toMillis(lagTime)} before it is stored.
*
* @param lagTime period to wait before deleting offloaded ledgers from bookkeeper
* @param unit timeunit for lagTime
* @return this config instance, so that setter calls can be chained
*/
public ManagedLedgerConfig setOffloadLedgerDeletionLag(long lagTime, TimeUnit unit) {
this.offloadLedgerDeletionLagMs = unit.toMillis(lagTime);
return this;
}

/**
* Number of milliseconds before an offloaded ledger will be deleted from bookkeeper.
*
* <p>Returns the stored {@code offloadLedgerDeletionLagMs} field unchanged; that field is
* initialized to {@code TimeUnit.HOURS.toMillis(4)}.
*
* @return the offload ledger deletion lag time in milliseconds
*/
public long getOffloadLedgerDeletionLagMillis() {
return offloadLedgerDeletionLagMs;
}

/**
* Size, in bytes, at which the managed ledger will start to automatically offload ledgers to longterm storage.
* A negative value disables autotriggering. A threshold of 0 offloads data as soon as possible.
* Offloading will not occur if no offloader has been set {@link #setLedgerOffloader(LedgerOffloader)}.
* Automatic offloading occurs when the ledger is rolled, and the ledgers up to that point exceed the threshold.
*
* @param threshold Threshold in bytes at which offload is automatically triggered
* @return this config instance, so that setter calls can be chained
*/
public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long threshold) {
this.offloadAutoTriggerSizeThresholdBytes = threshold;
return this;
}

/**
* Size, in bytes, at which offloading will automatically be triggered for this managed ledger.
*
* <p>Returns the stored {@code offloadAutoTriggerSizeThresholdBytes} field unchanged; that field is
* initialized to {@code -1}.
*
* @return the trigger threshold, in bytes
*/
public long getOffloadAutoTriggerSizeThresholdBytes() {
return this.offloadAutoTriggerSizeThresholdBytes;
}

/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1856,8 +1856,11 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
}

/**
* Schedules {@code maybeOffload(promise)} on the ordered executor (keyed by this ledger's name)
* when the configured offload threshold is non-negative.
*
* NOTE(review): this text is a rendered diff without +/- markers, so the pre-change guard (threshold read
* from ManagedLedgerConfig) and the post-change guard (threshold read from the offloader's OffloadPolicies)
* both appear in the body below. Only one of them exists in any real revision of the file.
*/
private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
// Pre-change guard: threshold comes from the managed-ledger config.
if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
// Post-change guard: requires a real offloader (non-null, not the NullLedgerOffloader singleton)
// that exposes non-null offload policies before the threshold is read from those policies.
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
if (config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
}
}
}

Expand All @@ -1876,39 +1879,43 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
}
});

long threshold = config.getOffloadAutoTriggerSizeThresholdBytes();
long sizeSummed = 0;
long alreadyOffloadedSize = 0;
long toOffloadSize = 0;

ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();

// go through ledger list from newest to oldest and build a list to offload in oldest to newest order
for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
long size = e.getValue().getSize();
sizeSummed += size;
boolean alreadyOffloaded = e.getValue().hasOffloadContext()
&& e.getValue().getOffloadContext().getComplete();
if (alreadyOffloaded) {
alreadyOffloadedSize += size;
} else if (sizeSummed > threshold) {
toOffloadSize += size;
toOffload.addFirst(e.getValue());
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();

long sizeSummed = 0;
long alreadyOffloadedSize = 0;
long toOffloadSize = 0;

ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();

// go through ledger list from newest to oldest and build a list to offload in oldest to newest order
for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
long size = e.getValue().getSize();
sizeSummed += size;
boolean alreadyOffloaded = e.getValue().hasOffloadContext()
&& e.getValue().getOffloadContext().getComplete();
if (alreadyOffloaded) {
alreadyOffloadedSize += size;
} else if (sizeSummed > threshold) {
toOffloadSize += size;
toOffload.addFirst(e.getValue());
}
}
}

if (toOffload.size() > 0) {
log.info("[{}] Going to automatically offload ledgers {}"
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
}
if (toOffload.size() > 0) {
log.info("[{}] Going to automatically offload ledgers {}"
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
}

offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
}
}
}

Expand All @@ -1930,8 +1937,15 @@ private boolean isLedgerRetentionOverSizeQuota() {

/**
* Reports whether an offloaded ledger is due to be deleted from bookkeeper: the offload must be complete,
* not yet marked as bookkeeper-deleted, and older than the configured deletion lag.
*
* NOTE(review): this text is a rendered diff without +/- markers, so the pre-change return statement
* (lag read from ManagedLedgerConfig) and the post-change body (lag read from the offloader's
* OffloadPolicies) both appear below. Only one of them exists in any real revision of the file.
*
* @param offload offload context of the ledger being examined
* @return true when the bookkeeper copy should be deleted; in the post-change body, false whenever
*         no usable offloader or offload policies are configured
*/
private boolean isOffloadedNeedsDelete(OffloadContext offload) {
// Age of the offload, measured with the injected clock against the offload timestamp.
long elapsedMs = clock.millis() - offload.getTimestamp();
// Pre-change return: lag comes from the managed-ledger config.
return offload.getComplete() && !offload.getBookkeeperDeleted()
&& elapsedMs > config.getOffloadLedgerDeletionLagMillis();

// Post-change body: lag comes from the offloader's policies when a real offloader
// (non-null, not the NullLedgerOffloader singleton) with non-null policies is present.
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
return offload.getComplete() && !offload.getBookkeeperDeleted()
&& elapsedMs > config.getLedgerOffloader()
.getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
} else {
return false;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testLaggedDelete() throws Exception {
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
config.setLedgerOffloader(offloader);
config.setClock(clock);

Expand Down Expand Up @@ -109,8 +109,8 @@ public void testLaggedDeleteRetentionSetLower() throws Exception {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(5, TimeUnit.MINUTES);
config.setOffloadLedgerDeletionLag(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(600000));
config.setLedgerOffloader(offloader);
config.setClock(clock);

Expand Down Expand Up @@ -157,7 +157,7 @@ public void testLaggedDeleteSlowConsumer() throws Exception {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
config.setLedgerOffloader(offloader);
config.setClock(clock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,12 @@ public void testOffloadDelete() throws Exception {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(0, TimeUnit.MINUTES);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(100));
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
ManagedCursor cursor = ledger.openCursor("foobar");

for (int i = 0; i < 15; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
Expand Down Expand Up @@ -746,9 +749,9 @@ public void testAutoTriggerOffload() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -782,9 +785,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -843,9 +846,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -894,9 +897,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -926,13 +929,12 @@ public CompletableFuture<Void> offload(ReadHandle ledger,

@Test
public void offloadAsSoonAsClosed() throws Exception {

MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(0);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -988,6 +990,12 @@ Set<Long> deletedOffloads() {
return deletes.keySet();
}

OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);

@Override
public String getOffloadDriverName() {
return "mock";
Expand Down Expand Up @@ -1029,7 +1037,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,

// Returns the offload policies of this mock offloader.
// NOTE(review): this text is a rendered diff without +/- markers; `return null;` is the pre-change
// line and `return offloadPolicies;` is the post-change line. Only one exists in a real revision.
@Override
public OffloadPolicies getOffloadPolicies() {
return null;
return offloadPolicies;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2221,7 +2221,12 @@ protected void internalSetCompactionThreshold(long newThreshold) {

/**
* Returns the offload threshold for the current namespace after validating admin access for its tenant.
*
* NOTE(review): this text is a rendered diff without +/- markers, so the pre-change return statement and
* the post-change body both appear below. Only one of them exists in any real revision of the file.
*
* @return in the post-change body, the threshold from {@code offload_policies} when that object is set,
*         otherwise the namespace-level {@code offload_threshold} field
*/
protected long internalGetOffloadThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
// Pre-change return: always the namespace-level field.
return getNamespacePolicies(namespaceName).offload_threshold;
// Post-change body: prefer the value carried by offload_policies when present.
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_threshold;
} else {
return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes();
}
}

protected void internalSetOffloadThreshold(long newThreshold) {
Expand All @@ -2232,8 +2237,13 @@ protected void internalSetOffloadThreshold(long newThreshold) {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);

Policies policies = jsonMapper().readValue(content, Policies.class);
if (policies.offload_policies != null) {
policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
}
policies.offload_threshold = newThreshold;

globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
Expand All @@ -2258,7 +2268,12 @@ protected void internalSetOffloadThreshold(long newThreshold) {

/**
* Returns the offload deletion lag for the current namespace after validating admin access for its tenant.
*
* NOTE(review): this text is a rendered diff without +/- markers, so the pre-change return statement and
* the post-change body both appear below. Only one of them exists in any real revision of the file.
*
* @return in the post-change body, the lag from {@code offload_policies} when that object is set,
*         otherwise the namespace-level {@code offload_deletion_lag_ms} field
*/
protected Long internalGetOffloadDeletionLag() {
validateAdminAccessForTenant(namespaceName.getTenant());
// Pre-change return: always the namespace-level field.
return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
// Post-change body: prefer the value carried by offload_policies when present.
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_deletion_lag_ms;
} else {
return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis();
}
}

protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
Expand All @@ -2269,8 +2284,13 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);

Policies policies = jsonMapper().readValue(content, Policies.class);
if (policies.offload_policies != null) {
policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
}
policies.offload_deletion_lag_ms = newDeletionLagMs;

globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}",
Expand Down Expand Up @@ -2409,6 +2429,20 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);

if (offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis()
.equals(OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
} else {
policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
}
if (offloadPolicies.getManagedLedgerOffloadThresholdInBytes() ==
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) {
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
} else {
policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
}

policies.offload_policies = offloadPolicies;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
(rc, path1, ctx, stat) -> {
Expand Down
Loading

0 comments on commit 347d385

Please sign in to comment.