Skip to content

Commit

Permalink
Offload threshold policy for namespaces (apache#1973)
Browse files Browse the repository at this point in the history
Allow administrators to specify a offload threshold policy on a
namespace, which stipulates that once a topic in the namespace has a
certain amount of data on the pulsar cluster, start offloading some of
this data to longterm storage.

This patch also cleans up TestS3Offload, and adds offload status each
ledger in topic internal stats.

Master Issue: apache#1511
  • Loading branch information
ivankelly authored and sijie committed Jun 18, 2018
1 parent f221b26 commit b2a373d
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1570,5 +1570,42 @@ protected void internalSetCompactionThreshold(long newThreshold) {
}
}

protected long internalGetOffloadThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
return getNamespacePolicies(namespaceName).offload_threshold;
}

protected void internalSetOffloadThreshold(long newThreshold) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.offload_threshold = newThreshold;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
clientAppId(), namespaceName, policies.compaction_threshold);

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update offloadThreshold configuration for namespace {}",
clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -746,5 +746,36 @@ public void setCompactionThreshold(@PathParam("property") String property,
internalSetCompactionThreshold(newThreshold);
}

@GET
@Path("/{property}/{cluster}/{namespace}/offloadThreshold")
@ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public long getOffloadThreshold(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetOffloadThreshold();
}

@PUT
@Path("/{property}/{cluster}/{namespace}/offloadThreshold")
@ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "offloadThreshold value is not valid") })
public void setOffloadThreshold(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
long newThreshold) {
validateNamespaceName(property, cluster, namespace);
internalSetOffloadThreshold(newThreshold);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,5 +686,34 @@ public void setCompactionThreshold(@PathParam("property") String property,
internalSetCompactionThreshold(newThreshold);
}

@GET
@Path("/{property}/{namespace}/offloadThreshold")
@ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public long getOffloadThreshold(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
return internalGetOffloadThreshold();
}

@PUT
@Path("/{property}/{namespace}/offloadThreshold")
@ApiOperation(value = "Set maximum number of bytes stored on the pulsar cluster for a topic,"
+ " before the broker will start offloading to longterm storage",
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "offloadThreshold value is not valid") })
public void setOffloadThreshold(@PathParam("property") String property,
@PathParam("namespace") String namespace,
long newThreshold) {
validateNamespaceName(property, namespace);
internalSetOffloadThreshold(newThreshold);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
TimeUnit.MILLISECONDS);

policies.ifPresent(p -> managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));

future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,7 @@ public PersistentTopicInternalStats getInternalStats() {
info.ledgerId = li.getLedgerId();
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
stats.ledgers.add(info);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,4 +1164,52 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle,
*/
void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;

/**
* Get the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
* in the namespace before data starts being offloaded to longterm storage.
*
* <p>
* Response example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param namespace
* Namespace name
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
long getOffloadThreshold(String namespace) throws PulsarAdminException;

/**
* Set the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
* in the namespace before data starts being offloaded to longterm storage.
*
* Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible.
* <p>
* Request example:
*
* <pre>
* <code>10000000</code>
* </pre>
*
* @param namespace
* Namespace name
* @param offloadThreshold
* maximum number of bytes stored before offloading is triggered
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,28 @@ public void setCompactionThreshold(String namespace, long compactionThreshold) t
}
}

@Override
public long getOffloadThreshold(String namespace) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "offloadThreshold");
return request(path).get(Long.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "offloadThreshold");
request(path).put(Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,38 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get offloadThreshold for a namespace")
private class GetOffloadThreshold extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(admin.namespaces().getOffloadThreshold(namespace));
}
}

@Parameters(commandDescription = "Set offloadThreshold for a namespace")
private class SetOffloadThreshold extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "--size", "-s" },
description = "Maximum number of bytes stored in the pulsar cluster for a topic before data will"
+ " start being automatically offloaded to longterm storage (eg: 10M, 16G, 3T, 100)."
+ " Negative values disable automatic offload."
+ " 0 triggers offloading as soon as possible.",
required = true)
private String threshold = "-1";

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setOffloadThreshold(namespace, validateSizeString(threshold));
}
}

public CmdNamespaces(PulsarAdmin admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
Expand Down Expand Up @@ -857,5 +889,9 @@ public CmdNamespaces(PulsarAdmin admin) {

jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());

jcommander.addCommand("get-offload-threshold", new GetOffloadThreshold());
jcommander.addCommand("set-offload-threshold", new SetOffloadThreshold());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static class LedgerInfo {
public long ledgerId;
public long entries;
public long size;
public boolean offloaded;
}

public static class CursorStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class Policies {
public int max_consumers_per_subscription = 0;

public long compaction_threshold = 0;
public long offload_threshold = -1;

@Override
public boolean equals(Object obj) {
Expand All @@ -78,7 +79,8 @@ public boolean equals(Object obj) {
&& max_producers_per_topic == other.max_producers_per_topic
&& max_consumers_per_topic == other.max_consumers_per_topic
&& max_consumers_per_subscription == other.max_consumers_per_subscription
&& compaction_threshold == other.compaction_threshold;
&& compaction_threshold == other.compaction_threshold
&& offload_threshold == other.offload_threshold;
}

return false;
Expand Down Expand Up @@ -109,6 +111,7 @@ public String toString() {
.add("max_producers_per_topic", max_producers_per_topic)
.add("max_consumers_per_topic", max_consumers_per_topic)
.add("max_consumers_per_subscription", max_consumers_per_topic)
.add("compaction_threshold", compaction_threshold).toString();
.add("compaction_threshold", compaction_threshold)
.add("offload_threshold", offload_threshold).toString();
}
}
Loading

0 comments on commit b2a373d

Please sign in to comment.