Skip to content

Commit

Permalink
Allow to enable/disable delayed delivery for messages on namespace (a…
Browse files Browse the repository at this point in the history
…pache#5915)

* Allow to enable/disable delyed delivery for messages on namespace

Signed-off-by: xiaolong.ran <[email protected]>

* add isDelayedDeliveryEnabled function

Signed-off-by: xiaolong.ran <[email protected]>

* add delayed_delivery_time process logic

Signed-off-by: xiaolong.ran <[email protected]>

* add test case

Signed-off-by: xiaolong.ran <[email protected]>

* update admin cli docs

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* update import lib

Signed-off-by: xiaolong.ran <[email protected]>

* avoid import *

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* remove unuse code

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* add test case for delayed delivery messages

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored Jan 21, 2020
1 parent d1afdf9 commit f0d339e
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -1475,6 +1476,51 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
}
}

protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
if (policies.delayed_delivery_policies == null) {
return new DelayedDeliveryPolicies(config().getDelayedDeliveryTickTimeMillis(),
config().isDelayedDeliveryEnabled());
} else {
return policies.delayed_delivery_policies;
}
}

protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
validateSuperUserAccess();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);

policies.delayed_delivery_policies = delayedDeliveryPolicies;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateAdminAccessForTenant(namespaceName.getTenant());
Expand Down Expand Up @@ -1700,7 +1746,7 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr
*
* @param clusterName:
* given cluster whose peer-clusters can't be present into replication-cluster list
* @param clusters:
* @param replicationClusters:
* replication-cluster list
*/
private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -169,7 +170,7 @@ public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public Map<String, Set<AuthAction>> getPermissions(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace) {
validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);

Expand Down Expand Up @@ -652,7 +653,7 @@ public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") Stri
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
Expand Down Expand Up @@ -702,6 +703,29 @@ public void modifyEncryptionRequired(@PathParam("tenant") String tenant,
internalModifyEncryptionRequired(encryptionRequired);
}

@GET
@Path("/{tenant}/{namespace}/delayedDelivery")
@ApiOperation(value = "Get delayed delivery messages config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"), })
public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDelayedDelivery();
}

@POST
@Path("/{tenant}/{namespace}/delayedDelivery")
@ApiOperation(value = "Set delayed delivery messages config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, DelayedDeliveryPolicies deliveryPolicies) {
validateNamespaceName(tenant, namespace);
internalSetDelayedDelivery(deliveryPolicies);
}

@GET
@Path("/{tenant}/{namespace}/maxProducersPerTopic")
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
Set<PositionImpl> getScheduledMessages(int maxMessages);


/**
* Reset tick time use zk policies cache
* @param tickTime
* The tick time for when retrying on delayed delivery messages
*/
void resetTickTime(long tickTime);

/**
* Close the subscription tracker and release all resources.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;

private final long tickTimeMillis;
private long tickTimeMillis;

private final Clock clock;

Expand Down Expand Up @@ -127,6 +127,13 @@ public Set<PositionImpl> getScheduledMessages(int maxMessages) {
return positions;
}

@Override
public void resetTickTime(long tickTime) {
if (this.tickTimeMillis != tickTime){
this.tickTimeMillis = tickTime;
}
}

@Override
public long getNumberOfDelayedMessages() {
return priorityQueue.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final RedeliveryTracker redeliveryTracker;

private Optional<DelayedDeliveryTracker> delayedDeliveryTracker = Optional.empty();
private final boolean isDelayedDeliveryEnabled;

private volatile boolean havePendingRead = false;
private volatile boolean havePendingReplayRead = false;
Expand Down Expand Up @@ -119,8 +118,6 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
this.isDelayedDeliveryEnabled = topic.getBrokerService().pulsar().getConfiguration()
.isDelayedDeliveryEnabled();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
}

Expand Down Expand Up @@ -724,7 +721,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!isDelayedDeliveryEnabled) {
if (!topic.delayedDeliveryEnabled) {
// If broker has the feature disabled, always deliver messages immediately
return false;
}
Expand All @@ -736,6 +733,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
}
}
Expand All @@ -745,6 +743,7 @@ private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToR
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -154,6 +155,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal

private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
public volatile boolean delayedDeliveryEnabled = false;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;

protected final MessageDeduplication messageDeduplication;
Expand Down Expand Up @@ -209,6 +212,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
USAGE_COUNT_UPDATER.set(this, 0);
this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
this.delayedDeliveryTickTimeMillis = brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();

initializeDispatchRateLimiterIfNeeded(Optional.empty());

Expand Down Expand Up @@ -1737,6 +1742,11 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

schemaValidationEnforced = data.schema_validation_enforced;

if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}

initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));

this.updateMaxPublishRate(data);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.*;

@Slf4j
public class AdminApiDelayedDelivery extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(AdminApiDelayedDelivery.class);

@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("delayed-delivery-messages", tenantInfo);
}

@AfterMethod
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testDisableDelayedDelivery() throws Exception {
admin.namespaces().createNamespace("delayed-delivery-messages/default-ns");
String namespace = "delayed-delivery-messages/default-ns";
assertTrue(admin.namespaces().getDelayedDelivery(namespace).isActive());

DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false);
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);

assertFalse(admin.namespaces().getDelayedDelivery(namespace).isActive());
assertEquals(2000, admin.namespaces().getDelayedDelivery(namespace).getTickTime());
}

@Test
public void testEnableDelayedDeliveryMessages() throws Exception {
admin.namespaces().createNamespace("delayed-delivery-messages/default-enable-service-conf");
String namespace = "delayed-delivery-messages/default-enable-service-conf";
String topicName = "persistent://delayed-delivery-messages/default-enable-service-conf/test";
assertTrue(admin.namespaces().getDelayedDelivery(namespace).isActive());

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();

for (int i = 0; i < 10; i++) {
producer.newMessage()
.value("delayed-msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}

producer.flush();

// Delayed messages might not come in same exact order
Set<String> delayedMessages = new TreeSet<>();
for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
delayedMessages.add(msg.getValue());
consumer.acknowledge(msg);
}

for (int i = 0; i < 10; i++) {
assertTrue(delayedMessages.contains("delayed-msg-" + i));
}
}
}
Loading

0 comments on commit f0d339e

Please sign in to comment.