From 0ce297c5ffd6430b047728a779a0896d6b5fb7d9 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 13 Mar 2019 07:17:32 +0800 Subject: [PATCH] Issue #3803: Make ManagedLedger read batch size configurable (#3808) *Motivation* Fixes #3803 Hardcoding is a very bad practice. It means we have no way to alter system behavior when production issues occur. *Modifications* introduce a few read batch related settings to make them configurable --- conf/broker.conf | 11 ++++++++ .../pulsar/broker/ServiceConfiguration.java | 25 +++++++++++++++++++ ...PersistentDispatcherMultipleConsumers.java | 15 ++++++----- ...sistentDispatcherSingleActiveConsumer.java | 9 +++---- .../persistent/PersistentReplicator.java | 13 +++++----- 5 files changed, 54 insertions(+), 19 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 73e94cde4f91b..ba00cface0ded 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -186,6 +186,17 @@ dispatchThrottlingRatePerSubscribeInByte=0 # backlog. dispatchThrottlingOnNonBacklogConsumerEnabled=true +# Max number of entries to read from bookkeeper. By default it is 100 entries. +dispatcherMaxReadBatchSize=100 + +# Min number of entries to read from bookkeeper. By default it is 1 entries. +# When there is an error occurred on reading entries from bookkeeper, the broker +# will backoff the batch size to this minimum number." +dispatcherMinReadBatchSize=1 + +# Max number of entries to dispatch for a shared subscription. By default it is 20 entries. +dispatcherMaxRoundRobinBatchSize=20 + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=50000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 39c3e42c79f4a..0fbe155190a09 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -377,6 +377,31 @@ public class ServiceConfiguration implements PulsarConfiguration { + " published messages and don't have backlog. This enables dispatch-throttling for " + " non-backlog consumers as well.") private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false; + + // <-- dispatcher read settings --> + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max number of entries to read from bookkeeper. By default it is 100 entries." + ) + private int dispatcherMaxReadBatchSize = 100; + + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Min number of entries to read from bookkeeper. By default it is 1 entries." + + "When there is an error occurred on reading entries from bookkeeper, the broker" + + " will backoff the batch size to this minimum number." + ) + private int dispatcherMinReadBatchSize = 1; + + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max number of entries to dispatch for a shared subscription. By default it is 20 entries." + ) + private int dispatcherMaxRoundRobinBatchSize = 20; + @FieldContext( dynamic = true, category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 02c3ea81b78dd..066e2b6bb3d38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -65,9 +65,6 @@ */ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback { - private static final int MaxReadBatchSize = 100; - private static final int MaxRoundRobinBatchSize = 20; - private final PersistentTopic topic; private final ManagedCursor cursor; @@ -105,7 +102,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; - this.readBatchSize = MaxReadBatchSize; + this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); @@ -386,8 +383,8 @@ public synchronized void readEntriesComplete(List entries, Object ctx) { havePendingReplayRead = false; } - if (readBatchSize < MaxReadBatchSize) { - int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize); + if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { + int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); if (log.isDebugEnabled()) { log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, newReadBatchSize); } @@ -423,7 +420,9 @@ public synchronized void readEntriesComplete(List entries, Object ctx) { } // round-robin dispatch batch size for this consumer - int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize); + int messagesForC = Math.min( + Math.min(entriesToDispatch, c.getAvailablePermits()), + serviceConfig.getDispatcherMaxRoundRobinBatchSize()); if (messagesForC > 0) { @@ -511,7 +510,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj } } - readBatchSize = 1; + readBatchSize = serviceConfig.getDispatcherMinReadBatchSize(); topic.getBrokerService().executor().schedule(() -> { synchronized (PersistentDispatcherMultipleConsumers.this) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 5bc6f32ce0fc1..4356fbe0a5a0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -59,7 +59,6 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private volatile boolean havePendingRead = false; - private static final int MaxReadBatchSize = 100; private int readBatchSize; private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); private final ServiceConfiguration serviceConfig; @@ -74,8 +73,8 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName()) : ""/* NonDurableCursor doesn't have name */); this.cursor = cursor; - this.readBatchSize = MaxReadBatchSize; this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); } @@ -182,8 +181,8 @@ public synchronized void internalReadEntriesComplete(final List entries, havePendingRead = false; - if (readBatchSize < MaxReadBatchSize) { - int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize); + if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { + int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); if (log.isDebugEnabled()) { log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer, readBatchSize, newReadBatchSize); @@ -451,7 +450,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep checkNotNull(c); // Reduce read batch size to avoid flooding bookies with retries - readBatchSize = 1; + readBatchSize = serviceConfig.getDispatcherMinReadBatchSize(); topic.getBrokerService().executor().schedule(() -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 7d529f909d712..cda58e3cfe79f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -59,8 +59,6 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private final PersistentTopic topic; private final ManagedCursor cursor; - - private static final int MaxReadBatchSize = 100; private int readBatchSize; private final int producerQueueThreshold; @@ -98,7 +96,9 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); - readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize); + readBatchSize = Math.min( + producerQueueSize, + topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); producerQueueThreshold = (int) (producerQueueSize * 0.9); startProducer(); @@ -189,8 +189,9 @@ public void readEntriesComplete(List entries, Object ctx) { entries.size()); } - if (readBatchSize < MaxReadBatchSize) { - int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize); + int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize(); + if (readBatchSize < maxReadBatchSize) { + int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize); if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster, remoteCluster, readBatchSize, newReadBatchSize); @@ -410,7 +411,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } // Reduce read batch size to avoid flooding bookies with retries - readBatchSize = 1; + readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize(); long waitTimeMillis = readFailureBackoff.next();