From 09a05b680da6e44918b76599b78e8e0cb0c19109 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 27 Nov 2017 13:53:12 +0100 Subject: [PATCH] Dont use ThreadDeathWatcher to cleanup PoolThreadCache if FastThreadLocalThread with wrapped Runnable is used Motivation: We dont need to use the ThreadDeathWatcher if we use a FastThreadLocalThread for which we wrap the Runnable and ensure we call FastThreadLocal.removeAll() once the Runnable completes. Modifications: - Dont use a ThreadDeathWatcher if we are sure we will call FastThreadLocal.removeAll() - Add unit test. Result: Less overhead / running theads if you only allocate / deallocate from FastThreadLocalThreads. --- .../java/io/netty/buffer/PoolThreadCache.java | 21 +++++---- .../netty/buffer/PooledByteBufAllocator.java | 16 +++++-- .../buffer/PooledByteBufAllocatorTest.java | 46 +++++++++++++++---- .../util/concurrent/DefaultThreadFactory.java | 20 +------- .../concurrent/FastThreadLocalRunnable.java | 39 ++++++++++++++++ .../concurrent/FastThreadLocalThread.java | 32 ++++++++++--- 6 files changed, 128 insertions(+), 46 deletions(-) create mode 100644 common/src/main/java/io/netty/util/concurrent/FastThreadLocalRunnable.java diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index b2a604708b71..a4cb574b9922 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -66,7 +66,8 @@ final class PoolThreadCache { PoolThreadCache(PoolArena heapArena, PoolArena directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, - int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { + int maxCachedBufferCapacity, int freeSweepAllocationThreshold, + boolean useThreadDeathWatcher) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); @@ -112,14 +113,16 @@ final class PoolThreadCache { numShiftsNormalHeap = -1; } - // We only need to watch the thread when any cache is used. - if (tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null - || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) { - // Only check freeSweepAllocationThreshold when there are caches in use. - if (freeSweepAllocationThreshold < 1) { - throw new IllegalArgumentException("freeSweepAllocationThreshold: " - + freeSweepAllocationThreshold + " (expected: > 0)"); - } + // Only check if there are caches in use. + if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null + || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) + && freeSweepAllocationThreshold < 1) { + throw new IllegalArgumentException("freeSweepAllocationThreshold: " + + freeSweepAllocationThreshold + " (expected: > 0)"); + } + + if (useThreadDeathWatcher) { + freeTask = new Runnable() { @Override public void run() { diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index d5ce4e183f82..cef3d4f4f8d8 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -435,13 +435,19 @@ protected synchronized PoolThreadCache initialValue() { final PoolArena heapArena = leastUsedArena(heapArenas); final PoolArena directArena = leastUsedArena(directArenas); - if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) { + Thread current = Thread.currentThread(); + boolean fastThread = current instanceof FastThreadLocalThread; + if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { + // If our FastThreadLocalThread will call FastThreadLocal.removeAll() we not need to use + // the ThreadDeathWatcher to release memory from the PoolThreadCache once the Thread dies. + boolean useTheadWatcher = fastThread ? + !((FastThreadLocalThread) current).willCleanupFastThreadLocals() : true; return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, - DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL, useTheadWatcher); } // No caching for non FastThreadLocalThreads. - return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); + return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0, false); } @Override @@ -594,7 +600,9 @@ private static long usedMemory(PoolArena... arenas) { } final PoolThreadCache threadCache() { - return threadCache.get(); + PoolThreadCache cache = threadCache.get(); + assert cache != null; + return cache; } /** diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index 7237f7c7371e..d0676ad2644a 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -240,19 +240,28 @@ public void testFreePoolChunk() { assertFalse(lists.get(5).iterator().hasNext()); } - // The ThreadDeathWatcher sleeps 1s, give it double that time. - @Test (timeout = 2000) - public void testThreadCacheDestroyedByThreadDeathWatcher() { + @Test (timeout = 4000) + public void testThreadCacheDestroyedByThreadDeathWatcher() throws InterruptedException { + testThreadCacheDestroyedByThreadDeathWatcher(false); + } + + @Test (timeout = 4000) + public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException { + testThreadCacheDestroyedByThreadDeathWatcher(true); + } + + private static void testThreadCacheDestroyedByThreadDeathWatcher(boolean useRunnable) throws InterruptedException { int numArenas = 11; final PooledByteBufAllocator allocator = new PooledByteBufAllocator(numArenas, numArenas, 8192, 1); final AtomicBoolean threadCachesCreated = new AtomicBoolean(true); + final CountDownLatch latch = new CountDownLatch(numArenas); - for (int i = 0; i < numArenas; i++) { - new FastThreadLocalThread(new Runnable() { - @Override - public void run() { + final Runnable task = new Runnable() { + @Override + public void run() { + try { ByteBuf buf = allocator.newHeapBuffer(1024, 1024); for (int i = 0; i < buf.capacity(); i++) { buf.writeByte(0); @@ -266,10 +275,31 @@ public void run() { } buf.release(); + } finally { + latch.countDown(); } - }).start(); + } + }; + + for (int i = 0; i < numArenas; i++) { + final FastThreadLocalThread thread; + if (useRunnable) { + thread = new FastThreadLocalThread(task); + assertTrue(thread.willCleanupFastThreadLocals()); + } else { + thread = new FastThreadLocalThread() { + @Override + public void run() { + task.run(); + } + }; + assertFalse(thread.willCleanupFastThreadLocals()); + } + thread.start(); } + latch.await(); + // Wait for the ThreadDeathWatcher to have destroyed all thread caches while (allocator.metric().numThreadLocalCaches() > 0) { LockSupport.parkNanos(MILLISECONDS.toNanos(100)); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java index 18c6e29c4656..15c76582f76d 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java @@ -105,7 +105,7 @@ public DefaultThreadFactory(String poolName, boolean daemon, int priority) { @Override public Thread newThread(Runnable r) { - Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); + Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); @@ -123,22 +123,4 @@ public Thread newThread(Runnable r) { protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } - - private static final class DefaultRunnableDecorator implements Runnable { - - private final Runnable r; - - DefaultRunnableDecorator(Runnable r) { - this.r = r; - } - - @Override - public void run() { - try { - r.run(); - } finally { - FastThreadLocal.removeAll(); - } - } - } } diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalRunnable.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalRunnable.java new file mode 100644 index 000000000000..f2c13e8ac6f4 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalRunnable.java @@ -0,0 +1,39 @@ +/* +* Copyright 2017 The Netty Project +* +* The Netty Project licenses this file to you under the Apache License, +* version 2.0 (the "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations +* under the License. +*/ +package io.netty.util.concurrent; + +import io.netty.util.internal.ObjectUtil; + +final class FastThreadLocalRunnable implements Runnable { + private final Runnable runnable; + + private FastThreadLocalRunnable(Runnable runnable) { + this.runnable = ObjectUtil.checkNotNull(runnable, "runnable"); + } + + @Override + public void run() { + try { + runnable.run(); + } finally { + FastThreadLocal.removeAll(); + } + } + + static Runnable wrap(Runnable runnable) { + return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable); + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java index bd3e1c4441a0..35fef3fe120e 100644 --- a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java @@ -16,42 +16,54 @@ package io.netty.util.concurrent; import io.netty.util.internal.InternalThreadLocalMap; +import io.netty.util.internal.UnstableApi; /** * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables. */ public class FastThreadLocalThread extends Thread { + // This will be set to true if we have a chance to wrap the Runnable. + private final boolean cleanupFastThreadLocals; private InternalThreadLocalMap threadLocalMap; - public FastThreadLocalThread() { } + public FastThreadLocalThread() { + cleanupFastThreadLocals = false; + } public FastThreadLocalThread(Runnable target) { - super(target); + super(FastThreadLocalRunnable.wrap(target)); + cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target) { - super(group, target); + super(group, FastThreadLocalRunnable.wrap(target)); + cleanupFastThreadLocals = true; } public FastThreadLocalThread(String name) { super(name); + cleanupFastThreadLocals = false; } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); + cleanupFastThreadLocals = false; } public FastThreadLocalThread(Runnable target, String name) { - super(target, name); + super(FastThreadLocalRunnable.wrap(target), name); + cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { - super(group, target, name); + super(group, FastThreadLocalRunnable.wrap(target), name); + cleanupFastThreadLocals = true; } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { - super(group, target, name, stackSize); + super(group, FastThreadLocalRunnable.wrap(target), name, stackSize); + cleanupFastThreadLocals = true; } /** @@ -69,4 +81,12 @@ public final InternalThreadLocalMap threadLocalMap() { public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; } + + /** + * Returns {@code true} if {@link FastThreadLocal#removeAll()} will be called once {@link #run()} completes. + */ + @UnstableApi + public boolean willCleanupFastThreadLocals() { + return cleanupFastThreadLocals; + } }