diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 063120b686cf6..7170f0f262196 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -339,9 +339,6 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) { BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles); policies.bundles = bundleData != null ? bundleData : policies.bundles; - // hydrate the namespace polices - mergeNamespaceWithDefaults(policies, namespace, policyPath); - return policies; } catch (RestException re) { throw re; @@ -371,8 +368,6 @@ protected CompletableFuture getNamespacePoliciesAsync(NamespaceName na return FutureUtil.failedFuture(new RestException(e)); } policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; - // hydrate the namespace polices - mergeNamespaceWithDefaults(policies.get(), namespace, policyPath); return CompletableFuture.completedFuture(policies.get()); }); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 7f582eac80475..0b5383823ffd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2089,17 +2089,17 @@ protected void internalSetMaxConsumersPerTopic(Integer maxConsumersPerTopic) { } } - protected int internalGetMaxConsumersPerSubscription() { + protected Integer internalGetMaxConsumersPerSubscription() { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); return getNamespacePolicies(namespaceName).max_consumers_per_subscription; } - protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) { + protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); try { - if (maxConsumersPerSubscription < 0) { + if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxConsumersPerSubscription must be 0 or more"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 72fc84078bd68..9afcb24043ef0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1138,7 +1138,7 @@ public void removeMaxConsumersPerTopic(@PathParam("tenant") String tenant, @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public int getMaxConsumersPerSubscription(@PathParam("tenant") String tenant, + public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); return internalGetMaxConsumersPerSubscription(); @@ -1160,6 +1160,19 @@ public void setMaxConsumersPerSubscription(@PathParam("tenant") String tenant, internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription); } + @DELETE + @Path("/{tenant}/{namespace}/maxConsumersPerSubscription") + @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid")}) + public void removeMaxConsumersPerSubscription(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetMaxConsumersPerSubscription(null); + } + @GET @Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer") @ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index d57fc7bbac7de..76106773fa0e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -179,7 +179,9 @@ protected boolean isConsumersExceededOnSubscription(BrokerService brokerService, } if (maxConsumersPerSubscription == null) { - maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0 + maxConsumersPerSubscription = policies != null + && policies.max_consumers_per_subscription != null + && policies.max_consumers_per_subscription >= 0 ? policies.max_consumers_per_subscription : brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 7087c1a220ea4..cca01e0a7ab21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -609,6 +609,21 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in producer.close(); } + + @Test(timeOut = 20000) + public void testMaxConsumersOnSubApi() throws Exception { + final String namespace = "prop-xyz/ns1"; + assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)); + admin.namespaces().setMaxConsumersPerSubscription(namespace, 10); + Awaitility.await().untilAsserted(() -> { + assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)); + assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10); + }); + admin.namespaces().removeMaxConsumersPerSubscription(namespace); + Awaitility.await().untilAsserted(() -> + admin.namespaces().getMaxConsumersPerSubscription(namespace)); + } + /** * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 35527e1153773..023e604213b1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -1381,6 +1381,72 @@ public void testGetSubscribeRateApplied() throws Exception { assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy); } + @Test(timeOut = 30000) + public void testPriorityAndDisableMaxConsumersOnSub() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + int maxConsumerInBroker = 1; + int maxConsumerInNs = 2; + int maxConsumerInTopic = 4; + String mySub = "my-sub"; + conf.setMaxConsumersPerSubscription(maxConsumerInBroker); + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().until(() -> + pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + List> consumerList = new ArrayList<>(); + ConsumerBuilder builder = pulsarClient.newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(topic).subscriptionName(mySub); + consumerList.add(builder.subscribe()); + try { + builder.subscribe(); + fail("should fail"); + } catch (PulsarClientException ignored) { + } + + admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs); + Awaitility.await().untilAsserted(() -> + assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace))); + consumerList.add(builder.subscribe()); + try { + builder.subscribe(); + fail("should fail"); + } catch (PulsarClientException ignored) { + } + //disabled + admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0); + Awaitility.await().untilAsserted(() -> + assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0)); + consumerList.add(builder.subscribe()); + //set topic-level + admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic); + Awaitility.await().untilAsserted(() -> + assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic))); + consumerList.add(builder.subscribe()); + try { + builder.subscribe(); + fail("should fail"); + } catch (PulsarClientException ignored) { + } + //remove topic policies + admin.topics().removeMaxConsumersPerSubscription(topic); + Awaitility.await().untilAsserted(() -> + assertNull(admin.topics().getMaxConsumersPerSubscription(topic))); + consumerList.add(builder.subscribe()); + //remove namespace policies, then use broker-level + admin.namespaces().removeMaxConsumersPerSubscription(myNamespace); + Awaitility.await().untilAsserted(() -> + assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace))); + try { + builder.subscribe(); + fail("should fail"); + } catch (PulsarClientException ignored) { + } + + for (Consumer consumer : consumerList) { + consumer.close(); + } + } + @Test public void testRemoveSubscribeRate() throws Exception { admin.topics().createPartitionedTopic(persistenceTopic, 2); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 3c61ce897b855..703269a3477dd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -2903,7 +2903,7 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription * @throws PulsarAdminException * Unexpected error */ - int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException; + Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException; /** * Get the maxConsumersPerSubscription for a namespace asynchronously. @@ -2958,6 +2958,20 @@ void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscription */ CompletableFuture setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription); + /** + * Remove maxConsumersPerSubscription for a namespace. + * @param namespace + * @throws PulsarAdminException + */ + void removeMaxConsumersPerSubscription(String namespace) throws PulsarAdminException; + + /** + * Remove maxConsumersPerSubscription for a namespace asynchronously. + * @param namespace + * @return + */ + CompletableFuture removeMaxConsumersPerSubscriptionAsync(String namespace); + /** * Get the maxUnackedMessagesPerConsumer for a namespace. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index fd2c4224a65e2..e1106450129cb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -2543,7 +2543,7 @@ public CompletableFuture removeMaxConsumersPerTopicAsync(String namespace) } @Override - public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException { + public Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException { try { return getMaxConsumersPerSubscriptionAsync(namespace). get(this.readTimeoutMs, TimeUnit.MILLISECONDS); @@ -2601,6 +2601,30 @@ public CompletableFuture setMaxConsumersPerSubscriptionAsync( return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON)); } + @Override + public void removeMaxConsumersPerSubscription(String namespace) + throws PulsarAdminException { + try { + removeMaxConsumersPerSubscriptionAsync(namespace) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeMaxConsumersPerSubscriptionAsync( + String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "maxConsumersPerSubscription"); + return asyncDeleteRequest(path); + } + @Override public Integer getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 7c03f2f0fcba2..93d547f9a5905 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -536,6 +536,9 @@ public void namespaces() throws Exception { namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1")); verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1"); + namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1")); + verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1"); + namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3")); verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index a749271a6b7b8..7a2e52df9a4f9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -1475,6 +1475,18 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Remove maxConsumersPerSubscription for a namespace") + private class RemoveMaxConsumersPerSubscription extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().removeMaxConsumersPerSubscription(namespace); + } + } + @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace") private class SetMaxConsumersPerSubscription extends CliCommand { @Parameter(description = "tenant/namespace", required = true) @@ -2190,6 +2202,7 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription()); jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription()); + jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription()); jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription()); jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 184c58df1b643..72d6f6fc87f05 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -86,7 +86,7 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public Integer max_consumers_per_topic = null; @SuppressWarnings("checkstyle:MemberName") - public int max_consumers_per_subscription = 0; + public Integer max_consumers_per_subscription = null; @SuppressWarnings("checkstyle:MemberName") public Integer max_unacked_messages_per_consumer = null; @SuppressWarnings("checkstyle:MemberName") @@ -178,8 +178,8 @@ public boolean equals(Object obj) { && Objects.equals(max_consumers_per_topic, other.max_consumers_per_topic) && Objects.equals(max_unacked_messages_per_consumer, other.max_unacked_messages_per_consumer) && Objects.equals(max_unacked_messages_per_subscription, max_unacked_messages_per_subscription) - && max_consumers_per_subscription == other.max_consumers_per_subscription - && compaction_threshold == other.compaction_threshold + && Objects.equals(max_consumers_per_subscription, max_consumers_per_subscription) + && Objects.equals(compaction_threshold, compaction_threshold) && offload_threshold == other.offload_threshold && Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms) && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy