Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Copy list of subscriptions when checking for message expiration to av…
…oid deadlocks (apache#8877) ### Motivation Some of our broker servers experienced what appears to be a deadlock. The following is the thread dump at that time. [threaddump.txt.zip](https://github.com/apache/pulsar/files/5665572/threaddump.txt.zip) The thread "ForkJoinPool.commonPool-worker-120" was locking an instance of `ManagedLedgerImpl`. And this thread seemed to be waiting for `subscriptions`, which is an instance of `ConcurrentOpenHashMap`, to be unlocked. Many other threads were blocked because the lock on the `ManagedLedgerImpl` instance was not released. ``` "ForkJoinPool.commonPool-worker-120" apache#903 daemon prio=5 os_prio=0 tid=0x00007f9aa0010000 nid=0x12b59 waiting on condition [0x00007f9528cc3000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007fa20b3e5eb0> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section) at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119) at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:650) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:720) - locked <0x00007fa20512f968> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:643) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:590) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$476/1880414247.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$475/707554512.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ``` The thread that locked `subscriptions` seems to be "pulsar-msg-expiry-monitor-24-1". ``` "pulsar-msg-expiry-monitor-24-1" apache#304 prio=5 os_prio=0 tid=0x00007f99602dd000 nid=0x12036 waiting on condition [0x00007f998d47c000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007fca4361dfb0> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037) at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNthEntry(ManagedCursorImpl.java:537) at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:1820) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:901) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$36(PersistentTopic.java:1102) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1011/2104832020.accept(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1102) at org.apache.pulsar.broker.service.BrokerService$$Lambda$1009/2005752676.accept(Unknown Source) at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$32(BrokerService.java:951) at org.apache.pulsar.broker.service.BrokerService$$Lambda$779/1852910990.accept(Unknown Source) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:948) at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:925) at org.apache.pulsar.broker.service.BrokerService$$Lambda$108/203149502.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ``` I can't understand why "pulsar-msg-expiry-monitor-24-1" was stuck. However, it seems that this deadlock can be avoided if `subscriptions` is not locked when checking for message expiration, so I created this PR. If anyone can explain why "pulsar-msg-expiry-monitor-24-1" was stuck, please let me know. ### Modifications When expiring messages for each subscription, copy the values of `subscriptions` as `List` and execute `forEach()` for that `List` instance.
- Loading branch information