Skip to content

Commit

Permalink
[server] Allow broker to start with default backlogquota in bytes (ap…
Browse files Browse the repository at this point in the history
…ache#11671)

* [server] Allow broker to start with default backlogquota in bytes

* remove sysout from test

* fix test
  • Loading branch information
rdhabalia authored Aug 17, 2021
1 parent 43ded59 commit 80171a7
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 11 deletions.
5 changes: 4 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ backlogQuotaCheckEnabled=true
# How often to check for topics that have reached the quota
backlogQuotaCheckIntervalInSeconds=60

# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1.
# Deprecated - Use backlogQuotaDefaultLimitByte instead.
backlogQuotaDefaultLimitGB=-1

# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1.
backlogQuotaDefaultLimitBytes=-1

# Default per-topic backlog quota time limit in second, less than 0 means no limitation. default is -1.
backlogQuotaDefaultLimitSecond=-1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int backlogQuotaCheckIntervalInSeconds = 60;

@Deprecated
@FieldContext(
category = CATEGORY_POLICIES,
doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
)
private double backlogQuotaDefaultLimitGB = -1;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1."
+ " Increase it if you want to allow larger msg backlog"
)
private long backlogQuotaDefaultLimitGB = -1;
private long backlogQuotaDefaultLimitBytes = -1;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public class BacklogQuotaManager {

public BacklogQuotaManager(PulsarService pulsar) {
this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
this.defaultQuota = BacklogQuotaImpl.builder()
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
* BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.limitSize(backlogQuotaGB > 0 ? (long) backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE
: pulsar.getConfiguration().getBacklogQuotaDefaultLimitBytes())
.limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(S
}

public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) {
long backlogQuotaBytes = configuration.getBacklogQuotaDefaultLimitGB() > 0
? ((long) configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
: configuration.getBacklogQuotaDefaultLimitBytes();
return BacklogQuota.builder()
.limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.limitSize(backlogQuotaBytes)
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand All @@ -72,6 +74,11 @@ public class BacklogQuotaManagerTest {
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3;
private static final int MAX_ENTRIES_PER_LEDGER = 5;

@DataProvider(name = "backlogQuotaSizeGB")
public Object[][] backlogQuotaSizeGB() {
return new Object[][] { { true }, { false } };
}

@BeforeMethod
void setup() throws Exception {
try {
Expand Down Expand Up @@ -1211,5 +1218,44 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception {
client.close();
}

@Test(dataProvider = "backlogQuotaSizeGB")
public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception {

pulsar.close();
long backlogQuotaByte = 10 * 1024;
if (backlogQuotaSizeGB) {
config.setBacklogQuotaDefaultLimitGB(((double) backlogQuotaByte) / BacklogQuotaImpl.BYTES_IN_GIGABYTE);
} else {
config.setBacklogQuotaDefaultLimitBytes(backlogQuotaByte);
}
config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
pulsar = new PulsarService(config);
pulsar.start();

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topic1 = "persistent://prop/ns-quota/topic2";
final String subName1 = "c1";
final String subName2 = "c2";
final int numMsgs = 20;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}

Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
rolloverStats();

TopicStats stats = admin.topics().getStats(topic1);
assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
}
private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ServiceConfigurationTest {
public void testInit() throws Exception {
final String zookeeperServer = "localhost:2184";
final int brokerServicePort = 1000;
InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2");
InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2", 0.05);
final ServiceConfiguration config = PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class);
assertTrue(isNotBlank(config.getZookeeperServers()));
assertTrue(config.getBrokerServicePort().isPresent()
Expand All @@ -60,6 +60,7 @@ public void testInit() throws Exception {
assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1);
assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
}

@Test
Expand All @@ -86,18 +87,19 @@ public void testOptionalSettingPresent() throws Exception {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testInitFailure() throws Exception {
final String zookeeperServer = "localhost:2184";
InputStream newStream = updateProp(zookeeperServer, "invalid-string", null);
InputStream newStream = updateProp(zookeeperServer, "invalid-string", null, 0.005);
PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class);
}

private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace)
private InputStream updateProp(String zookeeperServer, String brokerServicePort, String namespace, double backlogQuotaGB)
throws IOException {
checkNotNull(fileName);
Properties properties = new Properties();
InputStream stream = this.getClass().getClassLoader().getResourceAsStream(fileName);
properties.load(stream);
properties.setProperty("zookeeperServers", zookeeperServer);
properties.setProperty("brokerServicePort", brokerServicePort);
properties.setProperty("backlogQuotaDefaultLimitGB", "" + backlogQuotaGB);
if (namespace != null)
properties.setProperty("bootstrapNamespaces", namespace);
StringWriter writer = new StringWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static BacklogQuotaImplBuilder builder() {
}

public static class BacklogQuotaImplBuilder implements BacklogQuota.Builder {
private long limitSize = -1 * BYTES_IN_GIGABYTE;
private long limitSize = -1;
private int limitTime = -1;
private RetentionPolicy retentionPolicy;

Expand Down
4 changes: 2 additions & 2 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
|backlogQuotaDefaultRetentionPolicy|The defaulted backlog quota retention policy. By Default, it is `producer_request_hold`. <li>'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)</li> <li>'producer_exception' Policy which throws `javax.jms.ResourceAllocationException` to the producer </li><li>'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog</li>|producer_request_hold|
|allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
|allowAutoTopicCreationType| The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) |non-partitioned|
Expand Down Expand Up @@ -464,7 +464,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
|ttlDurationDefaultInSeconds|The default Time to Live (TTL) for namespaces if the TTL is not configured at namespace policies. When the value is set to `0`, TTL is disabled. By default, TTL is disabled. |0|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. If topics are not consumed for some while, these inactive topics might be cleaned up. Deleting inactive topics is enabled by default. The default period is 1 minute. |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics, in seconds. |60|
Expand Down

0 comments on commit 80171a7

Please sign in to comment.