Skip to content

Commit

Permalink
add parameter to control publish message check policy frequency (apac…
Browse files Browse the repository at this point in the history
…he#11141)

### Motivation
In current `isExceedMaximumMessageSize` implementation, it get topic policy and compare maxMessageSize value.
However, for each message to publish, it should call isExceedMaximumMessageSize once, which cost too much resources.

### Modifiction
1. add `maxMessageSizeCheckIntervalInSeconds` parameter to control maxMessageSize refresh frequency
2. reduce getTopicPolicies call frequency to reduce resources cost.
3. add test to cover this case.
  • Loading branch information
hangc0276 authored Jul 14, 2021
1 parent 27797d9 commit 6384f94
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 10 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ maxMessagePublishBufferSizeInMB=
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120

# Control the frequency of checking the max message size in the topic policy.
# The default interval is 60 seconds.
maxMessageSizeCheckIntervalInSeconds=60

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ maxPendingPublishRequestsPerConnection=1000
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# Check between intervals to see if max message size in topic policies has been updated.
# Default is 60s
maxMessageSizeCheckIntervalInSeconds=60

# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

Expand Down
4 changes: 4 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ messagePublishBufferCheckIntervalInMillis=100
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120

# Check between intervals to see if max message size in topic policies has been updated.
# Default is 60s
maxMessageSizeCheckIntervalInSeconds=60

# Max number of partitions per partitioned topic
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int retentionCheckIntervalInSeconds = 120;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Check between intervals to see if max message size of topic policy has updated. default is 60s"
)
private int maxMessageSizeCheckIntervalInSeconds = 60;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of partitions per partitioned topic.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public abstract class AbstractTopic implements Topic {
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
private volatile long usageCount = 0;

private volatile int topicMaxMessageSize = 0;
private volatile long lastTopicMaxMessageSizeCheckTimeStamp = 0;
private final long topicMaxMessageSizeCheckIntervalMs;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand All @@ -132,6 +136,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMode());
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
Expand Down Expand Up @@ -847,14 +853,16 @@ protected int getWaitingProducersCount() {
}

protected boolean isExceedMaximumMessageSize(int size) {
return getTopicPolicies()
.map(TopicPolicies::getMaxMessageSize)
.map(maxMessageSize -> {
if (maxMessageSize == 0) {
return false;
}
return size > maxMessageSize;
}).orElse(false);
if (lastTopicMaxMessageSizeCheckTimeStamp + topicMaxMessageSizeCheckIntervalMs < System.currentTimeMillis()) {
// refresh topicMaxMessageSize from topic policies
topicMaxMessageSize = getTopicPolicies().map(TopicPolicies::getMaxMessageSize).orElse(0);
lastTopicMaxMessageSizeCheckTimeStamp = System.currentTimeMillis();
}

if (topicMaxMessageSize == 0) {
return false;
}
return size > topicMaxMessageSize;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setMaxMessageSizeCheckIntervalInSeconds(1);
super.internalSetup();

admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
Expand Down Expand Up @@ -1780,8 +1781,14 @@ private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
assertEquals(e.getStatusCode(), 412);
}

MessageId messageId = producer.send(new byte[1024]);
assertNotNull(messageId);
Awaitility.await().untilAsserted(() -> {
try {
MessageId messageId = producer.send(new byte[1024]);
assertNotNull(messageId);
} catch (PulsarClientException e) {
fail("failed to send message");
}
});
producer.close();
}

Expand Down

0 comments on commit 6384f94

Please sign in to comment.