From f0d339e67b551a63224c537cfe9518dc5b99574b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Tue, 21 Jan 2020 22:57:07 +0800 Subject: [PATCH] Allow to enable/disable delayed delivery for messages on namespace (#5915) * Allow to enable/disable delyed delivery for messages on namespace Signed-off-by: xiaolong.ran * add isDelayedDeliveryEnabled function Signed-off-by: xiaolong.ran * add delayed_delivery_time process logic Signed-off-by: xiaolong.ran * add test case Signed-off-by: xiaolong.ran * update admin cli docs Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * update import lib Signed-off-by: xiaolong.ran * avoid import * Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * remove unuse code Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * add test case for delayed delivery messages Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran * fix comments Signed-off-by: xiaolong.ran --- .../broker/admin/impl/NamespacesBase.java | 48 +++++++- .../pulsar/broker/admin/v2/Namespaces.java | 28 ++++- .../delayed/DelayedDeliveryTracker.java | 8 ++ .../InMemoryDelayedDeliveryTracker.java | 9 +- ...InMemoryDelayedDeliveryTrackerFactory.java | 1 + ...PersistentDispatcherMultipleConsumers.java | 7 +- .../service/persistent/PersistentTopic.java | 10 ++ .../broker/admin/AdminApiDelayedDelivery.java | 115 ++++++++++++++++++ .../pulsar/client/admin/Namespaces.java | 65 +++++++++- .../client/admin/internal/NamespacesImpl.java | 23 ++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++ .../pulsar/admin/cli/CmdNamespaces.java | 44 +++++++ .../data/DelayedDeliveryPolicies.java | 38 ++++++ .../pulsar/common/policies/data/Policies.java | 7 +- site2/docs/reference-pulsar-admin.md | 34 ++++++ 15 files changed, 434 insertions(+), 10 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index adc7bd4d502b6..11409cde663db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -1475,6 +1476,51 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) { } } + protected DelayedDeliveryPolicies internalGetDelayedDelivery() { + validateAdminAccessForTenant(namespaceName.getTenant()); + + Policies policies = getNamespacePolicies(namespaceName); + if (policies.delayed_delivery_policies == null) { + return new DelayedDeliveryPolicies(config().getDelayedDeliveryTickTimeMillis(), + config().isDelayedDeliveryEnabled()); + } else { + return policies.delayed_delivery_policies; + } + } + + protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) { + validateSuperUserAccess(); + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, namespaceName.toString()); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + + policies.delayed_delivery_policies = delayedDeliveryPolicies; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, namespaceName.toString())); + log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(), + namespaceName, jsonMapper().writeValueAsString(policies.retention_policies)); + + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(), + namespaceName); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification", + clientAppId(), namespaceName); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName, + e); + throw new RestException(e); + } + } protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { validateAdminAccessForTenant(namespaceName.getTenant()); @@ -1700,7 +1746,7 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr * * @param clusterName: * given cluster whose peer-clusters can't be present into replication-cluster list - * @param clusters: + * @param replicationClusters: * replication-cluster list */ private void validatePeerClusterConflict(String clusterName, Set replicationClusters) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 5bc71fc84ebf1..6bc5c234adf6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -59,6 +59,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,7 +170,7 @@ public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public Map> getPermissions(@PathParam("tenant") String tenant, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + @PathParam("namespace") String namespace) { validateAdminAccessForTenant(tenant); validateNamespaceName(tenant, namespace); @@ -652,7 +653,7 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") Stri @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist") }) public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { @@ -702,6 +703,29 @@ public void modifyEncryptionRequired(@PathParam("tenant") String tenant, internalModifyEncryptionRequired(encryptionRequired); } + @GET + @Path("/{tenant}/{namespace}/delayedDelivery") + @ApiOperation(value = "Get delayed delivery messages config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), }) + public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + return internalGetDelayedDelivery(); + } + + @POST + @Path("/{tenant}/{namespace}/delayedDelivery") + @ApiOperation(value = "Set delayed delivery messages config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), }) + public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, DelayedDeliveryPolicies deliveryPolicies) { + validateNamespaceName(tenant, namespace); + internalSetDelayedDelivery(deliveryPolicies); + } + @GET @Path("/{tenant}/{namespace}/maxProducersPerTopic") @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 836a2ff57d528..e772d228f150f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -60,6 +60,14 @@ public interface DelayedDeliveryTracker extends AutoCloseable { */ Set getScheduledMessages(int maxMessages); + + /** + * Reset tick time use zk policies cache + * @param tickTime + * The tick time for when retrying on delayed delivery messages + */ + void resetTickTime(long tickTime); + /** * Close the subscription tracker and release all resources. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 447928486ab10..a702a010833c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -49,7 +49,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T // Timestamp at which the timeout is currently set private long currentTimeoutTarget; - private final long tickTimeMillis; + private long tickTimeMillis; private final Clock clock; @@ -127,6 +127,13 @@ public Set getScheduledMessages(int maxMessages) { return positions; } + @Override + public void resetTickTime(long tickTime) { + if (this.tickTimeMillis != tickTime){ + this.tickTimeMillis = tickTime; + } + } + @Override public long getNumberOfDelayedMessages() { return priorityQueue.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index 71f0fc293d2e1..0ac27779d4d83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -22,6 +22,7 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 3660ce00c3c72..fee9cfe34522a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -82,7 +82,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final RedeliveryTracker redeliveryTracker; private Optional delayedDeliveryTracker = Optional.empty(); - private final boolean isDelayedDeliveryEnabled; private volatile boolean havePendingRead = false; private volatile boolean havePendingReplayRead = false; @@ -119,8 +118,6 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); - this.isDelayedDeliveryEnabled = topic.getBrokerService().pulsar().getConfiguration() - .isDelayedDeliveryEnabled(); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); } @@ -724,7 +721,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional policies) { @Override public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { - if (!isDelayedDeliveryEnabled) { + if (!topic.delayedDeliveryEnabled) { // If broker has the feature disabled, always deliver messages immediately return false; } @@ -736,6 +733,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); } + delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis); return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); } } @@ -745,6 +743,7 @@ private synchronized Set getMessagesToReplayNow(int maxMessagesToR return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { + delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis); return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); } else { return Collections.emptySet(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8d8f9644f5658..b177b116a735b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -99,6 +99,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -154,6 +155,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private Optional dispatchRateLimiter = Optional.empty(); private Optional subscribeRateLimiter = Optional.empty(); + public volatile long delayedDeliveryTickTimeMillis = 1000; + public volatile boolean delayedDeliveryEnabled = false; public static final int MESSAGE_RATE_BACKOFF_MS = 1000; protected final MessageDeduplication messageDeduplication; @@ -209,6 +212,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.subscriptions = new ConcurrentOpenHashMap<>(16, 1); this.replicators = new ConcurrentOpenHashMap<>(16, 1); USAGE_COUNT_UPDATER.set(this, 0); + this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled(); + this.delayedDeliveryTickTimeMillis = brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis(); initializeDispatchRateLimiterIfNeeded(Optional.empty()); @@ -1737,6 +1742,11 @@ public CompletableFuture onPoliciesUpdate(Policies data) { schemaValidationEnforced = data.schema_validation_enforced; + if (data.delayed_delivery_policies != null) { + delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime(); + delayedDeliveryEnabled = data.delayed_delivery_policies.isActive(); + } + initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data)); this.updateMaxPublishRate(data); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java new file mode 100644 index 0000000000000..f532d1d3b8bc6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDelayedDelivery.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.google.common.collect.Sets; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +@Slf4j +public class AdminApiDelayedDelivery extends MockedPulsarServiceBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(AdminApiDelayedDelivery.class); + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("delayed-delivery-messages", tenantInfo); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDisableDelayedDelivery() throws Exception { + admin.namespaces().createNamespace("delayed-delivery-messages/default-ns"); + String namespace = "delayed-delivery-messages/default-ns"; + assertTrue(admin.namespaces().getDelayedDelivery(namespace).isActive()); + + DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false); + admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies); + + assertFalse(admin.namespaces().getDelayedDelivery(namespace).isActive()); + assertEquals(2000, admin.namespaces().getDelayedDelivery(namespace).getTickTime()); + } + + @Test + public void testEnableDelayedDeliveryMessages() throws Exception { + admin.namespaces().createNamespace("delayed-delivery-messages/default-enable-service-conf"); + String namespace = "delayed-delivery-messages/default-enable-service-conf"; + String topicName = "persistent://delayed-delivery-messages/default-enable-service-conf/test"; + assertTrue(admin.namespaces().getDelayedDelivery(namespace).isActive()); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("test-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + + for (int i = 0; i < 10; i++) { + producer.newMessage() + .value("delayed-msg-" + i) + .deliverAfter(5, TimeUnit.SECONDS) + .sendAsync(); + } + + producer.flush(); + + // Delayed messages might not come in same exact order + Set delayedMessages = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + delayedMessages.add(msg.getValue()); + consumer.acknowledge(msg); + } + + for (int i = 0; i < 10; i++) { + assertTrue(delayedMessages.contains("delayed-msg-" + i)); + } + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 8b858ba53f93f..19dea3ea455f4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -33,6 +33,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1152,7 +1153,69 @@ CompletableFuture clearNamespaceBundleBacklogForSubscriptionAsync(String n */ void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException; - /** + /** + * Get the delayed delivery messages for all topics within a namespace. + *

+ * If disabled, messages will be immediately delivered and there will + * be no tracking overhead. + *

+ * Request example: + * + *

+     * 
+     * {
+     *     "active" : true,   // Enable or disable delayed delivery for messages on a namespace
+     *     "tickTime" : 1000, // The tick time for when retrying on delayed delivery messages
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * @return delayedDeliveryPolicies + * Whether to enable the delayed delivery for messages. + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException; + + /** + * Set the delayed delivery messages for all topics within a namespace. + *

+ * If disabled, messages will be immediately delivered and there will + * be no tracking overhead. + *

+ * Request example: + * + *

+     * 
+     * {
+     *     "tickTime" : 1000, // Enable or disable delayed delivery for messages on a namespace
+     *     "active" : true,   // The tick time for when retrying on delayed delivery messages
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * @param delayedDeliveryPolicies + * Whether to enable the delayed delivery for messages. + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException; + + /** * Set the given subscription auth mode on all topics on a namespace * * @param namespace diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7ab7762c7612d..1d237eb5a0352 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -766,6 +767,28 @@ public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequ } } + @Override + public DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "delayedDelivery"); + return request(path).get(DelayedDeliveryPolicies.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "delayedDelivery"); + request(path).post(Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index b62e4f473ffde..30c52cd7b2aed 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -375,6 +376,12 @@ void namespaces() throws Exception { namespaces.run(split("get-retention myprop/clust/ns1")); verify(mockNamespaces).getRetention("myprop/clust/ns1"); + namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s")); + verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true)); + + namespaces.run(split("get-delayed-delivery myprop/clust/ns1")); + verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1"); + namespaces.run(split("clear-backlog myprop/clust/ns1 -force")); verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 3c56c732033ae..0913d651c28a2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -883,6 +884,46 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get the delayed delivery policy for a namespace") + private class GetDelayedDelivery extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getDelayedDelivery(namespace)); + } + } + + @Parameters(commandDescription = "Set the delayed delivery policy on a namespace") + private class SetDelayedDelivery extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages") + private boolean enable = false; + + @Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages") + private boolean disable = false; + + @Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " + + "affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)") + private String delayedDeliveryTimeStr = "1s"; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr)); + + if (enable == disable) { + throw new ParameterException("Need to specify either --enable or --disable"); + } + + admin.namespaces().setDelayedDeliveryMessages(namespace, new DelayedDeliveryPolicies(delayedDeliveryTimeInMills, enable)); + } + } + @Parameters(commandDescription = "Set subscription auth mode on a namespace") private class SetSubscriptionAuthMode extends CliCommand { @Parameter(description = "tenant/namespace", required = true) @@ -1317,6 +1358,9 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("set-encryption-required", new SetEncryptionRequired()); jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode()); + jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery()); + jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery()); + jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic()); jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic()); jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java new file mode 100644 index 0000000000000..797f090d83e7c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * Definition of the delayed delivery policy. + */ +@Data +@AllArgsConstructor +@Setter +@Getter +@NoArgsConstructor +public class DelayedDeliveryPolicies { + private long tickTime; + private boolean active; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 8e55280fc8de9..8f247a682fc08 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -64,6 +64,8 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public boolean encryption_required = false; @SuppressWarnings("checkstyle:MemberName") + public DelayedDeliveryPolicies delayed_delivery_policies = null; + @SuppressWarnings("checkstyle:MemberName") public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None; @SuppressWarnings("checkstyle:MemberName") @@ -102,7 +104,8 @@ public int hashCode() { clusterSubscribeRate, deduplicationEnabled, persistence, bundles, latency_stats_sample_rate, message_ttl_in_seconds, retention_policies, - encryption_required, subscription_auth_mode, + encryption_required, delayed_delivery_policies, + subscription_auth_mode, antiAffinityGroup, max_producers_per_topic, max_consumers_per_topic, max_consumers_per_subscription, compaction_threshold, offload_threshold, @@ -132,6 +135,7 @@ public boolean equals(Object obj) { other.message_ttl_in_seconds) && Objects.equals(retention_policies, other.retention_policies) && Objects.equals(encryption_required, other.encryption_required) + && Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies) && Objects.equals(subscription_auth_mode, other.subscription_auth_mode) && Objects.equals(antiAffinityGroup, other.antiAffinityGroup) && max_producers_per_topic == other.max_producers_per_topic @@ -181,6 +185,7 @@ public String toString() { .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies) .add("deleted", deleted) .add("encryption_required", encryption_required) + .add("delayed_delivery_policies", delayed_delivery_policies) .add("subscription_auth_mode", subscription_auth_mode) .add("max_producers_per_topic", max_producers_per_topic) .add("max_consumers_per_topic", max_consumers_per_topic) diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index af8c88b50ad69..3cd80c6b1640c 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -860,6 +860,8 @@ Subcommands * `clear-backlog` * `unsubscribe` * `set-encryption-required` +* `set-delayed-delivery` +* `get-delayed-delivery` * `set-subscription-auth-mode` * `get-max-producers-per-topic` * `set-max-producers-per-topic` @@ -1338,6 +1340,38 @@ Options |`-d`, `--disable`|Disable message encryption required|false| |`-e`, `--enable`|Enable message encryption required|false| +### `set-delayed-delivery` +Set the delayed delivery policy on a namespace + +Usage +```bash +$ pulsar-admin namespaces set-delayed-delivery tenant/namespace options +``` + +Options + +|Flag|Description|Default| +|----|---|---| +|`-d`, `--disable`|Disable delayed delivery messages|false| +|`-e`, `--enable`|Enable delayed delivery messages|false| +|`-t`, `--time`|The tick time for when retrying on delayed delivery messages|1s| + + +### `get-delayed-delivery` +Get the delayed delivery policy on a namespace + +Usage +```bash +$ pulsar-admin namespaces get-delayed-delivery-time tenant/namespace +``` + +Options + +|Flag|Description|Default| +|----|---|---| +|`-t`, `--time`|The tick time for when retrying on delayed delivery messages|1s| + + ### `set-subscription-auth-mode` Set subscription auth mode on a namespace