Skip to content

Commit

Permalink
[Broker] Fix broker dispatch byte rate limiter. (apache#11135)
Browse files Browse the repository at this point in the history
## Motivation
fix apache#11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
  • Loading branch information
congbobo184 authored Jul 6, 2021
1 parent a7e40ea commit ce6be12
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;

protected AbstractBaseDispatcher(Subscription subscription) {
protected final ServiceConfiguration serviceConfig;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
}

/**
Expand Down Expand Up @@ -234,6 +238,19 @@ public void resetCloseFuture() {
// noop
}

protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
long bytesToRead, long availablePermitsOnByte) {
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
}

if (availablePermitsOnByte > 0) {
bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
}

return Pair.of(messagesToRead, bytesToRead);
}

protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.slf4j.Logger;
Expand All @@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}

public boolean isConsumerConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
private volatile int isClosed = FALSE;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription) {
super(subscription);
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;

private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
super(subscription);
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
Expand All @@ -40,16 +39,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public long getAvailableDispatchRateLimitOnMsg() {
return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits();
}

/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -106,7 +106,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

protected enum ReadType {
Expand All @@ -115,8 +114,7 @@ protected enum ReadType {

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription) {
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
Expand Down Expand Up @@ -224,9 +222,11 @@ public synchronized void readMoreEntries() {
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();

if (-1 == messagesToRead) {
if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
return;
}
Expand Down Expand Up @@ -263,8 +263,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand All @@ -276,8 +275,10 @@ public synchronized void readMoreEntries() {
}
}

protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
// left pair is messagesToRead, right pair is bytesToRead
protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();

Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
Expand Down Expand Up @@ -310,13 +311,15 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();

}
}

Expand All @@ -331,13 +334,14 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -347,11 +351,11 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return -1;
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
return Math.max(messagesToRead, 1);
return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
}

protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
Expand Down
Loading

0 comments on commit ce6be12

Please sign in to comment.