Skip to content

Commit

Permalink
[pulsar-broker] Support configuration to rate-limit dispatching on ba…
Browse files Browse the repository at this point in the history
…tch message (apache#12294)
  • Loading branch information
rdhabalia authored Oct 20, 2021
1 parent 7c219b1 commit 5523604
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 14 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on batch message instead individual
# messages with in batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Default number of message dispatching throttling-limit for a subscription.
# Using a value of 0, is disabling default message dispatch-throttling.
dispatchThrottlingRatePerSubscriptionInMsg=0
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on batch message instead individual
# messages with in batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Apply dispatch rate limiting on batch message instead individual "
+ "messages with in batch message. (Default is disabled)")
private boolean dispatchThrottlingOnBatchMessageEnabled = false;

@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final Subscription subscription;

protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
}

/**
Expand Down Expand Up @@ -97,24 +99,26 @@ protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
isReplayRead);
}

public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
Expand Down Expand Up @@ -182,6 +186,7 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

int firstAvailableConsumerPermits, currentTotalAvailablePermits;
Expand Down Expand Up @@ -541,8 +542,9 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
Expand Down Expand Up @@ -571,13 +573,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}

// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,16 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
: sendMessageInfo.getTotalMessages();
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}

dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
rateLimiter.tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int entriesCount = entries.size();

// Trigger read more messages
Expand Down Expand Up @@ -229,8 +230,8 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay);
totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay);

consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
Expand All @@ -252,12 +253,13 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,84 @@ public void testRateLimitingMultipleConsumers() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRateLimitingWithBatchMsgEnabled() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setDispatchThrottlingOnBatchMessageEnabled(true);

final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";

final int messageRate = 5;
DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(messageRate)
.dispatchThrottlingRateInByte(-1).ratePeriodInSecond(360).build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);

final int messagesPerBatch = 100;
final int numProducedMessages = messageRate * messagesPerBatch;
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(messagesPerBatch).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

final AtomicInteger totalReceived = new AtomicInteger(0);

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
});
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
Consumer<byte[]> consumer5 = consumerBuilder.subscribe();

// deactive cursors
deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

// Asynchronously produce messages
CountDownLatch latch = new CountDownLatch(numProducedMessages);
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.sendAsync(message.getBytes()).thenAccept(__ -> latch.countDown());
}

latch.await();

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> totalReceived.get() == numProducedMessages);

// consumer should not have received all published message due to message-rate throttling
Assert.assertEquals(totalReceived.get(), numProducedMessages);

consumer1.close();
consumer2.close();
consumer3.close();
consumer4.close();
consumer5.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|subscribeRatePeriodPerConsumerInSecond|Rate period for {subscribeThrottlingRatePerConsumer}. By default, it is 30s.|30|
| dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 |
| dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0|
| dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false|

| dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false |
|dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0|
|dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0|
Expand Down

0 comments on commit 5523604

Please sign in to comment.