Skip to content

Commit

Permalink
Allow ML offload immediately after ledger closed (apache#1965)
Browse files Browse the repository at this point in the history
The documentation said that only negative values disabled. If a user
wants data to be offloaded as soon as possible, the obvious thing is
to set the threshold to 0. Previously this disabled as the check
was > 0, rather than >=.

This patch changes the ML implementation to accept 0 as a threshold
and adds a test to ensure if specified, the ledgers are offloaded as
soon as possible.

Master Issue: apache#1511
  • Loading branch information
ivankelly authored and sijie committed Jun 14, 2018
1 parent 468242e commit 77502c8
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public long getOffloadLedgerDeletionLagMillis() {

/**
* Size, in bytes, at which the managed ledger will start to automatically offload ledgers to longterm storage.
* A negative value disables autotriggering.
* A negative value disables autotriggering. A threshold of 0 offloads data as soon as possible.
* Offloading will not occur if no offloader has been set {@link #setLedgerOffloader(LedgerOffloader)}.
* Automatical offloading occurs when the ledger is rolled, and the ledgers up to that point exceed the threshold.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getOffloadAutoTriggerSizeThresholdBytes() > 0) {
if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,37 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
ledger.getLedgersInfoAsList().get(2).getLedgerId()));
}

@Test
public void offloadAsSoonAsClosed() throws Exception {

MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(0);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);

for (int i = 0; i < 11; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));

for (int i = 0; i < 10; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
ledger.getLedgersInfoAsList().get(1).getLedgerId()));
}


static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
Expand Down

0 comments on commit 77502c8

Please sign in to comment.