Skip to content

Commit

Permalink
[Offload] Make the field name in OffloadPolicies match with config …
Browse files Browse the repository at this point in the history
…file (apache#8310)

Fixes apache#8220 

### Motivation

Currently, the fields' values couldn't be set properly by the Pulsar broker config file(`broker.conf` or `standalone.conf`). 

### Modifications

changed fields in `OffloadPolicies`

1. managedLedgerOffloadAutoTriggerSizeThresholdBytes
2. managedLedgerOffloadDeletionLagMs
  • Loading branch information
gaoran10 authored Oct 28, 2020
1 parent 54d6811 commit a8eaeb5
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1928,11 +1928,12 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
if (config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
}
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
}
}

Expand All @@ -1951,8 +1952,10 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
}
});

if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null) {
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null) {
long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();

long sizeSummed = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public void testOffloadDelete() throws Exception {
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(0, TimeUnit.MINUTES);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(100L);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
ManagedCursor cursor = ledger.openCursor("foobar");
Expand Down Expand Up @@ -751,7 +751,7 @@ public void testAutoTriggerOffload() throws Exception {
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -787,7 +787,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -848,7 +848,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -899,7 +899,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down Expand Up @@ -934,7 +934,7 @@ public void offloadAsSoonAsClosed() throws Exception {
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
Expand Down Expand Up @@ -2766,11 +2767,7 @@ protected void internalSetOffloadThreshold(long newThreshold) {

Policies policies = jsonMapper().readValue(content, Policies.class);
if (policies.offload_policies == null) {
OffloadPolicies defaultPolicy = pulsar().getDefaultOffloader().getOffloadPolicies();
policies.offload_policies = defaultPolicy == null ? new OffloadPolicies() : defaultPolicy;
if (policies.offload_deletion_lag_ms != null) {
policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
}
policies.offload_policies = new OffloadPolicies();
}
policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
policies.offload_threshold = newThreshold;
Expand Down Expand Up @@ -2817,13 +2814,8 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
byte[] content = globalZk().getData(path, null, nodeStat);

Policies policies = jsonMapper().readValue(content, Policies.class);

if (policies.offload_policies == null) {
OffloadPolicies defaultPolicy = pulsar().getDefaultOffloader().getOffloadPolicies();
policies.offload_policies = defaultPolicy == null ? new OffloadPolicies() : defaultPolicy;
if (policies.offload_threshold != -1) {
policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
}
policies.offload_policies = new OffloadPolicies();
}
policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
policies.offload_deletion_lag_ms = newDeletionLagMs;
Expand Down Expand Up @@ -2964,15 +2956,14 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis() == null && OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS == null
|| offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis() != null
&& offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis().equals(OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
} else {
policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
}
if (offloadPolicies.getManagedLedgerOffloadThresholdInBytes() ==
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) {
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
} else {
policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,10 +1215,15 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);

OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
topicLevelOffloadPolicies,
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(topicLevelOffloadPolicies);
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,12 @@ public void testOffloadPolicies() throws Exception {
String region = "test-region";
String bucket = "test-bucket";
String endpoint = "test-endpoint";
long offloadThresholdInBytes = 0;
long offloadDeletionLagInMillis = 100L;

OffloadPolicies offload1 = OffloadPolicies.create(
driver, region, bucket, endpoint, 100, 100, -1, null);
driver, region, bucket, endpoint, 100, 100,
offloadThresholdInBytes, offloadDeletionLagInMillis);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
Expand Down Expand Up @@ -240,7 +243,7 @@ public void testOffload(boolean isPartitioned) throws Exception {
offloadPolicies.setOffloadersDirectory(".");
offloadPolicies.setManagedLedgerOffloadDriver("mock");
offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024);
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);

LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ public void testSetOffloadThreshold() throws Exception {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-1);
new Long(-1));

// set an override for the namespace
admin.namespaces().setOffloadThreshold(namespace, 100);
Expand All @@ -1260,7 +1260,7 @@ public void testSetOffloadThreshold() throws Exception {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
100);
new Long(100));

// set another negative value to disable
admin.namespaces().setOffloadThreshold(namespace, -2);
Expand All @@ -1273,7 +1273,7 @@ public void testSetOffloadThreshold() throws Exception {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-2);
new Long(-2));

// set back to -1 and fall back to default
admin.namespaces().setOffloadThreshold(namespace, -1);
Expand All @@ -1286,7 +1286,7 @@ public void testSetOffloadThreshold() throws Exception {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-1);
new Long(-1));

// cleanup
admin.topics().delete(topicName.toString(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public void namespaces() throws Exception {
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024, 10000L));
10L * 1024 * 1024, 10000L));

namespaces.run(split("get-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1672,16 +1672,16 @@ && maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) {
Long offloadAfterElapsed = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
if (positiveCheck("OffloadAfterElapsed", offloadAfterElapsed)
&& maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
offloadAfterElapsedInMillis = new Long(offloadAfterElapsed);
offloadAfterElapsedInMillis = offloadAfterElapsed;
}
}

long offloadAfterThresholdInBytes = OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
Long offloadAfterThresholdInBytes = OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr);
if (positiveCheck("OffloadAfterThreshold", offloadAfterThreshold)
&& maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) {
offloadAfterThresholdInBytes = new Long(offloadAfterThreshold);
offloadAfterThresholdInBytes = offloadAfterThreshold;
}
}

Expand Down
Loading

0 comments on commit a8eaeb5

Please sign in to comment.