Skip to content

Commit

Permalink
[proxy] Fix deadlock in pulsar proxy (apache#7690)
Browse files Browse the repository at this point in the history
### Motivation

When a broker server is restarted, the Pulsar proxy outputs the following warning:

```
15:09:01.486 [main-EventThread] INFO  o.a.p.z.ZooKeeperChildrenCache       - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers
15:09:31.487 [main-EventThread] WARN  o.a.p.p.s.util.ZookeeperCacheLoader  - Error updating broker info after broker list changed.
java.util.concurrent.TimeoutException: null
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118)
        at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82)
        at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508)
```

This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states:
```
"main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123)
        at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84)
        at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source)
        at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89)
        at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415)
        at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508)
```

`ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`.

The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK.

`main-EventThread` is blocked at the following line, so deadlock occurs.
https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97

### Modifications

~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~

Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
  • Loading branch information
Masahiro Sakamoto authored Aug 6, 2020
1 parent 28505e4 commit d7c1cfa
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
Expand Down Expand Up @@ -80,8 +83,9 @@ public void setWatcher(ZooKeeperSessionWatcher watcher) {
}
});

this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
int zkOperationTimeoutSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs);
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), zkOperationTimeoutSeconds,
this.orderedExecutor);
localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
@Override
public void onSessionExpired() {
Expand All @@ -107,15 +111,16 @@ public LoadManagerReport deserialize(String key, byte[] content) throws Exceptio

this.availableBrokersCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableBrokersCache.registerListener((path, brokerNodes, stat) -> {
try {
updateBrokerList(brokerNodes);
} catch (Exception e) {
log.warn("Error updating broker info after broker list changed.", e);
}
updateBrokerList(brokerNodes).thenRun(() -> {
log.info("Successfully updated broker info {}", brokerNodes);
}).exceptionally(ex -> {
log.warn("Error updating broker info after broker list changed", ex);
return null;
});
});

// Do initial fetch of brokers list
updateBrokerList(availableBrokersCache.get());
updateBrokerList(availableBrokersCache.get()).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
}

public List<LoadManagerReport> getAvailableBrokers() {
Expand All @@ -133,13 +138,43 @@ public void close() throws IOException {
orderedExecutor.shutdown();
}

private void updateBrokerList(Set<String> brokerNodes) throws Exception {
List<LoadManagerReport> availableBrokers = new ArrayList<>(brokerNodes.size());
private CompletableFuture<Void> updateBrokerList(Set<String> brokerNodes) {
CompletableFuture<Void> future = new CompletableFuture<>();

if (brokerNodes.isEmpty()) {
availableBrokers = new ArrayList<>();
future.complete(null);
return future;
}

List<CompletableFuture<Optional<LoadManagerReport>>> loadReportFutureList = new ArrayList<>();
for (String broker : brokerNodes) {
availableBrokers.add(brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker).get());
loadReportFutureList.add(brokerInfo.getAsync(LOADBALANCE_BROKERS_ROOT + '/' + broker));
}

this.availableBrokers = availableBrokers;
FutureUtil.waitForAll(loadReportFutureList).thenRun(() -> {
List<LoadManagerReport> newAvailableBrokers = new ArrayList<>(brokerNodes.size());

for (CompletableFuture<Optional<LoadManagerReport>> loadReportFuture : loadReportFutureList) {
try {
Optional<LoadManagerReport> loadReport = loadReportFuture.get();
if (loadReport.isPresent()) {
newAvailableBrokers.add(loadReport.get());
}
} catch (Exception e) {
future.completeExceptionally(e);
return;
}
}

availableBrokers = newAvailableBrokers;
future.complete(null);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});

return future;
}

private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheLoader.class);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
Expand Down Expand Up @@ -66,8 +69,8 @@ public class ZookeeperCacheLoader implements Closeable {
*/
public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
this.zkClient = factory.create(zookeeperServers, SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
this.localZkCache = new LocalZooKeeperCache(zkClient,
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
int zkOperationTimeoutSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs);
this.localZkCache = new LocalZooKeeperCache(zkClient, zkOperationTimeoutSeconds, this.orderedExecutor);

this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
@Override
Expand All @@ -78,18 +81,19 @@ public LoadManagerReport deserialize(String key, byte[] content) throws Exceptio

this.availableBrokersCache = new ZooKeeperChildrenCache(getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
this.availableBrokersCache.registerListener((path, brokerNodes, stat) -> {
try {
updateBrokerList(brokerNodes);
} catch (Exception e) {
log.warn("Error updating broker info after broker list changed.", e);
}
updateBrokerList(brokerNodes).thenRun(() -> {
log.info("Successfully updated broker info {}", brokerNodes);
}).exceptionally(ex -> {
log.warn("Error updating broker info after broker list changed", ex);
return null;
});
});

// Do initial fetch of brokers list
try {
updateBrokerList(availableBrokersCache.get());
updateBrokerList(availableBrokersCache.get()).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (NoNodeException nne) { // can happen if no broker started yet
updateBrokerList(Collections.emptySet());
updateBrokerList(Collections.emptySet()).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
}
}

Expand All @@ -112,13 +116,43 @@ public void close() throws IOException {
orderedExecutor.shutdown();
}

private void updateBrokerList(Set<String> brokerNodes) throws Exception {
List<LoadManagerReport> availableBrokers = new ArrayList<>(brokerNodes.size());
private CompletableFuture<Void> updateBrokerList(Set<String> brokerNodes) {
CompletableFuture<Void> future = new CompletableFuture<>();

if (brokerNodes.isEmpty()) {
availableBrokers = new ArrayList<>();
future.complete(null);
return future;
}

List<CompletableFuture<Optional<LoadManagerReport>>> loadReportFutureList = new ArrayList<>();
for (String broker : brokerNodes) {
availableBrokers.add(brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker).get());
loadReportFutureList.add(brokerInfo.getAsync(LOADBALANCE_BROKERS_ROOT + '/' + broker));
}

this.availableBrokers = availableBrokers;
FutureUtil.waitForAll(loadReportFutureList).thenRun(() -> {
List<LoadManagerReport> newAvailableBrokers = new ArrayList<>(brokerNodes.size());

for (CompletableFuture<Optional<LoadManagerReport>> loadReportFuture : loadReportFutureList) {
try {
Optional<LoadManagerReport> loadReport = loadReportFuture.get();
if (loadReport.isPresent()) {
newAvailableBrokers.add(loadReport.get());
}
} catch (Exception e) {
future.completeExceptionally(e);
return;
}
}

availableBrokers = newAvailableBrokers;
future.complete(null);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});

return future;
}

private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheLoader.class);
Expand Down

0 comments on commit d7c1cfa

Please sign in to comment.