Skip to content

Commit

Permalink
Fix can not disable and remove max consumer per subscription (apache#…
Browse files Browse the repository at this point in the history
…10070)

### Motivation
1)Now, we cannot disable `MaxConsumersPerSubscription` in any level of Policy
2) The Namespace level MaxConsumersPerSubscription cannot be deleted as long as it is set
3)The default value of the namespace level is incorrect, and the broker level data will be returned
  • Loading branch information
315157973 authored Mar 31, 2021
1 parent 722a50e commit 4709f3a
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,6 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;

// hydrate the namespace polices
mergeNamespaceWithDefaults(policies, namespace, policyPath);

return policies;
} catch (RestException re) {
throw re;
Expand Down Expand Up @@ -371,8 +368,6 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
// hydrate the namespace polices
mergeNamespaceWithDefaults(policies.get(), namespace, policyPath);
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2089,17 +2089,17 @@ protected void internalSetMaxConsumersPerTopic(Integer maxConsumersPerTopic) {
}
}

protected int internalGetMaxConsumersPerSubscription() {
protected Integer internalGetMaxConsumersPerSubscription() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
}

protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

try {
if (maxConsumersPerSubscription < 0) {
if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumersPerSubscription must be 0 or more");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ public void removeMaxConsumersPerTopic(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxConsumersPerSubscription();
Expand All @@ -1160,6 +1160,19 @@ public void setMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
}

@DELETE
@Path("/{tenant}/{namespace}/maxConsumersPerSubscription")
@ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid")})
public void removeMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetMaxConsumersPerSubscription(null);
}

@GET
@Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
@ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
maxConsumersPerSubscription = policies != null
&& policies.max_consumers_per_subscription != null
&& policies.max_consumers_per_subscription >= 0
? policies.max_consumers_per_subscription :
brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,21 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in
producer.close();
}


@Test(timeOut = 20000)
public void testMaxConsumersOnSubApi() throws Exception {
final String namespace = "prop-xyz/ns1";
assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
Awaitility.await().untilAsserted(() -> {
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10);
});
admin.namespaces().removeMaxConsumersPerSubscription(namespace);
Awaitility.await().untilAsserted(() ->
admin.namespaces().getMaxConsumersPerSubscription(namespace));
}

/**
* It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,72 @@ public void testGetSubscribeRateApplied() throws Exception {
assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
}

@Test(timeOut = 30000)
public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
final String topic = testTopic + UUID.randomUUID();
int maxConsumerInBroker = 1;
int maxConsumerInNs = 2;
int maxConsumerInTopic = 4;
String mySub = "my-sub";
conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().until(() ->
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
List<Consumer<String>> consumerList = new ArrayList<>();
ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.topic(topic).subscriptionName(mySub);
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}

admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
//disabled
admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0));
consumerList.add(builder.subscribe());
//set topic-level
admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
consumerList.add(builder.subscribe());
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}
//remove topic policies
admin.topics().removeMaxConsumersPerSubscription(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
consumerList.add(builder.subscribe());
//remove namespace policies, then use broker-level
admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
Awaitility.await().untilAsserted(() ->
assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
try {
builder.subscribe();
fail("should fail");
} catch (PulsarClientException ignored) {
}

for (Consumer<String> consumer : consumerList) {
consumer.close();
}
}

@Test
public void testRemoveSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2903,7 +2903,7 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription
* @throws PulsarAdminException
* Unexpected error
*/
int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;

/**
* Get the maxConsumersPerSubscription for a namespace asynchronously.
Expand Down Expand Up @@ -2958,6 +2958,20 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription
*/
CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription);

/**
* Remove maxConsumersPerSubscription for a namespace.
* @param namespace
* @throws PulsarAdminException
*/
void removeMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;

/**
* Remove maxConsumersPerSubscription for a namespace asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String namespace);

/**
* Get the maxUnackedMessagesPerConsumer for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2543,7 +2543,7 @@ public CompletableFuture<Void> removeMaxConsumersPerTopicAsync(String namespace)
}

@Override
public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
public Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
try {
return getMaxConsumersPerSubscriptionAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -2601,6 +2601,30 @@ public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(
return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
}

@Override
public void removeMaxConsumersPerSubscription(String namespace)
throws PulsarAdminException {
try {
removeMaxConsumersPerSubscriptionAsync(namespace)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(
String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
return asyncDeleteRequest(path);
}

@Override
public Integer getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");

namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1"));
verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1");

namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove maxConsumersPerSubscription for a namespace")
private class RemoveMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeMaxConsumersPerSubscription(namespace);
}
}

@Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
private class SetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
Expand Down Expand Up @@ -2190,6 +2202,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());

jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public Integer max_consumers_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_subscription = 0;
public Integer max_consumers_per_subscription = null;
@SuppressWarnings("checkstyle:MemberName")
public Integer max_unacked_messages_per_consumer = null;
@SuppressWarnings("checkstyle:MemberName")
Expand Down Expand Up @@ -178,8 +178,8 @@ public boolean equals(Object obj) {
&& Objects.equals(max_consumers_per_topic, other.max_consumers_per_topic)
&& Objects.equals(max_unacked_messages_per_consumer, other.max_unacked_messages_per_consumer)
&& Objects.equals(max_unacked_messages_per_subscription, max_unacked_messages_per_subscription)
&& max_consumers_per_subscription == other.max_consumers_per_subscription
&& compaction_threshold == other.compaction_threshold
&& Objects.equals(max_consumers_per_subscription, max_consumers_per_subscription)
&& Objects.equals(compaction_threshold, compaction_threshold)
&& offload_threshold == other.offload_threshold
&& Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms)
&& schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy
Expand Down

0 comments on commit 4709f3a

Please sign in to comment.