Skip to content

Commit

Permalink
Improved in max-pending-bytes mechanism for broker (apache#7406)
Browse files Browse the repository at this point in the history
* Improved in max-pending-bytes mechanism for broker

* Fixed imports

* Switched to LongAdder
  • Loading branch information
merlimat authored May 2, 2021
1 parent 900547f commit 2a522c8
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 174 deletions.
4 changes: 0 additions & 4 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,6 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
# Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
maxMessagePublishBufferSizeInMB=

# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size
# Use 0 or negative number to disable the max publish buffer limiting.
messagePublishBufferCheckIntervalInMillis=100

# Check between intervals to see if consumed ledgers need to be trimmed
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "requests in memory. Default: 1000"
)
private int maxPendingPublishRequestsPerConnection = 1000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,12 @@ protected void addConsumerToSubscription(Subscription subscription, Consumer con

@Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}

@Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}

protected boolean hasLocalProducers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -67,7 +66,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -232,7 +230,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
Expand Down Expand Up @@ -269,18 +266,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannelTls;

private boolean preciseTopicPublishRateLimitingEnable;
private final long maxMessagePublishBufferBytes;
private final long resumeProducerReadMessagePublishBufferBytes;
private volatile boolean reachMessagePublishBufferThreshold;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0
? pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
Expand Down Expand Up @@ -322,9 +314,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-compaction-monitor"));
this.messagePublishBufferMonitor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));

Expand Down Expand Up @@ -474,7 +463,6 @@ public void start() throws Exception {
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
Expand Down Expand Up @@ -552,14 +540,6 @@ protected void startCompactionMonitor() {
}
}

protected void startMessagePublishBufferMonitor() {
int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMillis();
if (interval > 0 && maxMessagePublishBufferBytes > 0) {
messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer),
interval, interval, TimeUnit.MILLISECONDS);
}
}

protected void startConsumedLedgersMonitor() {
int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
Expand Down Expand Up @@ -757,7 +737,6 @@ public CompletableFuture<Void> closeAsync() {
inactivityMonitor,
messageExpiryMonitor,
compactionMonitor,
messagePublishBufferMonitor,
consumedLedgersMonitor,
backlogQuotaChecker,
topicOrderedExecutor,
Expand Down Expand Up @@ -2487,22 +2466,6 @@ public Optional<Integer> getListenPortTls() {
}
}

@VisibleForTesting
void checkMessagePublishBuffer() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
&& !reachMessagePublishBufferThreshold) {
reachMessagePublishBufferThreshold = true;
forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
}
if (currentMessagePublishBufferBytes.get() < resumeProducerReadMessagePublishBufferBytes
&& reachMessagePublishBufferThreshold) {
reachMessagePublishBufferThreshold = false;
forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
}
}

private void foreachCnx(Consumer<TransportCnx> consumer) {
Set<TransportCnx> cnxSet = new HashSet<>();
topics.forEach((n, t) -> {
Expand All @@ -2512,17 +2475,6 @@ private void foreachCnx(Consumer<TransportCnx> consumer) {
cnxSet.forEach(consumer);
}

public boolean isReachMessagePublishBufferThreshold() {
return reachMessagePublishBufferThreshold;
}

@VisibleForTesting
long getCurrentMessagePublishBufferSize() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}

public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
Expand Down Expand Up @@ -2691,7 +2643,18 @@ public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors()
}

public boolean isBrokerEntryMetadataEnabled() {
return brokerEntryMetadataInterceptors.size() > 0;
return !brokerEntryMetadataInterceptors.isEmpty();
}

public void pausedConnections(int numberOfConnections) {
pausedConnections.add(numberOfConnections);
}

public void resumedConnections(int numberOfConnections) {
pausedConnections.add(-numberOfConnections);
}

public long getPausedConnections() {
return pausedConnections.longValue();
}
}
Loading

0 comments on commit 2a522c8

Please sign in to comment.