Skip to content

Commit

Permalink
Shutdown Executor on MultithreadEventLoopGroup shutdown. Fixes netty#…
Browse files Browse the repository at this point in the history
…2837

Motivation:

Currently the Executor created by (Nio|Epoll)EventLoopGroup is not correctly shutdown.
This might lead to resource shortages, due to resources not being freed asap.

Modifications:

If (Nio|Epoll)EventLoopGroup create their internal Executor via a constructor
provided `ExecutorServiceFactory` object or via
MultithreadEventLoopGroup.newDefaultExecutorService(...) the ExecutorService.shutdown()
method will be called after (Nio|Epoll)EventLoopGroup is shutdown.

ExecutorService.shutdown() will not be called if the Executor object was passed
to the (Nio|Epoll)EventLoopGroup (that is, it was instantiated outside of Netty).

Result:

Correctly release resources on (Nio|Epoll)EventLoopGroup shutdown.
  • Loading branch information
buchgr authored and trustin committed Sep 12, 2014
1 parent 3dba1ef commit b72a05e
Show file tree
Hide file tree
Showing 30 changed files with 164 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public DefaultEventExecutor(Executor executor) {
}

public DefaultEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultExecutorFactory(DefaultEventExecutor.class).newExecutor(1));
this(parent, new DefaultExecutorServiceFactory(DefaultEventExecutor.class).newExecutorService(1));
}

public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public DefaultEventExecutorGroup(int nEventExecutors, Executor executor) {
* Create a new instance.
*
* @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use.
* @param executorFactory the {@link ExecutorFactory} which produces the {@link Executor} responsible for
* executing the work handled by this {@link EventExecutorGroup}.
* @param executorServiceFactory the {@link ExecutorServiceFactory} which produces the {@link Executor}
* responsible for executing the work handled by this {@link EventExecutorGroup}.
*/
public DefaultEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory) {
super(nEventExecutors, executorFactory);
public DefaultEventExecutorGroup(int nEventExecutors, ExecutorServiceFactory executorServiceFactory) {
super(nEventExecutors, executorServiceFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
* An implementation of an {@link ExecutorFactory} that creates a new {@link ForkJoinPool} on each
* call to {@link #newExecutor(int)}.
* An implementation of an {@link ExecutorServiceFactory} that creates a new {@link ForkJoinPool} on each
* call to {@link #newExecutorService(int)}.
* <p>
* This {@link ExecutorFactory} powers Netty's nio and epoll eventloops by default. Netty moved from managing its
* This {@link ExecutorServiceFactory} powers Netty's nio and epoll eventloops by default. Netty moved from managing its
* own threads and pinning a thread to each eventloop to an {@link Executor}-based approach. That way advanced
* users of Netty can plug in their own threadpools and gain more control of scheduling the eventloops.
* <p>
Expand All @@ -43,10 +44,10 @@
* The whole discussion can be found on GitHub
* <a href="https://github.com/netty/netty/issues/2250">https://github.com/netty/netty/issues/2250</a>.
*/
public final class DefaultExecutorFactory implements ExecutorFactory {
public final class DefaultExecutorServiceFactory implements ExecutorServiceFactory {

private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultExecutorFactory.class);
InternalLoggerFactory.getInstance(DefaultExecutorServiceFactory.class);

private static final AtomicInteger executorId = new AtomicInteger();
private final String namePrefix;
Expand All @@ -55,19 +56,19 @@ public final class DefaultExecutorFactory implements ExecutorFactory {
* @param clazzNamePrefix the name of the class will be used to prefix the name of each
* {@link ForkJoinWorkerThread} with.
*/
public DefaultExecutorFactory(Class<?> clazzNamePrefix) {
public DefaultExecutorServiceFactory(Class<?> clazzNamePrefix) {
this(toName(clazzNamePrefix));
}

/**
* @param namePrefix the string to prefix the name of each {@link ForkJoinWorkerThread} with.
*/
public DefaultExecutorFactory(String namePrefix) {
public DefaultExecutorServiceFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Executor newExecutor(int parallelism) {
public ExecutorService newExecutorService(int parallelism) {
ForkJoinWorkerThreadFactory threadFactory =
new DefaultForkJoinWorkerThreadFactory(namePrefix + '-' + executorId.getAndIncrement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package io.netty.util.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* An object that creates new {@link Executor}s on demand. Using executor factories mainly
* simplifies providing custom executor implementations to Netty's event loops.
* An object that creates new {@link ExecutorService} on demand. Using an {@link ExecutorServiceFactory} mainly
* simplifies providing a custom {@link ExecutorService} implementation to Netty's event loops.
*/
public interface ExecutorFactory {
Executor newExecutor(int parallelism);
public interface ExecutorServiceFactory {
ExecutorService newExecutorService(int parallelism);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* table, and it is useful when accessed frequently.
* </p><p>
* To take advantage of this thread-local variable, your thread must implement {@link FastThreadLocalAccess}.
* By default, all threads created by {@link DefaultThreadFactory} and {@link DefaultExecutorFactory} implement
* By default, all threads created by {@link DefaultThreadFactory} and {@link DefaultExecutorServiceFactory} implement
* {@link FastThreadLocalAccess}.
* </p><p>
* Note that the fast path is only possible on threads that implement {@link FastThreadLocalAccess}, because it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -36,35 +37,49 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
private final EventExecutorChooser chooser;

/**
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorServiceFactory the {@link ExecutorServiceFactory} to use, or {@code null} if the default
* should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call.
*/
protected MultithreadEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory, Object... args) {
this(nEventExecutors, executorFactory == null ? null : executorFactory.newExecutor(nEventExecutors), args);
protected MultithreadEventExecutorGroup(int nEventExecutors,
ExecutorServiceFactory executorServiceFactory,
Object... args) {
this(nEventExecutors, executorServiceFactory != null
? executorServiceFactory.newExecutorService(nEventExecutors)
: null,
true, args);
}

/**
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, Object... args) {
this(nEventExecutors, executor, false, args);
}

private MultithreadEventExecutorGroup(int nEventExecutors,
Executor executor,
boolean shutdownExecutor,
Object... args) {
if (nEventExecutors <= 0) {
throw new IllegalArgumentException(
String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
}

if (executor == null) {
executor = newDefaultExecutor(nEventExecutors);
executor = newDefaultExecutorService(nEventExecutors);
shutdownExecutor = true;
}

children = new EventExecutor[nEventExecutors];
Expand Down Expand Up @@ -104,11 +119,18 @@ protected MultithreadEventExecutorGroup(int nEventExecutors, Executor executor,
}
}

final boolean shutdownExecutor0 = shutdownExecutor;
final Executor executor0 = executor;
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
if (shutdownExecutor0) {
// This cast is correct because shutdownExecutor0 is only try if
// executor0 is of type ExecutorService.
((ExecutorService) executor0).shutdown();
}
}
}
};
Expand All @@ -122,8 +144,8 @@ public void operationComplete(Future<Object> future) throws Exception {
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

protected Executor newDefaultExecutor(int nEventExecutors) {
return new DefaultExecutorFactory(getClass()).newExecutor(nEventExecutors);
protected ExecutorService newDefaultExecutorService(int nEventExecutors) {
return new DefaultExecutorServiceFactory(getClass()).newExecutorService(nEventExecutors);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void run() {

private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, new DefaultExecutorFactory(TestEventExecutor.class).newExecutor(1), true);
super(null, new DefaultExecutorServiceFactory(TestEventExecutor.class).newExecutorService(1), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ public void run() {
}

/**
* Make sure threads created by the {@link DefaultExecutorFactory} and {@link DefaultThreadFactory}
* Make sure threads created by the {@link DefaultExecutorServiceFactory} and {@link DefaultThreadFactory}
* implement the {@link FastThreadLocalAccess} interface.
*/
@Test
public void testIsFastThreadLocalThread() {
ExecutorFactory executorFactory = new DefaultExecutorFactory(FastThreadLocalTest.class);
ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory(FastThreadLocalTest.class);
int parallelism = Runtime.getRuntime().availableProcessors() * 2;

Executor executor = executorFactory.newExecutor(parallelism);
Executor executor = executorServiceFactory.newExecutorService(parallelism);
// submit a "high" number of tasks, to get a good chance to touch every thread.
for (int i = 0; i < parallelism * 100; i++) {
executor.execute(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

/**
* UDT Byte Stream Client
Expand All @@ -42,7 +42,7 @@ public final class ByteEchoClient {

public static void main(String[] args) throws Exception {
// Configure the client.
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("connect");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

/**
* UDT Byte Stream Server
Expand All @@ -37,8 +37,8 @@ public final class ByteEchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept");
ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
ExecutorServiceFactory acceptFactory = new DefaultExecutorServiceFactory("accept");
ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("connect");
NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

import java.util.logging.Logger;

Expand All @@ -46,7 +46,7 @@ public final class MsgEchoClient {

public static void main(String[] args) throws Exception {
// Configure the client.
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("connect");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

/**
* UDT Message Flow Server
Expand All @@ -37,8 +37,8 @@ public final class MsgEchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
final ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept");
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final ExecutorServiceFactory acceptFactory = new DefaultExecutorServiceFactory("accept");
final ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("connect");
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup connectGroup =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

import java.net.InetSocketAddress;

Expand All @@ -48,7 +48,7 @@ protected MsgEchoPeerBase(final InetSocketAddress self, final InetSocketAddress

public void run() throws Exception {
// Configure the peer.
final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous");
final ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("rendezvous");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.DefaultExecutorServiceFactory;
import io.netty.util.concurrent.ExecutorServiceFactory;

import java.net.SocketAddress;

Expand All @@ -50,7 +50,7 @@ public ByteEchoPeerBase(int messageSize, SocketAddress myAddress, SocketAddress
}

public void run() throws Exception {
final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous");
final ExecutorServiceFactory connectFactory = new DefaultExecutorServiceFactory("rendezvous");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);

Expand Down
Loading

0 comments on commit b72a05e

Please sign in to comment.