Skip to content

Commit

Permalink
[pulsar-broker] Close RateLimiter instance (apache#5155)
Browse files Browse the repository at this point in the history
* Close RateLimiter instance

* Restore AbstractDispatcherSingleActiveConsumer#close()

* Fix NonPersistentSubscription#delete()

(cherry picked from commit b687fed)
  • Loading branch information
Masahiro Sakamoto authored and wolfstudy committed Nov 19, 2019
1 parent cb6fcbd commit b1651a9
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ public synchronized CompletableFuture<Void> disconnect() {
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);
dispatcher.reset();
if (dispatcher != null) {
dispatcher.reset();
}
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand All @@ -269,13 +271,27 @@ public CompletableFuture<Void> delete() {
log.info("[{}][{}] Unsubscribing", topicName, subName);

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
.exceptionally(exception -> {
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
deleteFuture.complete(null);
}).exceptionally(ex -> {
IS_FENCED_UPDATER.set(this, FALSE);
log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
deleteFuture.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.reset();
}
log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
deleteFuture.completeExceptionally(ex);
return null;
});
}
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);
log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
deleteFuture.completeExceptionally(exception);
return null;
});

return deleteFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ public CompletableFuture<Void> close() {
if (delayedDeliveryTracker.isPresent()) {
delayedDeliveryTracker.get().close();
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}

return disconnectAllConsumers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -514,5 +515,14 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
}
}

@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
return disconnectAllConsumers();
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -45,6 +46,7 @@
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -680,6 +682,33 @@ private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?>
}
}

@Override
public CompletableFuture<Void> disconnect() {
return disconnect(false);
}

@Override
public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
final CompletableFuture<Void> future = new CompletableFuture<>();

super.disconnect(failIfHasBacklog).thenRun(() -> {
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
future.complete(null);
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
if (t instanceof TopicBusyException == false) {
log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster,
remoteCluster, ex.getMessage());
}
future.completeExceptionally(t);
return null;
});

return future;
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,13 +781,27 @@ public CompletableFuture<Void> delete() {
log.info("[{}][{}] Unsubscribing", topicName, subName);

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
.exceptionally(exception -> {
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
deleteFuture.complete(null);
}).exceptionally(ex -> {
IS_FENCED_UPDATER.set(this, FALSE);
log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
deleteFuture.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.reset();
}
log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
deleteFuture.completeExceptionally(ex);
return null;
});
}
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);
log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
deleteFuture.completeExceptionally(exception);
return null;
});

return deleteFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,15 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}

if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
}

log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
Expand Down Expand Up @@ -850,6 +859,14 @@ public void closeComplete(Object ctx) {
ctrl.close();
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}

if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
}

log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
Expand All @@ -861,14 +878,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
closeFuture.complete(null);
}
}, null);

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().close();
}
if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
}

}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
Expand Down Expand Up @@ -847,6 +848,51 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "subscriptions", timeOut = 10000)
public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/closingRateLimiter" + subscription.name();
final String subName = "mySubscription" + subscription.name();

DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).subscribe();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

final int numProducedMessages = 10;

for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}

for (int i = 0; i < numProducedMessages; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}

Assert.assertTrue(topic.getDispatchRateLimiter().isPresent());
DispatchRateLimiter dispatchRateLimiter = topic.getDispatchRateLimiter().get();

producer.close();
consumer.unsubscribe();
consumer.close();
topic.close().get();

// Make sure that the rate limiter is closed
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);

log.info("-- Exiting {} test --", methodName);
}

protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
statsUpdaterField.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.slf4j.Logger;
Expand Down Expand Up @@ -550,4 +551,50 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {

log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "subscriptions", timeOut = 10000)
public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/closingSubRateLimiter" + subscription.name();
final String subName = "mySubscription" + subscription.name();

DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).subscribe();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
PersistentSubscription sub = topic.getSubscription(subName);

final int numProducedMessages = 10;

for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}

for (int i = 0; i < numProducedMessages; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}

Dispatcher dispatcher = sub.getDispatcher();
Assert.assertTrue(dispatcher.getRateLimiter().isPresent());
DispatchRateLimiter dispatchRateLimiter = dispatcher.getRateLimiter().get();

producer.close();
consumer.close();
sub.disconnect().get();

// Make sure that the rate limiter is closed
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);

log.info("-- Exiting {} test --", methodName);
}
}

0 comments on commit b1651a9

Please sign in to comment.