Skip to content

Commit

Permalink
[pulsar-broker] Add support of default ttl for namespace (apache#3898)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and jiazhai committed Mar 26, 2019
1 parent 8db0f86 commit fd8aa4f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 4 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ backlogQuotaDefaultLimitGB=10
# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
backlogQuotaDefaultRetentionPolicy=producer_request_hold

# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ backlogQuotaCheckIntervalInSeconds=60
# Default per-topic backlog quota limit
backlogQuotaDefaultLimitGB=10

# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog"
)
private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default ttl for namespaces if ttl is not already configured at namespace policies. "
+ "(disable default-ttl with value 0)"
)
private int ttlDurationDefaultInSeconds = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable the deletion of inactive topics"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,12 @@ public void checkMessageExpiry() {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
if (policies.message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(policies.message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(policies.message_ttl_in_seconds));
int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
: policies.message_ttl_in_seconds;
if (message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.util.Timeout;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -760,4 +767,42 @@ public void testTopicAutoUpdatePartitions() throws Exception {

consumer.close();
}

@Test(timeOut = testTimeout)
public void testDefaultBacklogTTL() throws Exception {

int defaultTTLSec = 1;
int totalMessages = 10;
this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec);

final String namespace = "prop/use/expiry";
final String topicName = "persistent://" + namespace + "/expiry";
final String subName = "expiredSub";

admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
admin.namespaces().createNamespace(namespace);

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();
consumer.close();

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
for (int i = 0; i < totalMessages; i++) {
producer.send(("" + i).getBytes());
}

Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).get();
assertTrue(topic.isPresent());
PersistentSubscription subscription = (PersistentSubscription) topic.get().getSubscription(subName);

Thread.sleep((defaultTTLSec + 5) * 1000);

topic.get().checkMessageExpiry();

retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog() == 0, 5, 200);

assertEquals(subscription.getNumberOfEntriesInBacklog(), 0);
}

}
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. |10|
|ttlDurationDefaultInSeconds| Default ttl for namespaces if ttl is not already configured at namespace policies. |0|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics, in seconds. |60|
|messageExpiryCheckIntervalInMinutes| How often to proactively check and purged expired messages. |5|
Expand Down

0 comments on commit fd8aa4f

Please sign in to comment.