Skip to content

Commit

Permalink
[pulsar-broker] provide option to disable redelivery-tracker to reduc…
Browse files Browse the repository at this point in the history
…e in memory positionImpl footprint and gc improvement (apache#3584)

* [pulsar-broker] provide option to disable redelivery-tracker to reduce in memory positionImpl footprint and gc improvement

* change config name
  • Loading branch information
rdhabalia authored Feb 13, 2019
1 parent 200d175 commit 14a5962
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ activeConsumerFailoverDelayTimeMillis=1000
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
subscriptionRedeliveryTrackerEnabled=true

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ activeConsumerFailoverDelayTimeMillis=1000
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
subscriptionRedeliveryTrackerEnabled=true

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " When it is 0, inactive subscriptions are not deleted automatically"
)
private long subscriptionExpirationTimeMinutes = 0;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Enable subscription message redelivery tracker to send redelivery "
+ "count to consumer (default is enabled)"
)
private boolean subscriptionRedeliveryTrackerEnabled = true;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired subscription"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
Expand Down Expand Up @@ -97,15 +98,17 @@ enum ReadType {
}

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.cursor = cursor;
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
this.redeliveryTracker = new InMemoryRedeliveryTracker();
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = MaxReadBatchSize;
this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
}

Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
|maxUnackedMessagesPerConsumer| Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back. Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction |50000|
|maxUnackedMessagesPerSubscription| Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit check and dispatcher can dispatch messages without any restriction |200000|
|subscriptionRedeliveryTrackerEnabled| Enable subscription message redelivery tracker |true|
|maxConcurrentLookupRequest| Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic |50000|
|maxConcurrentTopicLoadRequest| Max number of concurrent topic loading request broker allows to control number of zk-operations |5000|
|authenticationEnabled| Enable authentication |false|
Expand Down

0 comments on commit 14a5962

Please sign in to comment.