Skip to content

Commit

Permalink
Fix bug in Recycler with racing calls to recycle (netty#11037)
Browse files Browse the repository at this point in the history
Motivation:
It is possible for two separate threads to race on recycling an object.
If this happens, the object might be added to a WeakOrderQueue when it shouldn't be.
The end result of this is that an object could be acquired multiple times, without a recycle in between.
Effectively, it ends up in circulation twice.

Modification:
We fix this by making the update to the lastRecycledId field of the handle, an atomic state transition.
Only the thread that "wins" the race and succeeds in their state transition will be allowed to recycle the object.
The others will bail out on their recycling.
We use weakCompareAndSet because we only need the atomicity guarantee, and the program order within each thread is sufficient.
Also, spurious failures just means we won't recycle that particular object, which is fine.

Result:
Objects no longer risk circulating twice due to a recycle race.

This fixes netty#10986
  • Loading branch information
chrisvest committed Feb 26, 2021
1 parent a1172bf commit f313408
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 5 deletions.
29 changes: 24 additions & 5 deletions common/src/main/java/io/netty/util/Recycler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.lang.Math.max;
Expand Down Expand Up @@ -206,8 +207,16 @@ final int threadLocalSize() {

public interface Handle<T> extends ObjectPool.Handle<T> { }

@SuppressWarnings("unchecked")
private static final class DefaultHandle<T> implements Handle<T> {
int lastRecycledId;
private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> LAST_RECYCLED_ID_UPDATER;
static {
AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(
DefaultHandle.class, "lastRecycledId");
LAST_RECYCLED_ID_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
}

volatile int lastRecycledId;
int recycleId;

boolean hasBeenRecycled;
Expand All @@ -232,6 +241,12 @@ public void recycle(Object object) {

stack.push(this);
}

public boolean compareAndSetLastRecycledId(int expectLastRecycledId, int updateLastRecycledId) {
// Use "weak…" because we do not need synchronize-with ordering, only atomicity.
// Also, spurious failures are fine, since no code should rely on recycling for correctness.
return LAST_RECYCLED_ID_UPDATER.weakCompareAndSet(this, expectLastRecycledId, updateLastRecycledId);
}
}

private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
Expand Down Expand Up @@ -368,11 +383,15 @@ void setNext(WeakOrderQueue next) {

void reclaimAllSpaceAndUnlink() {
head.reclaimAllSpaceAndUnlink();
this.next = null;
next = null;
}

void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
if (!handle.compareAndSetLastRecycledId(0, id)) {
// Separate threads could be racing to add the handle to each their own WeakOrderQueue.
// We only add the handle to the queue if we win the race and observe that lastRecycledId is zero.
return;
}

// While we also enforce the recycling ratio when we transfer objects from the WeakOrderQueue to the Stack
// we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast
Expand Down Expand Up @@ -646,10 +665,10 @@ void push(DefaultHandle<?> item) {
}

private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
item.recycleId = OWN_THREAD_ID;

int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
Expand Down
108 changes: 108 additions & 0 deletions common/src/test/java/io/netty/util/RecyclerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -103,12 +104,119 @@ public void testMultipleRecycleAtDifferentThread() throws InterruptedException {
});
thread2.start();
thread2.join();
HandledObject a = recycler.get();
HandledObject b = recycler.get();
assertNotSame(a, b);
IllegalStateException exception = exceptionStore.get();
if (exception != null) {
throw exception;
}
}

@Test
public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedException {
Recycler<HandledObject> recycler = newRecycler(1024);
final HandledObject object = recycler.get();
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();

final CountDownLatch countDownLatch = new CountDownLatch(2);
final Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
object.recycle();
} catch (IllegalStateException e) {
Exception x = exceptionStore.getAndSet(e);
if (x != null) {
e.addSuppressed(x);
}
} finally {
countDownLatch.countDown();
}
}
});
thread1.start();

final Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
object.recycle();
} catch (IllegalStateException e) {
Exception x = exceptionStore.getAndSet(e);
if (x != null) {
e.addSuppressed(x);
}
} finally {
countDownLatch.countDown();
}
}
});
thread2.start();

try {
countDownLatch.await();
HandledObject a = recycler.get();
HandledObject b = recycler.get();
assertNotSame(a, b);
IllegalStateException exception = exceptionStore.get();
if (exception != null) {
assertEquals("recycled already", exception.getMessage());
assertEquals(0, exception.getSuppressed().length);
}
} finally {
thread1.join(1000);
thread2.join(1000);
}
}

@Test
public void testMultipleRecycleRacing() throws InterruptedException {
Recycler<HandledObject> recycler = newRecycler(1024);
final HandledObject object = recycler.get();
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();

final CountDownLatch countDownLatch = new CountDownLatch(1);
final Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
object.recycle();
} catch (IllegalStateException e) {
Exception x = exceptionStore.getAndSet(e);
if (x != null) {
e.addSuppressed(x);
}
} finally {
countDownLatch.countDown();
}
}
});
thread1.start();

try {
object.recycle();
} catch (IllegalStateException e) {
Exception x = exceptionStore.getAndSet(e);
if (x != null) {
e.addSuppressed(x);
}
}

try {
countDownLatch.await();
HandledObject a = recycler.get();
HandledObject b = recycler.get();
assertNotSame(a, b);
IllegalStateException exception = exceptionStore.get();
if (exception != null) {
throw exception;
}
} finally {
thread1.join(1000);
}
}

@Test
public void testRecycle() {
Recycler<HandledObject> recycler = newRecycler(1024);
Expand Down

0 comments on commit f313408

Please sign in to comment.