Skip to content

Commit

Permalink
Refactor FastThreadLocal to simplify TLV management
Browse files Browse the repository at this point in the history
Motivation:

When Netty runs in a managed environment such as web application server,
Netty needs to provide an explicit way to remove the thread-local
variables it created to prevent class loader leaks.

FastThreadLocal uses different execution paths for storing a
thread-local variable depending on the type of the current thread.
It increases the complexity of thread-local removal.

Modifications:

- Moved FastThreadLocal and FastThreadLocalThread out of the internal
  package so that a user can use it.
- FastThreadLocal now keeps track of all thread local variables it has
  initialized, and calling FastThreadLocal.removeAll() will remove all
  thread-local variables of the caller thread.
- Added FastThreadLocal.size() for diagnostics and tests
- Introduce InternalThreadLocalMap which is a mixture of hard-wired
  thread local variable fields and extensible indexed variables
- FastThreadLocal now uses InternalThreadLocalMap to implement a
  thread-local variable.
- Added ThreadDeathWatcher.unwatch() so that PooledByteBufAllocator
  tells it to stop watching when its thread-local cache has been freed
  by FastThreadLocal.removeAll().
- Added FastThreadLocalTest to ensure that removeAll() works
- Added microbenchmark for FastThreadLocal and JDK ThreadLocal
- Upgraded to JMH 0.9

Result:

- A user can remove all thread-local variables Netty created, as long as
  he or she did not exit from the current thread. (Note that there's no
  way to remove a thread-local variable from outside of the thread.)
- FastThreadLocal exposes more useful operations such as isSet() because
  we always implement a thread local variable via InternalThreadLocalMap
  instead of falling back to JDK ThreadLocal.
- FastThreadLocalBenchmark shows that this change improves the
  performance of FastThreadLocal even more.
  • Loading branch information
trustin committed Jun 19, 2014
1 parent 55b6f71 commit 760bbc7
Show file tree
Hide file tree
Showing 37 changed files with 1,049 additions and 908 deletions.
32 changes: 30 additions & 2 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package io.netty.buffer;


import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;

/**
Expand All @@ -26,6 +30,9 @@
* 480222803919">Scalable memory allocation using jemalloc</a>.
*/
final class PoolThreadCache {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

Expand All @@ -44,6 +51,14 @@ final class PoolThreadCache {

private int allocations;

private final Thread thread = Thread.currentThread();
private final Runnable freeTask = new Runnable() {
@Override
public void run() {
free0();
}
};

// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

Expand Down Expand Up @@ -90,6 +105,10 @@ final class PoolThreadCache {
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}

// The thread-local cache will keep a list of pooled buffers which must be returned to
// the pool when the thread is not alive anymore.
ThreadDeathWatcher.watch(thread, freeTask);
}

private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {
Expand Down Expand Up @@ -192,13 +211,22 @@ boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity) {
/**
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/
int free() {
return free(tinySubPageDirectCaches) +
void free() {
ThreadDeathWatcher.unwatch(thread, freeTask);
free0();
}

private void free0() {
int numFreed = free(tinySubPageDirectCaches) +
free(smallSubPageDirectCaches) +
free(normalDirectCaches) +
free(tinySubPageHeapCaches) +
free(smallSubPageHeapCaches) +
free(normalHeapCaches);

if (numFreed > 0 && logger.isDebugEnabled()) {
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName());
}
}

private static int free(MemoryRegionCache<?>[] caches) {
Expand Down
23 changes: 6 additions & 17 deletions buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package io.netty.buffer;

import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
Expand Down Expand Up @@ -277,24 +276,14 @@ protected PoolThreadCache initialValue() {
directArena = null;
}

final PoolThreadCache cache = new PoolThreadCache(
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}

// The thread-local cache will keep a list of pooled buffers which must be returned to
// the pool when the thread is not alive anymore.
final Thread thread = Thread.currentThread();
ThreadDeathWatcher.watch(thread, new Runnable() {
@Override
public void run() {
int numFreed = cache.free();
if (logger.isDebugEnabled()) {
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName());
}
}
});

return cache;
@Override
protected void onRemoval(PoolThreadCache value) {
value.free();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static String encode(Cookie cookie) {
throw new NullPointerException("cookie");
}

StringBuilder buf = buffer.get();
StringBuilder buf = stringBuilder();
encode(buf, cookie);
return stripTrailingSeparator(buf);
}
Expand All @@ -52,7 +52,7 @@ public static String encode(Cookie... cookies) {
throw new NullPointerException("cookies");
}

StringBuilder buf = buffer.get();
StringBuilder buf = stringBuilder();
for (Cookie c: cookies) {
if (c == null) {
break;
Expand All @@ -68,7 +68,7 @@ public static String encode(Iterable<Cookie> cookies) {
throw new NullPointerException("cookies");
}

StringBuilder buf = buffer.get();
StringBuilder buf = stringBuilder();
for (Cookie c: cookies) {
if (c == null) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,13 @@
package io.netty.handler.codec.http;


import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;

final class CookieEncoderUtil {

static final ThreadLocal<StringBuilder> buffer = new FastThreadLocal<StringBuilder>() {
@Override
public StringBuilder get() {
StringBuilder buf = super.get();
buf.setLength(0);
return buf;
}

@Override
protected StringBuilder initialValue() {
return new StringBuilder(512);
}
};
static StringBuilder stringBuilder() {
return InternalThreadLocalMap.get().stringBuilder();
}

static String stripTrailingSeparator(StringBuilder buf) {
if (buf.length() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.netty.handler.codec.http;

import io.netty.util.internal.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocal;

import java.text.ParsePosition;
import java.text.SimpleDateFormat;
Expand All @@ -39,7 +39,7 @@ final class HttpHeaderDateFormat extends SimpleDateFormat {
private final SimpleDateFormat format1 = new HttpHeaderDateFormatObsolete1();
private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2();

private static final ThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
private static final FastThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
new FastThreadLocal<HttpHeaderDateFormat>() {
@Override
protected HttpHeaderDateFormat initialValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static String encode(Cookie cookie) {
throw new NullPointerException("cookie");
}

StringBuilder buf = buffer.get();
StringBuilder buf = stringBuilder();

add(buf, cookie.getName(), cookie.getValue());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.netty.handler.codec;

import io.netty.util.internal.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;

import java.text.DateFormat;
Expand Down Expand Up @@ -769,7 +769,7 @@ public void remove() {
static final class HttpHeaderDateFormat {

private static final ParsePosition parsePos = new ParsePosition(0);
private static final ThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
private static final FastThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
new FastThreadLocal<HttpHeaderDateFormat>() {
@Override
protected HttpHeaderDateFormat initialValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.netty.channel.ChannelHandlerContext;

import io.netty.util.internal.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocal;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
Expand All @@ -28,7 +28,7 @@
* many small {@link Object}'s and your actual Thread count is not to big
*/
public class ThreadLocalMarshallerProvider implements MarshallerProvider {
private final ThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();
private final FastThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();

private final MarshallerFactory factory;
private final MarshallingConfiguration config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.netty.channel.ChannelHandlerContext;

import io.netty.util.internal.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocal;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;
Expand All @@ -28,7 +28,7 @@
* many small {@link Object}'s.
*/
public class ThreadLocalUnmarshallerProvider implements UnmarshallerProvider {
private final ThreadLocal<Unmarshaller> unmarshallers = new FastThreadLocal<Unmarshaller>();
private final FastThreadLocal<Unmarshaller> unmarshallers = new FastThreadLocal<Unmarshaller>();

private final MarshallerFactory factory;
private final MarshallingConfiguration config;
Expand Down
23 changes: 3 additions & 20 deletions common/src/main/java/io/netty/util/CharsetUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/
package io.netty.util;

import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.util.IdentityHashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -62,22 +61,6 @@ public final class CharsetUtil {
*/
public static final Charset US_ASCII = Charset.forName("US-ASCII");

private static final ThreadLocal<Map<Charset, CharsetEncoder>> encoders =
new FastThreadLocal<Map<Charset, CharsetEncoder>>() {
@Override
protected Map<Charset, CharsetEncoder> initialValue() {
return new IdentityHashMap<Charset, CharsetEncoder>();
}
};

private static final ThreadLocal<Map<Charset, CharsetDecoder>> decoders =
new FastThreadLocal<Map<Charset, CharsetDecoder>>() {
@Override
protected Map<Charset, CharsetDecoder> initialValue() {
return new IdentityHashMap<Charset, CharsetDecoder>();
}
};

/**
* Returns a cached thread-local {@link CharsetEncoder} for the specified
* <tt>charset</tt>.
Expand All @@ -87,7 +70,7 @@ public static CharsetEncoder getEncoder(Charset charset) {
throw new NullPointerException("charset");
}

Map<Charset, CharsetEncoder> map = encoders.get();
Map<Charset, CharsetEncoder> map = InternalThreadLocalMap.get().charsetEncoderCache();
CharsetEncoder e = map.get(charset);
if (e != null) {
e.reset();
Expand All @@ -112,7 +95,7 @@ public static CharsetDecoder getDecoder(Charset charset) {
throw new NullPointerException("charset");
}

Map<Charset, CharsetDecoder> map = decoders.get();
Map<Charset, CharsetDecoder> map = InternalThreadLocalMap.get().charsetDecoderCache();
CharsetDecoder d = map.get(charset);
if (d != null) {
d.reset();
Expand Down
7 changes: 3 additions & 4 deletions common/src/main/java/io/netty/util/Recycler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.IdentityHashMap;
import java.util.Map;

import io.netty.util.internal.FastThreadLocal;

/**
* Light-weight object pool based on a thread-local stack.
*
Expand Down Expand Up @@ -56,7 +55,7 @@ public abstract class Recycler<T> {
}

private final int maxCapacity;
private final ThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
Expand All @@ -67,7 +66,7 @@ protected Recycler() {
this(DEFAULT_MAX_CAPACITY);
}

public Recycler(int maxCapacity) {
protected Recycler(int maxCapacity) {
this.maxCapacity = Math.max(0, maxCapacity);
}

Expand Down
Loading

0 comments on commit 760bbc7

Please sign in to comment.