Skip to content

Commit

Permalink
[Broker] Avoid thread deadlock problem when creating topic policy rea…
Browse files Browse the repository at this point in the history
…der (apache#13837)

### Motivation

Currently, the topic policy reader creation thread is the `executor` of the `PulsarService`, this thread is also used to search the candidate broker. If they use the same thread in some conditions, the lookup request will be blocked and result in a lookup request timeout.

![image](https://user-images.githubusercontent.com/15029908/150124902-c4318182-56f2-4b31-9149-ae64a9919aa4.png)

![image](https://user-images.githubusercontent.com/15029908/150124952-66a9f095-bab2-40fa-9370-76d3f2158dac.png)

We could find out that the lookup request was blocked 1 minute until the lookup request timeout. The thread `pulsar-2-8` was blocked by topic policy reader creation.

### Modifications

Change the topic policy reader creation to be asynchronous. Modify the method `RetryUtil.retryAsynchronously` to handle asynchronous execution.

### Verifying this change

Add a new test to verify consumer creations can be successful when enabling the topic policy feature.
  • Loading branch information
gaoran10 authored Jan 20, 2022
1 parent dc20ef0 commit 760bfec
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
creatSystemTopicClientWithRetry(namespace);
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
readerCompletableFuture.whenComplete((reader, ex) -> {
if (ex != null) {
Expand All @@ -265,7 +265,7 @@ private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture
}
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
try {
Expand All @@ -277,13 +277,7 @@ protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTo
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
RetryUtil.retryAsynchronously(() -> {
try {
return systemTopicClient.newReader();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}, backoff, pulsarService.getExecutor(), result);
RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetr
.create() : backoff;
try {
RetryUtil.retryAsynchronously(() -> {
CompletableFuture<Optional<TopicPolicies>> future = new CompletableFuture<>();
try {
return Optional.ofNullable(getTopicPolicies(topicName, isGlobal));
future.complete(Optional.ofNullable(getTopicPolicies(topicName, isGlobal)));
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException exception) {
throw new RuntimeException(exception);
future.completeExceptionally(exception);
}
return future;
}, usedBackoff, scheduledExecutorService, response);
} catch (Exception e) {
response.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testGetPolicyTimeout() throws Exception {
try {
service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor(), false).get();
} catch (Exception e) {
assertTrue(e.getCause().getCause() instanceof TopicPoliciesCacheNotInitException);
assertTrue(e.getCause() instanceof TopicPoliciesCacheNotInitException);
}
long cost = System.currentTimeMillis() - start;
assertTrue("actual:" + cost, cost >= 5000 - 1000);
Expand All @@ -330,9 +330,10 @@ public void testCreatSystemTopicClientWithRetry() throws Exception {

SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class);
// Throw an exception first, create successfully after retrying
doThrow(new PulsarClientException("test")).doReturn(reader).when(client).newReader();
doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();

SystemTopicClient.Reader<PulsarEvent> reader1 = service.creatSystemTopicClientWithRetry(null).get();
SystemTopicClient.Reader<PulsarEvent> reader1 = service.createSystemTopicClientWithRetry(null).get();

assertEquals(reader1, reader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@
*/
package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Test(groups = "broker")
public class PartitionedSystemTopicTest extends BrokerTestBase {

Expand Down Expand Up @@ -66,4 +75,28 @@ public void testAutoCreatedPartitionedSystemTopic() throws Exception {
Assert.assertEquals(partitions, PARTITIONS);
Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
}

@Test(timeOut = 1000 * 60)
public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception {
String tenant = "tenant-" + RandomStringUtils.randomAlphabetic(4).toLowerCase();
admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
int namespaceCount = 30;
for (int i = 0; i < namespaceCount; i++) {
String ns = tenant + "/ns-" + i;
admin.namespaces().createNamespace(ns, 4);
String topic = ns + "/t1";
admin.topics().createPartitionedTopic(topic, 2);
}

List<CompletableFuture<Consumer<byte[]>>> futureList = new ArrayList<>();
for (int i = 0; i < namespaceCount; i++) {
String topic = tenant + "/ns-" + i + "/t1";
futureList.add(pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribeAsync());
}
FutureUtil.waitForAll(futureList).get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;
Expand All @@ -45,11 +46,14 @@ public void testFailAndRetry() throws Exception {
.setMandatoryStop(5000, TimeUnit.MILLISECONDS)
.create();
RetryUtil.retryAsynchronously(() -> {
CompletableFuture<Boolean> future = new CompletableFuture<>();
atomicInteger.incrementAndGet();
if (atomicInteger.get() < 5) {
throw new RuntimeException("fail");
future.completeExceptionally(new RuntimeException("fail"));
} else {
future.complete(true);
}
return true;
return future;
}, backoff, executor, callback);
assertTrue(callback.get());
assertEquals(atomicInteger.get(), 5);
Expand All @@ -66,9 +70,8 @@ public void testFail() throws Exception {
.setMandatoryStop(5000, TimeUnit.MILLISECONDS)
.create();
long start = System.currentTimeMillis();
RetryUtil.retryAsynchronously(() -> {
throw new RuntimeException("fail");
}, backoff, executor, callback);
RetryUtil.retryAsynchronously(() ->
FutureUtil.failedFuture(new RuntimeException("fail")), backoff, executor, callback);
try {
callback.get();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class RetryUtil {
private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);

public static <T> void retryAsynchronously(Supplier<T> supplier, Backoff backoff,
public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
if (backoff.getMax() <= 0) {
Expand All @@ -43,26 +43,25 @@ public static <T> void retryAsynchronously(Supplier<T> supplier, Backoff backoff
executeWithRetry(supplier, backoff, scheduledExecutorService, callback));
}

private static <T> void executeWithRetry(Supplier<T> supplier, Backoff backoff,
private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
try {
T result = supplier.get();
callback.complete(result);
} catch (Exception e) {
long next = backoff.next();
boolean isMandatoryStop = backoff.isMandatoryStopMade();
if (isMandatoryStop) {
callback.completeExceptionally(e);
} else {
if (log.isDebugEnabled()) {
log.debug("execute with retry fail, will retry in {} ms", next, e);
supplier.get().whenComplete((result, e) -> {
if (e != null) {
long next = backoff.next();
boolean isMandatoryStop = backoff.isMandatoryStopMade();
if (isMandatoryStop) {
callback.completeExceptionally(e);
} else {
log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next);
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
next, TimeUnit.MILLISECONDS);
}
log.info("Because of {}, will retry in {} ms", e.getMessage(), next);
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
next, TimeUnit.MILLISECONDS);
return;
}
}
callback.complete(result);
});
}

}

0 comments on commit 760bfec

Please sign in to comment.