From 4c508c586fb9dcd08d7e13dfe4aeca19a790f51c Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Tue, 22 Sep 2020 07:47:21 +0800 Subject: [PATCH] support subscription dispatch rate on topic level (#8087) ### Modifications Support set subscription dispatch rate on topic level. Support get subscription dispatch rate on topic level. Support remove subscription dispatch rate on topic level. --- .../admin/impl/PersistentTopicsBase.java | 42 ++++++ .../broker/admin/v2/PersistentTopics.java | 87 ++++++++++++ .../persistent/DispatchRateLimiter.java | 34 +++-- .../service/persistent/PersistentTopic.java | 16 ++- .../admin/TopicPoliciesDisableTest.java | 21 +++ .../broker/admin/TopicPoliciesTest.java | 128 ++++++++++++++++++ .../apache/pulsar/client/admin/Topics.java | 64 +++++++++ .../client/admin/internal/TopicsImpl.java | 76 +++++++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 59 ++++++++ .../common/policies/data/TopicPolicies.java | 5 + 10 files changed, 518 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 749c493f63fb7..13c592e9de4cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3261,6 +3261,48 @@ protected CompletableFuture internalRemoveDispatchRate() { } + protected Optional internalGetSubscriptionDispatchRate() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate); + } + + protected CompletableFuture internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + if (dispatchRate == null) { + return CompletableFuture.completedFuture(null); + } + TopicPolicies topicPolicies = getTopicPolicies(topicName) + .orElseGet(TopicPolicies::new); + topicPolicies.setSubscriptionDispatchRate(dispatchRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + + protected CompletableFuture internalRemoveSubscriptionDispatchRate() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + Optional topicPolicies = getTopicPolicies(topicName); + if (!topicPolicies.isPresent()) { + return CompletableFuture.completedFuture(null); + } + topicPolicies.get().setSubscriptionDispatchRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + } + + protected Optional internalGetMaxConsumersPerSubscription() { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 5e93c91adfcb9..67625384c8ec3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2087,6 +2087,93 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse, }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate") + @ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional dispatchRate = internalGetSubscriptionDispatchRate(); + if (!dispatchRate.isPresent()) { + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(dispatchRate.get()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate") + @ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void setSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRate dispatchRate) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName()); + asyncResponse.resume(new RestException(ex)); + } else { + try { + log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + jsonMapper().writeValueAsString(dispatchRate)); + } catch (JsonProcessingException ignore) {} + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate") + @ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/compactionThreshold") @ApiOperation(value = "Get compaction threshold configuration for specified topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 4cc711f0cb57f..29d11072e125b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -149,7 +149,7 @@ private DispatchRate createDispatchRate() { * default broker dispatch-throttling-rate */ public void updateDispatchRate() { - Optional dispatchRate = getSystemTopicDispatchRate(brokerService, topicName); + Optional dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type); if (!dispatchRate.isPresent()) { dispatchRate =Optional.ofNullable(getPoliciesDispatchRate(brokerService)); @@ -165,29 +165,37 @@ public void updateDispatchRate() { public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional policies, String topicName, Type type) { final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration(); - if (serviceConfig.isTopicLevelPoliciesEnabled() && type == Type.TOPIC) { - Optional dispatchRate = getSystemTopicDispatchRate(brokerService, topicName); - if (dispatchRate.isPresent()) { - return true; - } + Optional dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type); + if (dispatchRate.isPresent()) { + return true; } policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } - public static Optional getSystemTopicDispatchRate(BrokerService brokerService, String topicName) { + public static Optional getTopicPolicyDispatchRate(BrokerService brokerService, + String topicName, Type type) { Optional dispatchRate = Optional.empty(); final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration(); - if (serviceConfiguration.isTopicLevelPoliciesEnabled()) { + if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) { try { - dispatchRate = Optional.ofNullable(brokerService.pulsar() - .getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName))) - .map(TopicPolicies::getDispatchRate); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){ + switch (type) { + case TOPIC: + dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() + .getTopicPolicies(TopicName.get(topicName))) + .map(TopicPolicies::getDispatchRate); + break; + case SUBSCRIPTION: + dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() + .getTopicPolicies(TopicName.get(topicName))) + .map(TopicPolicies::getSubscriptionDispatchRate); + break; + } + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { log.debug("Topic {} policies cache have not init.", topicName); } catch (Exception e) { - log.debug("[{}] Failed to get topic policies. Exception: {}", topicName, e); + log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f83677ef99617..91c8b7d222f2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1873,7 +1873,9 @@ public CompletableFuture onPoliciesUpdate(Policies data) { subscriptions.forEach((subName, sub) -> { sub.getConsumers().forEach(Consumer::checkPermissions); Dispatcher dispatcher = sub.getDispatcher(); - if (dispatcher != null) { + // If the topic-level policy already exists, the namespace-level policy cannot override + // the topic-level policy. + if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) { dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data)); } }); @@ -2389,6 +2391,18 @@ public void onUpdate(TopicPolicies policies) { } }); + subscriptions.forEach((subName, sub) -> { + sub.getConsumers().forEach(Consumer::checkPermissions); + Dispatcher dispatcher = sub.getDispatcher(); + if (policies.isSubscriptionDispatchRateSet()) { + dispatcher.getRateLimiter().ifPresent(rateLimiter -> + rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate())); + } else { + dispatcher.getRateLimiter().ifPresent(rateLimiter -> + rateLimiter.updateDispatchRate()); + } + }); + if (policies.getPublishRate() != null) { topicPolicyPublishRate = policies.getPublishRate(); updateTopicPublishDispatcher(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java index f0f8d7968eb22..2309107216baa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java @@ -153,6 +153,27 @@ public void testDispatchRateDisabled() throws Exception { } } + @Test + public void testSubscriptionDispatchRateDisabled() throws Exception { + DispatchRate dispatchRate = new DispatchRate(1000, + 1020*1024, 1); + log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic); + + try { + admin.topics().setSubscriptionDispatchRate(testTopic, dispatchRate); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + + try { + admin.topics().getSubscriptionDispatchRate(testTopic); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + } + @Test public void testCompactionThresholdDisabled() { Long compactionThreshold = 10000L; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index c0e76552be917..803cc36e56154 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -560,6 +560,134 @@ public void testPolicyOverwrittenByNamespaceLevel() throws Exception { Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300); } + @Test + public void testGetSetSubscriptionDispatchRate() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + Thread.sleep(3000); + + DispatchRate dispatchRate = new DispatchRate(1000, + 1024 * 1024, 1); + log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic); + + admin.topics().setSubscriptionDispatchRate(topic, dispatchRate); + log.info("Subscription dispatch rate set success on topic: {}", topic); + + Thread.sleep(3000); + + String subscriptionName = "test_subscription_rate"; + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); + Thread.sleep(3000); + + DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) + .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertNotNull(dispatchRateLimiter); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg); + + + DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic); + log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic); + Assert.assertEquals(getDispatchRate, dispatchRate); + + producer.close(); + admin.topics().delete(topic, true); + } + + @Test + public void testRemoveSubscriptionDispatchRate() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + Thread.sleep(3000); + + DispatchRate dispatchRate = new DispatchRate(1000, + 1024 * 1024, 1); + log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic); + + admin.topics().setSubscriptionDispatchRate(topic, dispatchRate); + log.info("Subscription dispatch rate set success on topic: {}", topic); + + Thread.sleep(3000); + + String subscriptionName = "test_subscription_rate"; + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); + Thread.sleep(3000); + + DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) + .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertNotNull(dispatchRateLimiter); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg); + + DispatchRate getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic); + log.info("Subscription dispatch rate: {} get on topic: {}", getDispatchRate, topic); + + // remove subscription dispatch rate + admin.topics().removeSubscriptionDispatchRate(topic); + Thread.sleep(3000); + getDispatchRate = admin.topics().getSubscriptionDispatchRate(topic); + log.info("Subscription dispatch rate get on topic is {} after remove", getDispatchRate); + Assert.assertNull(getDispatchRate); + + dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) + .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg); + Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte); + + producer.close(); + admin.topics().delete(topic, true); + } + + @Test + public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + Thread.sleep(3000); + + // set namespace level subscription dispatch rate + DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 * 1024, 1); + admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate); + Thread.sleep(3000); + + String subscriptionName = "test_subscription_rate"; + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); + + // get subscription dispatch Rate limiter + DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) + .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte); + + // set topic level subscription dispatch rate + DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 * 1024, 1); + admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate); + Thread.sleep(3000); + + // get subscription dispatch rate limiter + dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get() + .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.dispatchThrottlingRateInByte); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.dispatchThrottlingRateInMsg); + + // remove topic level subscription dispatch rate limiter + admin.topics().removeSubscriptionDispatchRate(topic); + Thread.sleep(3000); + + // get subscription dispatch rate limiter + dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get() + .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte); + Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg); + + admin.topics().delete(topic, true); + } + @Test public void testGetSetCompactionThreshold() throws Exception { long compactionThreshold = 100000; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 1994ee2601582..e039083d98276 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2007,6 +2007,70 @@ void setInactiveTopicPolicies(String topic */ CompletableFuture removeDispatchRateAsync(String topic) throws PulsarAdminException; + /** + * Set subscription-message-dispatch-rate for the topic. + *

+ * Subscriptions under this namespace can dispatch this many messages per second + * + * @param topic + * @param dispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException; + + /** + * Set subscription-message-dispatch-rate for the topic asynchronously. + *

+ * Subscriptions under this namespace can dispatch this many messages per second. + * + * @param topic + * @param dispatchRate + * number of messages per second + */ + CompletableFuture setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate); + + /** + * Get subscription-message-dispatch-rate for the topic. + *

+ * Subscriptions under this namespace can dispatch this many messages per second. + * + * @param topic + * @returns DispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException; + + /** + * Get subscription-message-dispatch-rate asynchronously. + *

+ * Subscriptions under this namespace can dispatch this many messages per second. + * + * @param topic + * @returns DispatchRate + * number of messages per second + */ + CompletableFuture getSubscriptionDispatchRateAsync(String topic); + + /** + * Remove subscription-message-dispatch-rate for a topic. + * @param topic + * Topic name + * @throws PulsarAdminException + * Unexpected error + */ + void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException; + + /** + * Remove subscription-message-dispatch-rate for a topic asynchronously. + * @param topic + * Topic name + */ + CompletableFuture removeSubscriptionDispatchRateAsync(String topic); + /** * Get the compactionThreshold for a topic. The maximum number of bytes * can have before compaction is triggered. 0 disables. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index eb30d9c55c494..9238c076d9f7f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2164,6 +2164,82 @@ public CompletableFuture removeDispatchRateAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException { + try { + return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getSubscriptionDispatchRateAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "subscriptionDispatchRate"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(DispatchRate dispatchRate) { + future.complete(dispatchRate); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException { + try { + setSubscriptionDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "subscriptionDispatchRate"); + return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException { + try { + removeSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeSubscriptionDispatchRateAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "subscriptionDispatchRate"); + return asyncDeleteRequest(path); + } + @Override public Long getCompactionThreshold(String topic) throws PulsarAdminException { try { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9649cad7fbfdb..1c6a131161000 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -133,9 +133,15 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("get-offload-policies", new GetOffloadPolicies()); jcommander.addCommand("set-offload-policies", new SetOffloadPolicies()); jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies()); + jcommander.addCommand("get-dispatch-rate", new GetDispatchRate()); jcommander.addCommand("set-dispatch-rate", new SetDispatchRate()); jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate()); + + jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate()); + jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate()); + jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate()); + jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold()); @@ -1478,6 +1484,59 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic") + private class GetSubscriptionDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getSubscriptionDispatchRate(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic") + private class SetSubscriptionDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--msg-dispatch-rate", + "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private int msgDispatchRate = -1; + + @Parameter(names = { "--byte-dispatch-rate", + "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private long byteDispatchRate = -1; + + @Parameter(names = { "--dispatch-rate-period", + "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false) + private int dispatchRatePeriodSec = 1; + + @Parameter(names = { "--relative-to-publish-rate", + "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false) + private boolean relativeToPublishRate = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setSubscriptionDispatchRate(persistentTopic, + new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate)); + } + } + + @Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic") + private class RemoveSubscriptionDispatchRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeSubscriptionDispatchRate(persistentTopic); + } + } + @Parameters(commandDescription = "Get max number of producers for a topic") private class GetMaxProducers extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index fb785b6dff650..0e3440bf25f02 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -52,6 +52,7 @@ public class TopicPolicies { private OffloadPolicies offloadPolicies; private InactiveTopicPolicies inactiveTopicPolicies = null; private DispatchRate dispatchRate = null; + private DispatchRate subscriptionDispatchRate = null; private Long compactionThreshold = null; private PublishRate publishRate = null; private SubscribeRate subscribeRate = null; @@ -116,6 +117,10 @@ public boolean isDispatchRateSet() { return dispatchRate != null; } + public boolean isSubscriptionDispatchRateSet() { + return subscriptionDispatchRate != null; + } + public boolean isCompactionThresholdSet() { return compactionThreshold != null; }