Skip to content

Commit

Permalink
Add max per-broker unacked message limit (apache#420)
Browse files Browse the repository at this point in the history
* Add max per-broker unacked message limit

* default disable perBrokerMaxDispatcher limit

* replace LongAdder with atomicInteger to track per-broker unack messages
  • Loading branch information
rdhabalia authored Jun 20, 2017
1 parent 02128c6 commit a45ffe2
Show file tree
Hide file tree
Showing 6 changed files with 575 additions and 5 deletions.
11 changes: 11 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ maxUnackedMessagesPerConsumer=50000
# check and dispatcher can dispatch messages without any restriction
maxUnackedMessagesPerSubscription=200000

# Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
# messages to all shared subscription which has higher number of unack messages until subscriptions start
# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
# unackedMessage-limit check and broker doesn't block dispatchers
maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

Expand Down
11 changes: 11 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ maxUnackedMessagesPerConsumer=50000
# check and dispatcher can dispatch messages without any restriction
maxUnackedMessagesPerSubscription=200000

# Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
# messages to all shared subscription which has higher number of unack messages until subscriptions start
# acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
# unackedMessage-limit check and broker doesn't block dispatchers
maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
// unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit
// check and dispatcher can dispatch messages without any restriction
private int maxUnackedMessagesPerSubscription = 4 * 50000;
// Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
// messages to all shared subscription which has higher number of unack messages until subscriptions start
// acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
// unackedMessage-limit check and broker doesn't block dispatchers
private int maxUnackedMessagesPerBroker = 0;
// Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
// than this percentage limit and subscription will not receive any new messages until that subscription acks back
// limit/2 messages
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
@FieldContext(dynamic = true)
private int maxConcurrentLookupRequest = 10000;
Expand Down Expand Up @@ -452,6 +461,23 @@ public void setMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscr
this.maxUnackedMessagesPerSubscription = maxUnackedMessagesPerSubscription;
}

public int getMaxUnackedMessagesPerBroker() {
return maxUnackedMessagesPerBroker;
}

public void setMaxUnackedMessagesPerBroker(int maxUnackedMessagesPerBroker) {
this.maxUnackedMessagesPerBroker = maxUnackedMessagesPerBroker;
}

public double getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() {
return maxUnackedMessagesPerSubscriptionOnBrokerBlocked;
}

public void setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(
double maxUnackedMessagesPerSubscriptionOnBrokerBlocked) {
this.maxUnackedMessagesPerSubscriptionOnBrokerBlocked = maxUnackedMessagesPerSubscriptionOnBrokerBlocked;
}

public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -66,9 +69,11 @@
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.authentication.AuthenticationService;
import com.yahoo.pulsar.broker.authorization.AuthorizationManager;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics;
Expand All @@ -94,6 +99,7 @@
import com.yahoo.pulsar.common.util.FieldParser;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

Expand Down Expand Up @@ -159,6 +165,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;

private static final LongAdder totalUnackedMessages = new LongAdder();
private final int maxUnackedMessages;
public final int maxUnackedMsgsPerDispatcher;
private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
Expand Down Expand Up @@ -221,12 +234,30 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
this.blockedDispatchers = new ConcurrentOpenHashSet<>();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
this.maxUnackedMsgsPerDispatcher = (int) ((maxUnackedMessages
* pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()) / 100);
log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
// block misbehaving dispatcher by checking periodically
pulsar.getExecutor().scheduleAtFixedRate(() -> checkUnAckMessageDispatching(), 600, 30, TimeUnit.SECONDS);
} else {
this.maxUnackedMessages = 0;
this.maxUnackedMsgsPerDispatcher = 0;
log.info(
"Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ",
pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
}


PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
Expand Down Expand Up @@ -1089,4 +1120,115 @@ private void createPendingLoadTopic() {
public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, PersistentTopic>>> getMultiLayerTopicMap() {
return multiLayerTopicsMap;
}
}

/**
* If per-broker unack message reached to limit then it blocks dispatcher if its unack message limit has been
* reached to {@link #maxUnackedMsgsPerDispatcher}
*
* @param dispatcher
* @param numberOfMessages
*/
public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
// don't block dispatchers if maxUnackedMessages = 0
if (maxUnackedMessages > 0) {
totalUnackedMessages.add(numberOfMessages);

// block dispatcher: if broker is already blocked and dispatcher reaches to max dispatcher limit when broker
// is blocked
if (blockedDispatcherOnHighUnackedMsgs.get() && !dispatcher.isBlockedDispatcherOnUnackedMsgs()
&& dispatcher.getTotalUnackedMessages() > maxUnackedMsgsPerDispatcher) {
lock.readLock().lock();
try {
log.info("[{}] dispatcher reached to max unack msg limit on blocked-broker {}",
dispatcher.getName(), dispatcher.getTotalUnackedMessages());
dispatcher.blockDispatcherOnUnackedMsgs();
blockedDispatchers.add(dispatcher);
} finally {
lock.readLock().unlock();
}
}
}
}

/**
* Adds given dispatcher's unackMessage count to broker-unack message count and if it reaches to the
* {@link #maxUnackedMessages} then it blocks all the dispatchers which has unack-messages higher than
* {@link #maxUnackedMsgsPerDispatcher}. It unblocks all dispatchers once broker-unack message counts decreased to
* ({@link #maxUnackedMessages}/2)
*
*/
public void checkUnAckMessageDispatching() {

// don't block dispatchers if maxUnackedMessages = 0
if (maxUnackedMessages <= 0) {
return;
}
long unAckedMessages = totalUnackedMessages.sum();
if (unAckedMessages >= maxUnackedMessages && blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) {
// block dispatcher with higher unack-msg when it reaches broker-unack msg limit
log.info("[{}] Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
} else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) {
// unblock broker-dispatching if received enough acked messages back
if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
unblockDispatchersOnUnAckMessages(blockedDispatchers.values());
}
}

}

public boolean isBrokerDispatchingBlocked() {
return blockedDispatcherOnHighUnackedMsgs.get();
}

private void blockDispatchersWithLargeUnAckMessages() {
lock.readLock().lock();
try {
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
try {
topicFuture.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
if (persistentSubscription
.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
.getDispatcher();
int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
dispatcher.getName(), dispatcher.getTotalUnackedMessages());
dispatcher.blockDispatcherOnUnackedMsgs();
blockedDispatchers.add(dispatcher);
}
}
});
} catch (Exception e) {
log.warn("Failed to get topic from future ", e);
}
}
});
} finally {
lock.readLock().unlock();
}
}

/**
* Unblocks the dispatchers and removes it from the {@link #blockedDispatchers} list
*
* @param dispatcherList
*/
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
lock.writeLock().lock();
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
executor().submit(() -> dispatcher.readMoreEntries());
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
} finally {
lock.writeLock().unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Dispatcher;
Expand Down Expand Up @@ -170,7 +171,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
readMoreEntries();
}

private void readMoreEntries() {
public void readMoreEntries() {
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);

Expand Down Expand Up @@ -586,30 +587,56 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
@Override
public void addUnAckedMessages(int numberOfMessages) {
// don't block dispatching if maxUnackedMessages = 0
if(maxUnackedMessages <= 0) {
if (maxUnackedMessages <= 0) {
return;
}
int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
if (unAckedMessages >= maxUnackedMessages
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, FALSE, TRUE)) {
// block dispatcher if it reaches maxUnAckMsg limit
log.info("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}", name,
TOTAL_UNACKED_MESSAGES_UPDATER.get(this), maxUnackedMessages);
} else if (topic.getBrokerService().isBrokerDispatchingBlocked()
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
// unblock dispatcher: if dispatcher is blocked due to broker-unackMsg limit and if it ack back enough
// messages
if (TOTAL_UNACKED_MESSAGES_UPDATER.get(this) < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) {
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// it removes dispatcher from blocked list and unblocks dispatcher by scheduling read
topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
}
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE
&& unAckedMessages < maxUnackedMessages / 2) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
topic.getBrokerService().executor().submit(() -> readMoreEntries());
}
}
// increment broker-level count
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}

public boolean isBlockedDispatcherOnUnackedMsgs() {
return BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE;
}

public void blockDispatcherOnUnackedMsgs() {
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.set(this, TRUE);
}

public void unBlockDispatcherOnUnackedMsgs() {
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.set(this, FALSE);
}

public int getTotalUnackedMessages() {
return TOTAL_UNACKED_MESSAGES_UPDATER.get(this);
}

public String getName() {
return name;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Loading

0 comments on commit a45ffe2

Please sign in to comment.