From 363884e13639e75a508be33db485b99fcbb8d26a Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 7 Jan 2022 01:17:36 -0800 Subject: [PATCH] Add a lock-based message passing queue to help debug a problem with Recycler (#11972) Motivation: The Recycler has been observed to produce infinite loops on ARM CPUs. It is not clear if this is caused by a JDK bug, a JCTools bug, or a Recycler bug. Having the ability to switch out the JCTools queue implementation will aid us in the investigation. Modification: Implement MessagePassingQueue as a synchronized ArrayDeque, and add a system property to enable the use of this implementation in Recycler. Result: We should now be able to rule out the possibility of a bug in either JCTools or the Recycler. --- .../src/main/java/io/netty/util/Recycler.java | 124 +++++++++++++++++- .../test/java/io/netty/util/RecyclerTest.java | 6 - 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index d5dfc4cbcfdf..b80c749df16a 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -23,8 +23,11 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import org.jctools.queues.MessagePassingQueue; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import static io.netty.util.internal.PlatformDependent.newMpscQueue; import static java.lang.Math.max; import static java.lang.Math.min; @@ -50,6 +53,7 @@ public String toString() { private static final int DEFAULT_MAX_CAPACITY_PER_THREAD; private static final int RATIO; private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD; + private static final boolean BLOCKING_POOL; static { // In the future, we might have different maxCapacity for different object types. @@ -69,15 +73,19 @@ public String toString() { // bursts. RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8)); + BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false); + if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled"); logger.debug("-Dio.netty.recycler.ratio: disabled"); logger.debug("-Dio.netty.recycler.chunkSize: disabled"); + logger.debug("-Dio.netty.recycler.blocking: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD); + logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL); } } } @@ -252,9 +260,11 @@ private static final class LocalPool { @SuppressWarnings("unchecked") LocalPool(int maxCapacity, int ratioInterval, int chunkSize) { this.ratioInterval = ratioInterval; - // If the queue is of type MessagePassingQueue we can use a special LocalPoolQueue implementation. - pooledHandles = (MessagePassingQueue>) PlatformDependent - .newMpscQueue(chunkSize, maxCapacity); + if (BLOCKING_POOL) { + pooledHandles = new BlockingMessageQueue>(maxCapacity); + } else { + pooledHandles = (MessagePassingQueue>) newMpscQueue(chunkSize, maxCapacity); + } ratioCounter = ratioInterval; // Start at interval so the first one will be recycled. } @@ -279,4 +289,112 @@ DefaultHandle newHandle() { return null; } } + + /** + * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from + * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose. + * The implementation relies on synchronised monitor locks for thread-safety. + * The {@code drain} and {@code fill} bulk operations are not supported by this implementation. + */ + private static final class BlockingMessageQueue implements MessagePassingQueue { + private final Queue deque; + private final int maxCapacity; + + BlockingMessageQueue(int maxCapacity) { + this.maxCapacity = maxCapacity; + // This message passing queue is backed by an ArrayDeque instance, + // made thread-safe by synchronising on `this` BlockingMessageQueue instance. + // Why ArrayDeque? + // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient. + // We use ArrayDeque instead of ArrayList because we need the queue APIs. + // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size(). + // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front, + // and these queues will usually have large capacities, in potentially great numbers (one per thread), + // but often only have comparatively few items in them. + deque = new ArrayDeque(); + } + + @Override + public synchronized boolean offer(T e) { + if (deque.size() == maxCapacity) { + return false; + } + return deque.offer(e); + } + + @Override + public synchronized T poll() { + return deque.poll(); + } + + @Override + public synchronized T peek() { + return deque.peek(); + } + + @Override + public synchronized int size() { + return deque.size(); + } + + @Override + public synchronized void clear() { + deque.clear(); + } + + @Override + public synchronized boolean isEmpty() { + return deque.isEmpty(); + } + + @Override + public int capacity() { + return maxCapacity; + } + + @Override + public boolean relaxedOffer(T e) { + return offer(e); + } + + @Override + public T relaxedPoll() { + return poll(); + } + + @Override + public T relaxedPeek() { + return peek(); + } + + @Override + public int drain(Consumer c, int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public int fill(Supplier s, int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public int drain(Consumer c) { + throw new UnsupportedOperationException(); + } + + @Override + public int fill(Supplier s) { + throw new UnsupportedOperationException(); + } + + @Override + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { + throw new UnsupportedOperationException(); + } + + @Override + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { + throw new UnsupportedOperationException(); + } + } } diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index 76fdd465f321..edfd797d0afe 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -43,12 +43,6 @@ private static Recycler newRecycler(int maxCapacityPerThread) { return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1); } - private static Recycler newRecycler(int maxCapacityPerThread, int maxSharedCapacityFactor, - int ratio, int maxDelayedQueuesPerThread, - int delayedQueueRatio, int chunkSize) { - return newRecycler(maxCapacityPerThread, ratio, chunkSize); - } - private static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { return new Recycler(maxCapacityPerThread, ratio, chunkSize) { @Override