Skip to content

Commit

Permalink
Add backlog quota retention policy to server config. (apache#2460)
Browse files Browse the repository at this point in the history
* Add backlog quota retention policy to server config.

* Rename backlogQuotaRetentionPolicy to backlogQuotaDefaultRetentionPolicy

* Change backlogQuotaDefaultRetentionPolicy String to enum.
  • Loading branch information
codelipenghui authored and merlimat committed Aug 28, 2018
1 parent 50caedc commit 6c5f2f6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ backlogQuotaCheckIntervalInSeconds=60
# Default per-topic backlog quota limit
backlogQuotaDefaultLimitGB=10

# Default backlog quota retention policy. Default is producer_request_hold
# 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
# 'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
backlogQuotaDefaultRetentionPolicy=producer_request_hold

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.configuration.PulsarConfiguration;

import com.google.common.collect.Sets;
import org.apache.pulsar.common.policies.data.BacklogQuota;

/**
* Pulsar service configuration object.
Expand Down Expand Up @@ -89,6 +90,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int backlogQuotaCheckIntervalInSeconds = 60;
// Default per-topic backlog quota limit
private long backlogQuotaDefaultLimitGB = 50;
//Default backlog quota retention policy. Default is producer_request_hold
//'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
//'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
//'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;
// Enable the deletion of inactive topics
private boolean brokerDeleteInactiveTopicsEnabled = true;
// How often to check for inactive topics
Expand Down Expand Up @@ -1729,4 +1735,12 @@ public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) {
public int getBrokerServiceCompactionMonitorIntervalInSeconds() {
return this.brokerServiceCompactionMonitorIntervalInSeconds;
}

public BacklogQuota.RetentionPolicy getBacklogQuotaDefaultRetentionPolicy() {
return backlogQuotaDefaultRetentionPolicy;
}

public void setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy) {
this.backlogQuotaDefaultRetentionPolicy = backlogQuotaDefaultRetentionPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class BacklogQuotaManager {
public BacklogQuotaManager(PulsarService pulsar) {
this.defaultQuota = new BacklogQuota(
pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
RetentionPolicy.producer_request_hold);
pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
this.zkCache = pulsar.getConfigurationCache().policiesCache();
}

Expand Down

0 comments on commit 6c5f2f6

Please sign in to comment.