Skip to content

Commit

Permalink
Support get applied SubscriptionDispatchRate (apache#9827)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216

### Modifications
1. Add applied API for topic policies
2. Add remove API for namespace policies

### Verifying this change
testGetSubDispatchRateApplied
  • Loading branch information
315157973 authored Mar 8, 2021
1 parent c2ebf1b commit 4084585
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,6 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S
policies.topicDispatchRate.put(cluster, dispatchRate());
}

if (policies.subscriptionDispatchRate.isEmpty()) {
policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate());
}

if (policies.clusterSubscribeRate.isEmpty()) {
policies.clusterSubscribeRate.put(cluster, subscribeRate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,6 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
Expand All @@ -1215,19 +1213,29 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
}
}

protected void internalDeleteSubscriptionDispatchRate() {
validateSuperUserAccess();

try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected DispatchRate internalGetSubscriptionDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate =
policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"Subscription-Dispatch-rate is not configured for cluster "
+ pulsar().getConfiguration().getClusterName());
}
return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}

protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3704,8 +3704,18 @@ protected CompletableFuture<Void> internalRemoveDispatchRate() {

}

protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied) {
DispatchRate dispatchRate = getTopicPolicies(topicName)
.map(TopicPolicies::getSubscriptionDispatchRate)
.orElseGet(() -> {
if (applied) {
DispatchRate namespacePolicy = getNamespacePolicies(namespaceName)
.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
return namespacePolicy == null ? subscriptionDispatchRate() : namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(dispatchRate);
}

protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,16 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tena
return internalGetSubscriptionDispatchRate();
}

@DELETE
@Path("/{tenant}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Delete Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalDeleteSubscriptionDispatchRate();
}

@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2580,21 +2580,21 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
if (!dispatchRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
internalGetSubscriptionDispatchRate(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get subscription dispatchRate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get subscription dispatchRate", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(dispatchRate.get());
asyncResponse.resume(res);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));

policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));

assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,38 @@ public void testRetentionAppliedApi() throws Exception {
-> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies));
}

@Test(timeOut = 20000)
public void testGetSubDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getSubscriptionDispatchRate(topic));
assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = new DispatchRate(
pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
1
);
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate);

DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), topicDispatchRate);

admin.namespaces().removeSubscriptionDispatchRate(myNamespace);
admin.topics().removeSubscriptionDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
}

@Test(timeOut = 20000)
public void testRetentionPriority() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));

policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));

assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,20 @@ CompletableFuture<Void> splitNamespaceBundleAsync(
*/
CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace);

/**
* Remove subscription-message-dispatch-rate.
* @param namespace
* @throws PulsarAdminException
*/
void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException;

/**
* Remove subscription-message-dispatch-rate asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String namespace);

/**
* Set subscription-message-dispatch-rate.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2354,6 +2354,30 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate);

/**
* Get applied subscription-message-dispatch-rate.
* <p/>
* Subscriptions under this namespace can dispatch this many messages per second.
*
* @param namespace
* @returns DispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
DispatchRate getSubscriptionDispatchRate(String namespace, boolean applied) throws PulsarAdminException;

/**
* Get applied subscription-message-dispatch-rate asynchronously.
* <p/>
* Subscriptions under this namespace can dispatch this many messages per second.
*
* @param namespace
* @returns DispatchRate
* number of messages per second
*/
CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace, boolean applied);

/**
* Get subscription-message-dispatch-rate for the topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,29 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
try {
removeSubscriptionDispatchRateAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
return asyncDeleteRequest(path);
}


@Override
public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2456,9 +2456,9 @@ public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
}

@Override
public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
public DispatchRate getSubscriptionDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getSubscriptionDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2470,9 +2470,10 @@ public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdmin
}

@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
Expand All @@ -2489,6 +2490,16 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
return getSubscriptionDispatchRate(topic, false);
}

@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
return getSubscriptionDispatchRateAsync(topic, false);
}

@Override
public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionDispatchRate("myprop/clust/ns1");

namespaces.run(split("remove-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeSubscriptionDispatchRate("myprop/clust/ns1");

namespaces.run(split("get-compaction-threshold myprop/clust/ns1"));
verify(mockNamespaces).getCompactionThreshold("myprop/clust/ns1");

Expand Down Expand Up @@ -790,6 +793,13 @@ public void topics() throws Exception {
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);

cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopics).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", new DispatchRate(-1, -1, 2));
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,19 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove subscription configured message-dispatch-rate " +
"for all topics of the namespace")
private class RemoveSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeSubscriptionDispatchRate(namespace);
}
}

@Parameters(commandDescription = "Get subscription configured message-dispatch-rate for all topics of the namespace (Disabled if value < 0)")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -2078,6 +2091,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());

jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1823,10 +1823,13 @@ private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic));
print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic, applied));
}
}

Expand Down

0 comments on commit 4084585

Please sign in to comment.