diff --git a/conf/broker.conf b/conf/broker.conf index e4c15c71b6812..404d8d7a70be4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -125,9 +125,12 @@ backlogQuotaCheckEnabled=true # How often to check for topics that have reached the quota backlogQuotaCheckIntervalInSeconds=60 -# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1. +# Deprecated - Use backlogQuotaDefaultLimitByte instead. backlogQuotaDefaultLimitGB=-1 +# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1. +backlogQuotaDefaultLimitBytes=-1 + # Default per-topic backlog quota time limit in second, less than 0 means no limitation. default is -1. backlogQuotaDefaultLimitSecond=-1 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index aaec952831c57..e98a5ce2ebdc1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -334,12 +334,19 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int backlogQuotaCheckIntervalInSeconds = 60; + @Deprecated + @FieldContext( + category = CATEGORY_POLICIES, + doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\"" + ) + private double backlogQuotaDefaultLimitGB = -1; + @FieldContext( category = CATEGORY_POLICIES, doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1." + " Increase it if you want to allow larger msg backlog" ) - private long backlogQuotaDefaultLimitGB = -1; + private long backlogQuotaDefaultLimitBytes = -1; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index bbb3ddfcba660..f9ccb744b8c48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -53,9 +53,10 @@ public class BacklogQuotaManager { public BacklogQuotaManager(PulsarService pulsar) { this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled(); + double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB(); this.defaultQuota = BacklogQuotaImpl.builder() - .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() - * BacklogQuotaImpl.BYTES_IN_GIGABYTE) + .limitSize(backlogQuotaGB > 0 ? (long) backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE + : pulsar.getConfiguration().getBacklogQuotaDefaultLimitBytes()) .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy()) .build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java index b929636486ca7..ca8231aeeaaa4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java @@ -38,8 +38,11 @@ public static Map backlogQuotaMap(S } public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) { + long backlogQuotaBytes = configuration.getBacklogQuotaDefaultLimitGB() > 0 + ? ((long) configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE) + : configuration.getBacklogQuotaDefaultLimitBytes(); return BacklogQuota.builder() - .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE) + .limitSize(backlogQuotaBytes) .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy()) .build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 560cd9e0f552c..0dac0c2468ed5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -51,12 +51,14 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -72,6 +74,11 @@ public class BacklogQuotaManagerTest { private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3; private static final int MAX_ENTRIES_PER_LEDGER = 5; + @DataProvider(name = "backlogQuotaSizeGB") + public Object[][] backlogQuotaSizeGB() { + return new Object[][] { { true }, { false } }; + } + @BeforeMethod void setup() throws Exception { try { @@ -1211,5 +1218,44 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { client.close(); } + @Test(dataProvider = "backlogQuotaSizeGB") + public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception { + + pulsar.close(); + long backlogQuotaByte = 10 * 1024; + if (backlogQuotaSizeGB) { + config.setBacklogQuotaDefaultLimitGB(((double) backlogQuotaByte) / BacklogQuotaImpl.BYTES_IN_GIGABYTE); + } else { + config.setBacklogQuotaDefaultLimitBytes(backlogQuotaByte); + } + config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction); + pulsar = new PulsarService(config); + pulsar.start(); + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topic1 = "persistent://prop/ns-quota/topic2"; + final String subName1 = "c1"; + final String subName2 = "c2"; + final int numMsgs = 20; + + Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); + Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + rolloverStats(); + + TopicStats stats = admin.topics().getStats(topic1); + assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); + } private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 6b7df2ca90e22..452ee96e9e05f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -49,7 +49,7 @@ public class ServiceConfigurationTest { public void testInit() throws Exception { final String zookeeperServer = "localhost:2184"; final int brokerServicePort = 1000; - InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2"); + InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2", 0.05); final ServiceConfiguration config = PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class); assertTrue(isNotBlank(config.getZookeeperServers())); assertTrue(config.getBrokerServicePort().isPresent() @@ -60,6 +60,7 @@ public void testInit() throws Exception { assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1); assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1); assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first"); + assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); } @Test @@ -86,11 +87,11 @@ public void testOptionalSettingPresent() throws Exception { @Test(expectedExceptions = IllegalArgumentException.class) public void testInitFailure() throws Exception { final String zookeeperServer = "localhost:2184"; - InputStream newStream = updateProp(zookeeperServer, "invalid-string", null); + InputStream newStream = updateProp(zookeeperServer, "invalid-string", null, 0.005); PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class); } - private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace) + private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace, double backlogQuotaGB) throws IOException { checkNotNull(fileName); Properties properties = new Properties(); @@ -98,6 +99,7 @@ private InputStream updateProp(String zookeeperServer, String brokerServicePort, properties.load(stream); properties.setProperty("zookeeperServers", zookeeperServer); properties.setProperty("brokerServicePort", brokerServicePort); + properties.setProperty("backlogQuotaDefaultLimitGB", "" + backlogQuotaGB); if (namespace != null) properties.setProperty("bootstrapNamespaces", namespace); StringWriter writer = new StringWriter(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index 79e0af27f8a6f..591e8b8c95a8a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -40,7 +40,7 @@ public static BacklogQuotaImplBuilder builder() { } public static class BacklogQuotaImplBuilder implements BacklogQuota.Builder { - private long limitSize = -1 * BYTES_IN_GIGABYTE; + private long limitSize = -1; private int limitTime = -1; private RetentionPolicy retentionPolicy; diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index ae62e27b8ffb4..a96c328f98d6f 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -175,7 +175,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false| |backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true| |backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60| -|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 | +|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 | |backlogQuotaDefaultRetentionPolicy|The defaulted backlog quota retention policy. By Default, it is `producer_request_hold`.
  • 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
  • 'producer_exception' Policy which throws `javax.jms.ResourceAllocationException` to the producer
  • 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
  • |producer_request_hold| |allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true| |allowAutoTopicCreationType| The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) |non-partitioned| @@ -464,7 +464,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false| |backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true| |backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60| -|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1| +|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1| |ttlDurationDefaultInSeconds|The default Time to Live (TTL) for namespaces if the TTL is not configured at namespace policies. When the value is set to `0`, TTL is disabled. By default, TTL is disabled. |0| |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. If topics are not consumed for some while, these inactive topics might be cleaned up. Deleting inactive topics is enabled by default. The default period is 1 minute. |true| |brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics, in seconds. |60|