Skip to content

Commit

Permalink
[ISSUE 7758] Support set Max Producer on topic level. (apache#7914)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanghaou authored Sep 3, 2020
1 parent 8377396 commit 2e88fdb
Show file tree
Hide file tree
Showing 8 changed files with 477 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,7 @@ protected void internalGetPersistence(AsyncResponse asyncResponse){
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<PersistencePolicies> persistencePolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getPersistence);
if (!persistencePolicies.isPresent()) {
Expand All @@ -2346,6 +2347,7 @@ protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies per
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
validatePersistencePolicies(persistencePolicies);

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
Expand All @@ -2359,6 +2361,7 @@ protected CompletableFuture<Void> internalRemovePersistence() {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -2367,6 +2370,53 @@ protected CompletableFuture<Void> internalRemovePersistence() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<Integer> maxProducers = getTopicPolicies(topicName)
.map(TopicPolicies::getMaxProducerPerTopic);
if (!maxProducers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxProducers.get());
}
}

protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (maxProducers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxProducerPerTopic(maxProducers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemoveMaxProducers() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setMaxProducerPerTopic(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected MessageId internalTerminate(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,85 @@ public void removePersistence(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Get maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetMaxProducers(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Set maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxProducers")})
public void setMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max producers of the topic") int maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxProducers);
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Remove maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemoveMaxProducers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxProducers", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}


@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,24 @@ public AbstractTopic(String topic, BrokerService brokerService) {
}

protected boolean isProducersExceeded() {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of producers: {}", topic,
e.getMessage());
policies = new Policies();
Integer maxProducers = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null) {
maxProducers = topicPolicies.getMaxProducerPerTopic();
}

if (maxProducers == null) {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
}
maxProducers = policies.max_producers_per_topic;
}
final int maxProducers = policies.max_producers_per_topic > 0 ?
policies.max_producers_per_topic :
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
maxProducers = maxProducers > 0 ? maxProducers : brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,22 @@ public void testPublishRateDisabled() throws Exception {
Assert.assertEquals(e.getStatusCode(), 405);
}
}

@Test
public void testMaxProducersDisabled() {
log.info("MaxProducers will set to the topic: {}", testTopic);
try {
admin.topics().setMaxProducers(testTopic, 2);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}

try {
admin.topics().getMaxProducers(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
}
Loading

0 comments on commit 2e88fdb

Please sign in to comment.