Skip to content

Commit

Permalink
Dont use ThreadDeathWatcher to cleanup PoolThreadCache if FastThreadL…
Browse files Browse the repository at this point in the history
…ocalThread with wrapped Runnable is used

Motivation:

We dont need to use the ThreadDeathWatcher if we use a FastThreadLocalThread for which we wrap the Runnable and ensure we call FastThreadLocal.removeAll() once the Runnable completes.

Modifications:

- Dont use a ThreadDeathWatcher if we are sure we will call FastThreadLocal.removeAll()
- Add unit test.

Result:

Less overhead / running theads if you only allocate / deallocate from FastThreadLocalThreads.
  • Loading branch information
normanmaurer committed Nov 28, 2017
1 parent 9ef8323 commit 09a05b6
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 46 deletions.
21 changes: 12 additions & 9 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ final class PoolThreadCache {

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
int maxCachedBufferCapacity, int freeSweepAllocationThreshold,
boolean useThreadDeathWatcher) {
if (maxCachedBufferCapacity < 0) {
throw new IllegalArgumentException("maxCachedBufferCapacity: "
+ maxCachedBufferCapacity + " (expected: >= 0)");
Expand Down Expand Up @@ -112,14 +113,16 @@ final class PoolThreadCache {
numShiftsNormalHeap = -1;
}

// We only need to watch the thread when any cache is used.
if (tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) {
// Only check freeSweepAllocationThreshold when there are caches in use.
if (freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}

if (useThreadDeathWatcher) {

freeTask = new Runnable() {
@Override
public void run() {
Expand Down
16 changes: 12 additions & 4 deletions buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,19 @@ protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
Thread current = Thread.currentThread();
boolean fastThread = current instanceof FastThreadLocalThread;
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
// If our FastThreadLocalThread will call FastThreadLocal.removeAll() we not need to use
// the ThreadDeathWatcher to release memory from the PoolThreadCache once the Thread dies.
boolean useTheadWatcher = fastThread ?
!((FastThreadLocalThread) current).willCleanupFastThreadLocals() : true;
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL, useTheadWatcher);
}
// No caching for non FastThreadLocalThreads.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0, false);
}

@Override
Expand Down Expand Up @@ -594,7 +600,9 @@ private static long usedMemory(PoolArena<?>... arenas) {
}

final PoolThreadCache threadCache() {
return threadCache.get();
PoolThreadCache cache = threadCache.get();
assert cache != null;
return cache;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,28 @@ public void testFreePoolChunk() {
assertFalse(lists.get(5).iterator().hasNext());
}

// The ThreadDeathWatcher sleeps 1s, give it double that time.
@Test (timeout = 2000)
public void testThreadCacheDestroyedByThreadDeathWatcher() {
@Test (timeout = 4000)
public void testThreadCacheDestroyedByThreadDeathWatcher() throws InterruptedException {
testThreadCacheDestroyedByThreadDeathWatcher(false);
}

@Test (timeout = 4000)
public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException {
testThreadCacheDestroyedByThreadDeathWatcher(true);
}

private static void testThreadCacheDestroyedByThreadDeathWatcher(boolean useRunnable) throws InterruptedException {
int numArenas = 11;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);

final AtomicBoolean threadCachesCreated = new AtomicBoolean(true);
final CountDownLatch latch = new CountDownLatch(numArenas);

for (int i = 0; i < numArenas; i++) {
new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
final Runnable task = new Runnable() {
@Override
public void run() {
try {
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
for (int i = 0; i < buf.capacity(); i++) {
buf.writeByte(0);
Expand All @@ -266,10 +275,31 @@ public void run() {
}

buf.release();
} finally {
latch.countDown();
}
}).start();
}
};

for (int i = 0; i < numArenas; i++) {
final FastThreadLocalThread thread;
if (useRunnable) {
thread = new FastThreadLocalThread(task);
assertTrue(thread.willCleanupFastThreadLocals());
} else {
thread = new FastThreadLocalThread() {
@Override
public void run() {
task.run();
}
};
assertFalse(thread.willCleanupFastThreadLocals());
}
thread.start();
}

latch.await();

// Wait for the ThreadDeathWatcher to have destroyed all thread caches
while (allocator.metric().numThreadLocalCaches() > 0) {
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public DefaultThreadFactory(String poolName, boolean daemon, int priority) {

@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
Expand All @@ -123,22 +123,4 @@ public Thread newThread(Runnable r) {
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

private static final class DefaultRunnableDecorator implements Runnable {

private final Runnable r;

DefaultRunnableDecorator(Runnable r) {
this.r = r;
}

@Override
public void run() {
try {
r.run();
} finally {
FastThreadLocal.removeAll();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;

private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}

@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}

static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,54 @@
package io.netty.util.concurrent;

import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.UnstableApi;

/**
* A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables.
*/
public class FastThreadLocalThread extends Thread {
// This will be set to true if we have a chance to wrap the Runnable.
private final boolean cleanupFastThreadLocals;

private InternalThreadLocalMap threadLocalMap;

public FastThreadLocalThread() { }
public FastThreadLocalThread() {
cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(Runnable target) {
super(target);
super(FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target) {
super(group, target);
super(group, FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(String name) {
super(name);
cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(ThreadGroup group, String name) {
super(group, name);
cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(Runnable target, String name) {
super(target, name);
super(FastThreadLocalRunnable.wrap(target), name);
cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
super(group, FastThreadLocalRunnable.wrap(target), name);
cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
super(group, FastThreadLocalRunnable.wrap(target), name, stackSize);
cleanupFastThreadLocals = true;
}

/**
Expand All @@ -69,4 +81,12 @@ public final InternalThreadLocalMap threadLocalMap() {
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}

/**
* Returns {@code true} if {@link FastThreadLocal#removeAll()} will be called once {@link #run()} completes.
*/
@UnstableApi
public boolean willCleanupFastThreadLocals() {
return cleanupFastThreadLocals;
}
}

0 comments on commit 09a05b6

Please sign in to comment.