Skip to content

Commit

Permalink
Fix maxConsumersPerTopic cannot be disabled at the namespace level (a…
Browse files Browse the repository at this point in the history
…pache#9214)

Master Issue: apache#9146

### Motivation
`maxConsumersPerTopic` cannot be disabled at the namespace level

### Modifications
1.fixbug
2.add remove API

### Verifying this change
unit test
testMaxConsumersPerTopicUnlimited
  • Loading branch information
315157973 authored Jan 19, 2021
1 parent aaa27c3 commit 6c8c127
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,6 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S

final ServiceConfiguration config = pulsar().getConfiguration();

if (policies.max_consumers_per_topic < 1) {
policies.max_consumers_per_topic = config.getMaxConsumersPerTopic();
}

if (policies.max_consumers_per_subscription < 1) {
policies.max_consumers_per_subscription = config.getMaxConsumersPerSubscription();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2610,12 +2610,12 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
}
}

protected int internalGetMaxConsumersPerTopic() {
protected Integer internalGetMaxConsumersPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
}

protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
protected void internalSetMaxConsumersPerTopic(Integer maxConsumersPerTopic) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

Expand All @@ -2624,7 +2624,7 @@ protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
if (maxConsumersPerTopic < 0) {
if (maxConsumersPerTopic != null && maxConsumersPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumersPerTopic must be 0 or more");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ public void setMaxProducersPerTopic(@PathParam("property") String property, @Pat
@ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public Integer getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetMaxConsumersPerTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ public void setDeduplicationSnapshotInterval(@PathParam("tenant") String tenant
@ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxConsumersPerTopic(@PathParam("tenant") String tenant,
public Integer getMaxConsumersPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxConsumersPerTopic();
Expand All @@ -1026,6 +1026,18 @@ public void setMaxConsumersPerTopic(@PathParam("tenant") String tenant, @PathPar
internalSetMaxConsumersPerTopic(maxConsumersPerTopic);
}

@DELETE
@Path("/{tenant}/{namespace}/maxConsumersPerTopic")
@ApiOperation(value = "Remove maxConsumersPerTopic configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeMaxConsumersPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetMaxConsumersPerTopic(null);
}

@GET
@Path("/{tenant}/{namespace}/maxConsumersPerSubscription")
@ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected boolean isConsumersExceededOnTopic() {
}
maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
: brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void setup() throws Exception {
public void cleanup() throws Exception {
super.internalCleanup();
mockPulsarSetup.cleanup();
resetConfig();
}

@DataProvider(name = "topicType")
Expand Down Expand Up @@ -1742,5 +1743,62 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
}
}

@Test
public void testMaxConsumersPerTopicUnlimited() throws Exception {
final int maxConsumersPerTopic = 1;
super.internalCleanup();
mockPulsarSetup.cleanup();
conf.setMaxConsumersPerTopic(maxConsumersPerTopic);
super.internalSetup();
//init namespace
admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";

assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
}

admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
try {
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException ignore) {
assertTrue(ignore.getMessage().contains("Topic reached max consumers limit"));
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
// should success
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
try {
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException ignore) {
assertTrue(ignore.getMessage().contains("Topic reached max consumers limit"));
}

//clean up
for (Consumer<byte[]> subConsumer : consumers) {
subConsumer.close();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ public void testCheckMaxConsumers() throws Exception {
public void testSetMaxConsumers() throws Exception {
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(myNamespace), 1));
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(myNamespace).intValue(), 1));
log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
Integer maxConsumers = 2;
log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2615,7 +2615,7 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription
* @throws PulsarAdminException
* Unexpected error
*/
int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
Integer getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;

/**
* Get the maxProducersPerTopic for a namespace asynchronously.
Expand Down Expand Up @@ -2670,6 +2670,20 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription
*/
CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, int maxConsumersPerTopic);

/**
* Remove maxConsumersPerTopic for a namespace.
* @param namespace
* @throws PulsarAdminException
*/
void removeMaxConsumersPerTopic(String namespace) throws PulsarAdminException;

/**
* Remove maxConsumersPerTopic for a namespace asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeMaxConsumersPerTopicAsync(String namespace);

/**
* Get the maxConsumersPerSubscription for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,7 +2197,7 @@ public CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace)
}

@Override
public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
public Integer getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
try {
return getMaxConsumersPerTopicAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -2253,6 +2253,28 @@ public CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, in
return asyncPostRequest(path, Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON));
}

@Override
public void removeMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
try {
removeMaxConsumersPerTopicAsync(namespace)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeMaxConsumersPerTopicAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
return asyncDeleteRequest(path);
}

@Override
public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ public void namespaces() throws Exception {
namespaces.run(split("set-max-consumers-per-topic myprop/clust/ns1 -c 2"));
verify(mockNamespaces).setMaxConsumersPerTopic("myprop/clust/ns1", 2);

namespaces.run(split("remove-max-consumers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).removeMaxConsumersPerTopic("myprop/clust/ns1");

namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove max consumers per topic for a namespace")
private class RemoveMaxConsumersPerTopic extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().removeMaxConsumersPerTopic(namespace);
}
}

@Parameters(commandDescription = "Get maxConsumersPerSubscription for a namespace")
private class GetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -1995,6 +2007,8 @@ public CmdNamespaces(PulsarAdmin admin) {

jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
jcommander.addCommand("remove-max-consumers-per-topic", new RemoveMaxConsumersPerTopic());

jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public Integer max_producers_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_topic = 0;
public Integer max_consumers_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_subscription = 0;
@SuppressWarnings("checkstyle:MemberName")
Expand Down Expand Up @@ -169,7 +169,7 @@ public boolean equals(Object obj) {
&& Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
&& Objects.equals(max_producers_per_topic, other.max_producers_per_topic)
&& max_consumers_per_topic == other.max_consumers_per_topic
&& Objects.equals(max_consumers_per_topic, other.max_consumers_per_topic)
&& max_consumers_per_subscription == other.max_consumers_per_subscription
&& max_unacked_messages_per_consumer == other.max_unacked_messages_per_consumer
&& max_unacked_messages_per_subscription == other.max_unacked_messages_per_subscription
Expand Down

0 comments on commit 6c8c127

Please sign in to comment.