Skip to content

Commit

Permalink
support subscription dispatch rate on topic level (apache#8087)
Browse files Browse the repository at this point in the history
### Modifications
Support set subscription dispatch rate on topic level.
Support get subscription dispatch rate on topic level.
Support remove subscription dispatch rate on topic level.
  • Loading branch information
hangc0276 authored Sep 21, 2020
1 parent 1abb2b4 commit 4c508c5
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3261,6 +3261,48 @@ protected CompletableFuture<Void> internalRemoveDispatchRate() {

}

protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
}

protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
topicPolicies.setSubscriptionDispatchRate(dispatchRate);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setSubscriptionDispatchRate(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}


protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,93 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
if (!dispatchRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(dispatchRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName());
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(dispatchRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private DispatchRate createDispatchRate() {
* default broker dispatch-throttling-rate
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService));

Expand All @@ -165,29 +165,37 @@ public void updateDispatchRate() {
public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) {
Optional<DispatchRate> dispatchRate = getSystemTopicDispatchRate(brokerService, topicName);
if (dispatchRate.isPresent()) {
return true;
}
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (dispatchRate.isPresent()) {
return true;
}

policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}

public static Optional<DispatchRate> getSystemTopicDispatchRate(BrokerService brokerService, String topicName) {
public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
String topicName, Type type) {
Optional<DispatchRate> dispatchRate = Optional.empty();
final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (serviceConfiguration.isTopicLevelPoliciesEnabled()) {
if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
try {
dispatchRate = Optional.ofNullable(brokerService.pulsar()
.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getDispatchRate);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
switch (type) {
case TOPIC:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getDispatchRate);
break;
case SUBSCRIPTION:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getSubscriptionDispatchRate);
break;
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies cache have not init.", topicName);
} catch (Exception e) {
log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e);
log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
// If the topic-level policy already exists, the namespace-level policy cannot override
// the topic-level policy.
if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
Expand Down Expand Up @@ -2389,6 +2391,18 @@ public void onUpdate(TopicPolicies policies) {
}
});

subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
if (policies.isSubscriptionDispatchRateSet()) {
dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate()));
} else {
dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.updateDispatchRate());
}
});

if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,27 @@ public void testDispatchRateDisabled() throws Exception {
}
}

@Test
public void testSubscriptionDispatchRateDisabled() throws Exception {
DispatchRate dispatchRate = new DispatchRate(1000,
1020*1024, 1);
log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);

try {
admin.topics().setSubscriptionDispatchRate(testTopic, dispatchRate);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}

try {
admin.topics().getSubscriptionDispatchRate(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}

@Test
public void testCompactionThresholdDisabled() {
Long compactionThreshold = 10000L;
Expand Down
Loading

0 comments on commit 4c508c5

Please sign in to comment.