From d654d5e0d2661592654204a0594fcb0a73f14a4e Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Tue, 9 Feb 2021 08:25:37 -0800 Subject: [PATCH] [pulsar-function] fix possible deadlock on broker-function service startup (#9499) ### Motivation Standalone pulsar broker and function service can have deadlock which also sometime causes failures in function unit-test case. Pulsar function tries to create a subscription which blocks the zk thread and can be cause of possible deadlock. Below is the thread-dump when the deadlock happens and function service start up fails. So, remove blocking call while creating subscription using admin-api. ``` "pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000 nid=0x3610f waiting on condition [0x00007000156e9000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000792a07020> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139) - locked <0x000000078e53f2d0> (a java.util.HashSet) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295) at org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) "pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5 os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition [0x000070000f7ce000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000792ecb8b0> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown Source) at org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510) main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on condition [0x0000700004160000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007930efe68> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178) at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297) at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632) at org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown Source) at org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654) at org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648) at org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown Source) at org.glassfish.jersey.internal.Errors.process(Errors.java:292) at org.glassfish.jersey.internal.Errors.process(Errors.java:274) at org.glassfish.jersey.internal.Errors.process(Errors.java:205) at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390) at org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648) at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631) at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434) at org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318) at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143) at org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454) at org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343) at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671) at org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ``` --- .../admin/impl/PersistentTopicsBase.java | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a55f6830a5f0f..82a0e5c85dd25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2124,30 +2124,34 @@ private void internalCreateSubscriptionForNonPartitionedTopic( asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic")); return; } - PersistentSubscription subscription = (PersistentSubscription) topic - .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get(); - // Mark the cursor as "inactive" as it was created without a real consumer connected - subscription.deactivateCursor(); - subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) - .thenRun(() -> { - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), - topicName, subscriptionName, targetMessageId); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); - log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId, t); - if (t instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for position specified: " + t.getMessage())); - } else if (t instanceof SubscriptionBusyException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Failed for Subscription Busy: " + t.getMessage())); - } else { - resumeAsyncResponseExceptionally(asyncResponse, t); - } - return null; - }); + topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated).thenApply(subscription -> { + // Mark the cursor as "inactive" as it was created without a real consumer connected + ((PersistentSubscription) subscription).deactivateCursor(); + subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) + .thenRun(() -> { + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), + topicName, subscriptionName, targetMessageId); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); + log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), + topicName, subscriptionName, targetMessageId, t); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage())); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Failed for Subscription Busy: " + t.getMessage())); + } else { + resumeAsyncResponseExceptionally(asyncResponse, t); + } + return null; + }); + return null; + }).exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); + return null; + }); } catch (Throwable e) { log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId, e);