Skip to content

Commit

Permalink
Avoid register multiple cleaner task for same thread's FastThreadLoca…
Browse files Browse the repository at this point in the history
…l index

Motivation:

Currently if user call set/remove/set/remove many times, it will create multiple cleaner task for same index. It may cause OOM due to long live thread will have more and more task in LIVE_SET.

Modification:

Add flag to avoid recreating tasks.

Result:
Only create 1 clean task. But use more space of indexedVariables.
  • Loading branch information
kojilin authored and normanmaurer committed Feb 5, 2018
1 parent 7bbb4ef commit 1e70d30
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 14 deletions.
33 changes: 21 additions & 12 deletions common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ private static void removeFromVariablesToRemove(

private final int index;

private final int cleanerFlagIndex;

public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();
}

/**
Expand All @@ -147,19 +150,25 @@ public final V get() {

private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
Thread current = Thread.currentThread();
if (!FastThreadLocalThread.willCleanupFastThreadLocals(current)) {
// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
// and FastThreadLocal.onRemoval(...) will be called.
ObjectCleaner.register(current, new Runnable() {
@Override
public void run() {
remove(threadLocalMap);

// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
}
});
if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
return;
}
// removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
// of the thread, and this Object will be discarded if the associated thread is GCed.
threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);

// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
// and FastThreadLocal.onRemoval(...) will be called.
ObjectCleaner.register(current, new Runnable() {
@Override
public void run() {
remove(threadLocalMap);

// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public Void run() {
}
}

public static int getLiveSetCount() {
return LIVE_SET.size();
}

private ObjectCleaner() {
// Only contains a static method.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.netty.util.concurrent;

import io.netty.util.internal.ObjectCleaner;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -24,7 +25,8 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

public class FastThreadLocalTest {
@Before
Expand All @@ -45,7 +47,7 @@ protected void onRemoval(Boolean value) {

// Initialize a thread-local variable.
assertThat(var.get(), is(nullValue()));
assertThat(FastThreadLocal.size(), is(1));
assertThat(FastThreadLocal.size(), is(2));

// And then remove it.
FastThreadLocal.removeAll();
Expand Down Expand Up @@ -76,6 +78,65 @@ public void run() {
}
}

@Test
public void testMultipleSetRemove() throws Exception {
final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
threadLocal.set("1");
threadLocal.remove();
threadLocal.set("2");
threadLocal.remove();
}
};

final int sizeWhenStart = ObjectCleaner.getLiveSetCount();
Thread thread = new Thread(runnable);
thread.start();
thread.join();

assertEquals(1, ObjectCleaner.getLiveSetCount() - sizeWhenStart);

Thread thread2 = new Thread(runnable);
thread2.start();
thread2.join();

assertEquals(2, ObjectCleaner.getLiveSetCount() - sizeWhenStart);
}

@Test
public void testMultipleSetRemove_multipleThreadLocal() throws Exception {
final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>();
final FastThreadLocal<String> threadLocal2 = new FastThreadLocal<String>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
threadLocal.set("1");
threadLocal.remove();
threadLocal.set("2");
threadLocal.remove();
threadLocal2.set("1");
threadLocal2.remove();
threadLocal2.set("2");
threadLocal2.remove();
}
};

final int sizeWhenStart = ObjectCleaner.getLiveSetCount();
Thread thread = new Thread(runnable);
thread.start();
thread.join();

assertEquals(2, ObjectCleaner.getLiveSetCount() - sizeWhenStart);

Thread thread2 = new Thread(runnable);
thread2.start();
thread2.join();

assertEquals(4, ObjectCleaner.getLiveSetCount() - sizeWhenStart);
}

@Test(timeout = 4000)
public void testOnRemoveCalledForFastThreadLocalGet() throws Exception {
testOnRemoveCalled(true, true);
Expand Down

0 comments on commit 1e70d30

Please sign in to comment.