Skip to content

Commit

Permalink
[Issue 8599] Fix DispatchRateLimiter does not take effect (apache#8611)
Browse files Browse the repository at this point in the history
Fixes apache#8599
Fixes apache#4777

### Motivation

Pulsar current support topic level and subscription level dispatch rate limiter by using `DispatchRateLimiter`.  When consumers connected to broker and start reading entry, broker judge whether rate limit is exceeded before reading, and increasing the permits after reading finished by call tryAcquire().  When there are multi consumers using one `DispatchRateLimiter`, these consumers could start reading together and may increasing the `acquiredPermits` far more than `permits` after reading finished. As `acquiredPermits` will reset to 0 every second, all consumers could start reading in the next second and dispatch rate limiter will take no effect in such case.

### Modifications

This PR change the behaviour of `DispatchRateLimiter`, minus `permits` every second instead of reset `acquiredPermits` to 0, and the reading will stop for a while until `acquiredPermits` return to a value less than  `permits` .

### Verifying this change
RateLimiterTest.testDispatchRate()
  • Loading branch information
wangjialing218 authored May 23, 2021
1 parent d94c3f0 commit 02fc06e
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -362,7 +362,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
* verify rate-limiting should throttle message-dispatching based on byte-rate
*
* <pre>
* 1. dispatch-byte-rate = 100 bytes/sec
* 2. send 30 msgs : each with 10 byte
* 1. dispatch-byte-rate = 1000 bytes/sec
* 2. send 30 msgs : each with 100 byte
* 3. it should take up to 2 second to receive all messages
* </pre>
*
Expand All @@ -227,7 +227,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;

final int byteRate = 100;
final int byteRate = 1000;
DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
Expand All @@ -72,14 +73,20 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -172,15 +179,22 @@ public synchronized boolean tryAcquire(long acquirePermit) {
renewTask = createTask();
}

// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}
boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (canAcquire) {
if (isDispatchRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}

if (canAcquire) {
acquiredPermits += acquirePermit;
}
}

return canAcquire;
}

Expand Down Expand Up @@ -243,7 +257,7 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = 0;
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@ public void testResetRate() throws Exception {
rate.close();
}

@Test
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec * 2);
// check after two rate-time: acquiredPermits is 100
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec);
// check after three rate-time: acquiredPermits is 0
assertEquals(rate.getAvailablePermits() > 0, true);

rate.close();
}

@Test
public void testRateLimiterWithPermitUpdater() throws Exception {
long permits = 10;
Expand Down

0 comments on commit 02fc06e

Please sign in to comment.