Skip to content

Commit

Permalink
[broker] Allow for namespace default of offload threshold (apache#5872)
Browse files Browse the repository at this point in the history
Most namespace-level configurations have a corresponding cluster
configuration that sets a namespace default.

The offload threshold does not, which makes it more difficult to ensure
that namespaces pick up the cluster-wide namespace defaults.

There is one small wrinkle with this commit: `-1` is used as a
sentinel value meaning "use the cluster default". This means that
if the cluster default has offloading enabled and you want to
  disable it for a specific namespace, that namespace needs to set this value to
  some negative number other than `-1`.
  • Loading branch information
addisonj authored and sijie committed Jan 1, 2020
1 parent a26f67a commit 1aa3299
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 5 deletions.
8 changes: 6 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ dispatchThrottlingRatePerReplicatorInMsg=0
# Using a value of 0, is disabling replication message-byte dispatch-throttling
dispatchThrottlingRatePerReplicatorInByte=0

# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
dispatchThrottlingRateRelativeToPublishRate=false

Expand Down Expand Up @@ -579,6 +579,10 @@ managedLedgerMaxLedgerRolloverTimeMinutes=240
# and the ledger being deleted from bookkeeper (default is 4 hours)
managedLedgerOffloadDeletionLagMs=14400000

# The number of bytes before triggering automatic offload to long term storage
# (default is -1, which is disabled)
managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1

# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " and the ledger being deleted from bookkeeper"
)
private long managedLedgerOffloadDeletionLagMs = TimeUnit.HOURS.toMillis(4);
// Cluster-wide default for the automatic offload size threshold, in bytes.
// The default of -1 is described in broker.conf as "disabled". When building a
// topic's ManagedLedgerConfig, the broker starts from this value and replaces it
// with the namespace policy's offload_threshold unless that policy value is -1.
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The number of bytes before triggering automatic offload to long term storage"
)
private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a cursor ledger"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ public long getOffloadThreshold(@PathParam("tenant") String tenant,
@Path("/{tenant}/{namespace}/offloadThreshold")
@ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
notes = "-1 will revert to using the cluster default. A negative value disables automatic offloading. ")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,12 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
if (p.offload_deletion_lag_ms != null) {
lag = p.offload_deletion_lag_ms;
}
long bytes = serviceConfig.getManagedLedgerOffloadAutoTriggerSizeThresholdBytes();
if (p.offload_threshold != -1L) {
bytes = p.offload_threshold;
}
managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold);
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes);
});

future.complete(managedLedgerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
Expand Down Expand Up @@ -1072,6 +1073,46 @@ public void testSubscribeRate() throws Exception {
admin.tenants().deleteTenant("my-tenants");
}

/**
 * Verifies how the namespace-level offload threshold interacts with the broker-wide default.
 *
 * <p>The namespace value {@code -1} is a sentinel meaning "use the broker default"; any other
 * value (including other negative numbers) is passed through to the managed ledger config as-is.
 *
 * <p>The broker configuration is shared by the whole test class, so the default that this test
 * overrides is restored in a {@code finally} block to avoid leaking into other tests.
 */
@Test
public void testSetOffloadThreshold() throws Exception {
    TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic");
    String namespace = topicName.getNamespaceObject().toString();

    // Remember the broker-wide default so it can be restored once the test is done.
    long originalDefault = pulsar.getConfiguration().getManagedLedgerOffloadAutoTriggerSizeThresholdBytes();
    // set a default
    pulsar.getConfiguration().setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(1);
    try {
        // create the namespace
        admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster));
        admin.topics().createNonPartitionedTopic(topicName.toString());

        // a fresh namespace reports the -1 sentinel, i.e. it falls back to the broker default
        assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
        // the ledger config should have the broker default
        ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);

        // set an override for the namespace
        admin.namespaces().setOffloadThreshold(namespace, 100);
        assertEquals(admin.namespaces().getOffloadThreshold(namespace), 100);
        ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 100);

        // a negative value other than -1 is passed through unchanged (disables offloading)
        admin.namespaces().setOffloadThreshold(namespace, -2);
        assertEquals(admin.namespaces().getOffloadThreshold(namespace), -2);
        ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), -2);

        // set back to -1 and fall back to the broker default again
        admin.namespaces().setOffloadThreshold(namespace, -1);
        assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1);
        ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);

        // cleanup
        admin.topics().delete(topicName.toString(), true);
        admin.namespaces().deleteNamespace(namespace);
    } finally {
        // Restore the shared broker configuration even if an assertion above failed.
        pulsar.getConfiguration().setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(originalDefault);
    }
}

private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws Exception {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
.getWebServiceUrl(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ private class SetOffloadThreshold extends CliCommand {
@Parameter(names = { "--size", "-s" },
description = "Maximum number of bytes stored in the pulsar cluster for a topic before data will"
+ " start being automatically offloaded to longterm storage (eg: 10M, 16G, 3T, 100)."
+ " -1 falls back to the cluster's namespace default."
+ " Negative values disable automatic offload."
+ " 0 triggers offloading as soon as possible.",
required = true)
Expand Down
4 changes: 3 additions & 1 deletion site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitGB| Default per-topic backlog quota limit |10|
|allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
|allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
|allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
|defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|
Expand Down Expand Up @@ -224,6 +224,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|loadManagerClassName| Name of load manager to use |org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl|
|managedLedgerOffloadDriver| Driver to use to offload old data to long term storage (Possible values: S3) ||
|managedLedgerOffloadMaxThreads| Maximum number of thread pool threads for ledger offloading |2|
|managedLedgerOffloadDeletionLagMs|Delay between a ledger being successfully offloaded to long term storage and the ledger being deleted from bookkeeper | 14400000|
|managedLedgerOffloadAutoTriggerSizeThresholdBytes|The number of bytes before triggering automatic offload to long term storage |-1 (disabled)|
|s3ManagedLedgerOffloadRegion| For Amazon S3 ledger offload, AWS region ||
|s3ManagedLedgerOffloadBucket| For Amazon S3 ledger offload, Bucket to place offloaded ledger into ||
|s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) ||
Expand Down

0 comments on commit 1aa3299

Please sign in to comment.