Skip to content

Commit

Permalink
[Issue 7759] Support set Max Consumer on topic level. (apache#7968)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanghaou authored Sep 8, 2020
1 parent 97ba09e commit 6eefee7
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2407,20 +2407,14 @@ protected CompletableFuture<Void> internalRemoveRetention() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected void internalGetPersistence(AsyncResponse asyncResponse){
protected Optional<PersistencePolicies> internalGetPersistence(){
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<PersistencePolicies> persistencePolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getPersistence);
if (!persistencePolicies.isPresent()) {
asyncResponse.resume(Response.noContent().build());
}else {
asyncResponse.resume(persistencePolicies.get());
}
return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
}

protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
Expand Down Expand Up @@ -2452,20 +2446,14 @@ protected CompletableFuture<Void> internalRemovePersistence() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
protected Optional<Integer> internalGetMaxProducers() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<Integer> maxProducers = getTopicPolicies(topicName)
.map(TopicPolicies::getMaxProducerPerTopic);
if (!maxProducers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxProducers.get());
}
return getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
}

protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) {
Expand Down Expand Up @@ -2499,6 +2487,47 @@ protected CompletableFuture<Void> internalRemoveMaxProducers() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected Optional<Integer> internalGetMaxConsumers() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
}

protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (maxConsumers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxConsumerPerTopic(maxConsumers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemoveMaxConsumers() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setMaxConsumerPerTopic(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected MessageId internalTerminate(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -1558,7 +1558,12 @@ public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetPersistence(asyncResponse);
Optional<PersistencePolicies> persistencePolicies = internalGetPersistence();
if (!persistencePolicies.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(persistencePolicies.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
Expand Down Expand Up @@ -1640,7 +1645,12 @@ public void getMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetMaxProducers(asyncResponse);
Optional<Integer> maxProducers = internalGetMaxProducers();
if (!maxProducers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxProducers.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
Expand Down Expand Up @@ -1706,6 +1716,90 @@ public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Get maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<Integer> maxConsumers = internalGetMaxConsumers();
if (!maxConsumers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxConsumers.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Set maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max consumers of the topic") int maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxConsumers);
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Remove maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemoveMaxConsumers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxConsumers", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}


@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,29 @@ protected boolean isProducersExceeded() {
}

protected boolean isConsumersExceededOnTopic() {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
Integer maxConsumers = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null) {
maxConsumers = topicPolicies.getMaxConsumerPerTopic();
}
if (maxConsumers == null) {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));

if (policies == null) {
if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = policies.max_consumers_per_topic > 0 ? policies.max_consumers_per_topic
final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers
: brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,22 @@ public void testMaxProducersDisabled() {
Assert.assertEquals(e.getStatusCode(), 405);
}
}

@Test
public void testMaxConsumersDisabled() {
log.info("MaxConsumers will set to the topic: {}", testTopic);
try {
admin.topics().setMaxConsumers(testTopic, 2);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}

try {
admin.topics().getMaxConsumers(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
}
Loading

0 comments on commit 6eefee7

Please sign in to comment.