Skip to content

Commit

Permalink
Support get applied ReplicatorDispatchRate (apache#9833)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216

### Modifications
1. Add applied API for ReplicatorDispatchRate in topic-level
2. Add remove for ReplicatorDispatchRate in namespace-level

### Verifying this change
Verify the applied API and CMD
  • Loading branch information
315157973 authored Mar 8, 2021
1 parent 64ff4f3 commit bf60b4c
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,14 @@ protected DispatchRate subscriptionDispatchRate() {
);
}

protected DispatchRate replicatorDispatchRate() {
return new DispatchRate(
pulsar().getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg(),
pulsar().getConfiguration().getDispatchThrottlingRatePerReplicatorInByte(),
1
);
}

protected SubscribeRate subscribeRate() {
return new SubscribeRate(
pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,23 @@ protected SubscribeRate internalGetSubscribeRate() {
}
}

protected void internalRemoveReplicatorDispatchRate() {
validateSuperUserAccess();
try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Expand All @@ -1290,14 +1307,7 @@ protected DispatchRate internalGetReplicatorDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"replicator-Dispatch-rate is not configured for cluster "
+ pulsar().getConfiguration().getClusterName());
}
return policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2808,8 +2808,18 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
protected CompletableFuture<DispatchRate> internalGetReplicatorDispatchRate(boolean applied) {
DispatchRate dispatchRate = getTopicPolicies(topicName)
.map(TopicPolicies::getReplicatorDispatchRate)
.orElseGet(() -> {
if (applied) {
DispatchRate namespacePolicy = getNamespacePolicies(namespaceName)
.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
return namespacePolicy == null ? replicatorDispatchRate() : namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(dispatchRate);
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,17 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
return internalGetSubscribeRate();
}

@DELETE
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Remove replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void removeReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalRemoveReplicatorDispatchRate();
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1926,21 +1926,21 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR
public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetReplicatorDispatchRate();
if (dispatchRate.isPresent()) {
asyncResponse.resume(dispatchRate.get());
internalGetReplicatorDispatchRate(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get replicator dispatchRate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get replicator dispatchRate", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
asyncResponse.resume(res);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,38 @@ public void testReplicatorRateApi() throws Exception {
-> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
}

@Test(timeOut = 20000)
public void testGetReplicatorRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getReplicatorDispatchRate(topic));
assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = new DispatchRate(
pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg(),
pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte(),
1
);
assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

admin.namespaces().setReplicatorDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), namespaceDispatchRate);

DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
admin.topics().setReplicatorDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getReplicatorDispatchRate(topic)));
assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), topicDispatchRate);

admin.namespaces().removeReplicatorDispatchRate(myNamespace);
admin.topics().removeReplicatorDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
}

@Test(timeOut = 30000)
public void testAutoCreationDisabled() throws Exception {
cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,22 @@ CompletableFuture<Void> splitNamespaceBundleAsync(
*/
CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace, DispatchRate dispatchRate);

/**
* Remove replicator-message-dispatch-rate.
*
* @param namespace
* @throws PulsarAdminException
* Unexpected error
*/
void removeReplicatorDispatchRate(String namespace) throws PulsarAdminException;

/**
* Set replicator-message-dispatch-rate asynchronously.
*
* @param namespace
*/
CompletableFuture<Void> removeReplicatorDispatchRateAsync(String namespace);

/**
* Get replicator-message-dispatch-rate.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2466,6 +2466,23 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic);

/**
* Get applied replicatorDispatchRate for the topic.
* @param topic
* @param applied
* @return
* @throws PulsarAdminException
*/
DispatchRate getReplicatorDispatchRate(String topic, boolean applied) throws PulsarAdminException;

/**
* Get applied replicatorDispatchRate asynchronously.
* @param topic
* @param applied
* @return
*/
CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic, boolean applied);

/**
* Remove replicatorDispatchRate for a topic.
* @param topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,27 @@ public CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace,
return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}

@Override
public void removeReplicatorDispatchRate(String namespace) throws PulsarAdminException {
try {
removeReplicatorDispatchRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeReplicatorDispatchRateAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "replicatorDispatchRate");
return asyncDeleteRequest(path);
}

@Override
public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3237,8 +3237,18 @@ public void failed(Throwable throwable) {

@Override
public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException {
return getReplicatorDispatchRate(topic, false);
}

@Override
public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic) {
return getReplicatorDispatchRateAsync(topic, false);
}

@Override
public DispatchRate getReplicatorDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getReplicatorDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -3250,9 +3260,10 @@ public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminEx
}

@Override
public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic) {
public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "replicatorDispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,15 @@ public void namespaces() throws Exception {
namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1");

namespaces.run(split("set-replicator-dispatch-rate myprop/clust/ns1 -md 10 -bd 11 -dt 12"));
verify(mockNamespaces).setReplicatorDispatchRate("myprop/clust/ns1", new DispatchRate(10, 11, 12));

namespaces.run(split("get-replicator-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getReplicatorDispatchRate("myprop/clust/ns1");

namespaces.run(split("remove-replicator-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeReplicatorDispatchRate("myprop/clust/ns1");

namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");

Expand Down Expand Up @@ -803,8 +812,8 @@ public void topics() throws Exception {
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1", true);

cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover"));
verify(mockTopics).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,19 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove replicator configured message-dispatch-rate " +
"for all topics of the namespace")
private class RemoveReplicatorDispatchRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeReplicatorDispatchRate(namespace);
}
}

@Parameters(commandDescription = "Get the backlog quota policies for a namespace")
private class GetBacklogQuotaMap extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -2099,6 +2112,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());

jcommander.addCommand("clear-backlog", new ClearBacklog());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1879,10 +1879,13 @@ private class GetReplicatorDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-ap", "--applied"}, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String topic = validatePersistentTopic(params);
print(getAdmin().topics().getReplicatorDispatchRate(topic));
print(getAdmin().topics().getReplicatorDispatchRate(topic, applied));
}
}

Expand Down

0 comments on commit bf60b4c

Please sign in to comment.