Skip to content

Commit

Permalink
Make releasing objects back to Recycler faster (netty#13174)
Browse files Browse the repository at this point in the history
Motivation:
The Recycler implementation was changed in netty#11858 to rely on an MPSC queue implementation for delivering released objects back to their originating thread local pool.
Typically, the release will often happen from the same thread that claimed the object, so the overhead of having a thread-safe release goes to waste.

Modification:
We add an unsynchronized ArrayDeque for batching claims out of the `pooledHandles`.
This amortises `claim` calls.

We then also re-introduce the concept of an owner thread (but by default only if said thread is a FastThreadLocalThread), and release directly into the claim `batch` if the release is from the owner thread.

Result:
The `RecyclerBenchmark.recyclerGetAndRecycle` benchmark sees a 27.4% improvement, and the `RecyclerBenchmark.producerConsumer` benchmark sees a 22.5% improvement.

Fixes netty#13153

Co-authored-by: Norman Maurer <[email protected]>
  • Loading branch information
chrisvest and normanmaurer authored Feb 2, 2023
1 parent 5ee9da5 commit 8a8337e
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 25 deletions.
52 changes: 44 additions & 8 deletions common/src/main/java/io/netty/util/Recycler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jctools.queues.MessagePassingQueue;
import org.jetbrains.annotations.VisibleForTesting;

import java.util.ArrayDeque;
import java.util.Queue;
Expand Down Expand Up @@ -54,6 +56,7 @@ public String toString() {
private static final int RATIO;
private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
private static final boolean BLOCKING_POOL;
private static final boolean BATCH_FAST_TL_ONLY;

static {
// In the future, we might have different maxCapacity for different object types.
Expand All @@ -74,18 +77,21 @@ public String toString() {
RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);

if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
logger.debug("-Dio.netty.recycler.chunkSize: disabled");
logger.debug("-Dio.netty.recycler.blocking: disabled");
logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
}
}
}
Expand All @@ -104,6 +110,7 @@ protected void onRemoval(LocalPool<T> value) throws Exception {
super.onRemoval(value);
MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
value.pooledHandles = null;
value.owner = null;
handles.clear();
}
};
Expand Down Expand Up @@ -195,9 +202,10 @@ public final boolean recycle(T o, Handle<T> handle) {
return true;
}

@VisibleForTesting
final int threadLocalSize() {
LocalPool<T> localPool = threadLocal.getIfExists();
return localPool == null ? 0 : localPool.pooledHandles.size();
return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
}

/**
Expand Down Expand Up @@ -255,14 +263,21 @@ void toAvailable() {
}
}

private static final class LocalPool<T> {
private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
private final int ratioInterval;
private final int chunkSize;
private final ArrayDeque<DefaultHandle<T>> batch;
private volatile Thread owner;
private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
private int ratioCounter;

@SuppressWarnings("unchecked")
LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
this.ratioInterval = ratioInterval;
this.chunkSize = chunkSize;
batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
Thread currentThread = Thread.currentThread();
owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;
if (BLOCKING_POOL) {
pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
} else {
Expand All @@ -276,7 +291,10 @@ DefaultHandle<T> claim() {
if (handles == null) {
return null;
}
DefaultHandle<T> handle = handles.relaxedPoll();
if (batch.isEmpty()) {
handles.drain(this, chunkSize);
}
DefaultHandle<T> handle = batch.pollFirst();
if (null != handle) {
handle.toClaimed();
}
Expand All @@ -285,9 +303,17 @@ DefaultHandle<T> claim() {

void release(DefaultHandle<T> handle) {
handle.toAvailable();
MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
if (handles != null) {
handles.relaxedOffer(handle);
Thread owner = this.owner;
if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
accept(handle);
} else if (owner != null && owner.getState() == Thread.State.TERMINATED) {
this.owner = null;
pooledHandles = null;
} else {
MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
if (handles != null) {
handles.relaxedOffer(handle);
}
}
}

Expand All @@ -298,13 +324,18 @@ DefaultHandle<T> newHandle() {
}
return null;
}

@Override
public void accept(DefaultHandle<T> e) {
batch.addLast(e);
}
}

/**
* This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
* {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
* The implementation relies on synchronised monitor locks for thread-safety.
* The {@code drain} and {@code fill} bulk operations are not supported by this implementation.
* The {@code fill} bulk operation is not supported by this implementation.
*/
private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
private final Queue<T> deque;
Expand Down Expand Up @@ -379,7 +410,12 @@ public T relaxedPeek() {

@Override
public int drain(Consumer<T> c, int limit) {
throw new UnsupportedOperationException();
T obj;
int i = 0;
for (; i < limit && (obj = poll()) != null; i++) {
c.accept(obj);
}
return i;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.assertFalse;

@ExtendWith(RunInFastThreadLocalThreadExtension.class)
public class RecyclerFastThreadLocalTest extends RecyclerTest {
@NotNull
@Override
protected Thread newThread(Runnable runnable) {
return new FastThreadLocalThread(runnable);
}

@Override
@Test
@Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception {
final Recycler<HandledObject> recycler = newRecycler(1024);
final AtomicBoolean collected = new AtomicBoolean();
final AtomicReference<HandledObject> reference = new AtomicReference<HandledObject>();
Thread thread = new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
HandledObject object = recycler.get();
// Store a reference to the HandledObject to ensure it is not collected when the run method finish.
reference.set(object);
}
}) {
@Override
protected void finalize() throws Throwable {
super.finalize();
collected.set(true);
}
};
assertFalse(collected.get());
thread.start();
thread.join();

// Null out so it can be collected.
thread = null;

// Loop until the Thread was collected. If we can not collect it the Test will fail due of a timeout.
while (!collected.get()) {
System.gc();
System.runFinalization();
Thread.sleep(50);
}

// Now call recycle after the Thread was collected to ensure this still works...
reference.getAndSet(null).recycle();
}
}
46 changes: 29 additions & 17 deletions common/src/test/java/io/netty/util/RecyclerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.netty.util;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.function.Executable;
Expand All @@ -23,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -39,11 +41,11 @@

public class RecyclerTest {

private static Recycler<HandledObject> newRecycler(int maxCapacityPerThread) {
protected static Recycler<HandledObject> newRecycler(int maxCapacityPerThread) {
return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1);
}

private static Recycler<HandledObject> newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) {
protected static Recycler<HandledObject> newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) {
return new Recycler<HandledObject>(maxCapacityPerThread, ratio, chunkSize) {
@Override
protected HandledObject newObject(
Expand All @@ -53,6 +55,11 @@ protected HandledObject newObject(
};
}

@NotNull
protected Thread newThread(Runnable runnable) {
return new Thread(runnable);
}

@Test
@Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception {
Expand Down Expand Up @@ -114,7 +121,7 @@ public void testMultipleRecycleAtDifferentThread() throws InterruptedException {
Recycler<HandledObject> recycler = newRecycler(1024);
final HandledObject object = recycler.get();
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();
final Thread thread1 = new Thread(new Runnable() {
final Thread thread1 = newThread(new Runnable() {
@Override
public void run() {
object.recycle();
Expand All @@ -123,7 +130,7 @@ public void run() {
thread1.start();
thread1.join();

final Thread thread2 = new Thread(new Runnable() {
final Thread thread2 = newThread(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -149,7 +156,7 @@ public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedExcep
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();

final CountDownLatch countDownLatch = new CountDownLatch(2);
final Thread thread1 = new Thread(new Runnable() {
final Thread thread1 = newThread(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -166,7 +173,7 @@ public void run() {
});
thread1.start();

final Thread thread2 = new Thread(new Runnable() {
final Thread thread2 = newThread(new Runnable() {
@Override
public void run() {
try {
Expand Down Expand Up @@ -206,7 +213,7 @@ public void testMultipleRecycleRacing() throws InterruptedException {
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();

final CountDownLatch countDownLatch = new CountDownLatch(1);
final Thread thread1 = new Thread(new Runnable() {
final Thread thread1 = newThread(new Runnable() {
@Override
public void run() {
try {
Expand Down Expand Up @@ -313,13 +320,13 @@ public void testRecycleAtDifferentThread() throws Exception {
final HandledObject o = recycler.get();
final HandledObject o2 = recycler.get();

final Thread thread = new Thread() {
final Thread thread = newThread(new Runnable() {
@Override
public void run() {
o.recycle();
o2.recycle();
}
};
});
thread.start();
thread.join();

Expand All @@ -332,7 +339,12 @@ public void testRecycleAtTwoThreadsMulti() throws Exception {
final Recycler<HandledObject> recycler = newRecycler(256);
final HandledObject o = recycler.get();

ExecutorService single = Executors.newSingleThreadExecutor();
ExecutorService single = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
return RecyclerTest.this.newThread(r);
}
});

final CountDownLatch latch1 = new CountDownLatch(1);
single.execute(new Runnable() {
Expand Down Expand Up @@ -366,7 +378,7 @@ public void run() {

@Test
public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception {
final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY
final int maxCapacity = 4;
final Recycler<HandledObject> recycler = newRecycler(maxCapacity, 4, 4);

// Borrow 2 * maxCapacity objects.
Expand All @@ -382,14 +394,14 @@ public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception {
array[i].recycle();
}

final Thread thread = new Thread() {
final Thread thread = newThread(new Runnable() {
@Override
public void run() {
for (int i = maxCapacity; i < array.length; i ++) {
array[i].recycle();
for (int i1 = maxCapacity; i1 < array.length; i1++) {
array[i1].recycle();
}
}
};
});
thread.start();
thread.join();

Expand Down Expand Up @@ -426,14 +438,14 @@ protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
instancesCount.set(0);

// Recycle from other thread.
final Thread thread = new Thread() {
final Thread thread = newThread(new Runnable() {
@Override
public void run() {
for (HandledObject object: array) {
object.recycle();
}
}
};
});
thread.start();
thread.join();

Expand Down
Loading

0 comments on commit 8a8337e

Please sign in to comment.