Skip to content

Commit

Permalink
Issue 12668: Protocol Handlers and Proxy Extensions: ability to use a…
Browse files Browse the repository at this point in the history
… dedicated EventLoopGroup for IO (apache#12692)
  • Loading branch information
eolivelli authored Nov 19, 2021
1 parent ad70f71 commit efe50cc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String protocolHandlerDirectory = "./protocols";

@FieldContext(
category = CATEGORY_PROTOCOLS,
doc = "Use a separate ThreadPool for each Protocol Handler"
)
private boolean useSeparateThreadPoolForProtocolHandlers = true;

@FieldContext(
category = CATEGORY_PROTOCOLS,
doc = "List of messaging protocols to load, which is a list of protocol names"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ public class BrokerService implements Closeable {

private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();

@Getter
private final BundlesQuotas bundlesQuotas;
Expand Down Expand Up @@ -385,6 +386,16 @@ private void startProtocolHandler(String protocol,
SocketAddress address,
ChannelInitializer<SocketChannel> initializer) throws IOException {
ServerBootstrap bootstrap = defaultServerBootstrap.clone();
ServiceConfiguration configuration = pulsar.getConfiguration();
boolean useSeparateThreadPool = configuration.isUseSeparateThreadPoolForProtocolHandlers();
if (useSeparateThreadPool) {
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
}
bootstrap.childHandler(initializer);
try {
bootstrap.bind(address).sync();
Expand Down Expand Up @@ -698,9 +709,14 @@ public CompletableFuture<Void> closeAsync() {

CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
log.info("Event loops shutting down gracefully...");
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup));
shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup));
for (EventLoopGroup group : protocolHandlersWorkerGroups) {
shutdownEventLoops.add(shutdownEventLoopGracefully(group));
}
CompletableFuture<Void> shutdownFuture =
CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
shutdownEventLoopGracefully(workerGroup))
CompletableFuture.allOf(shutdownEventLoops.toArray(new CompletableFuture[0]))
.handle((v, t) -> {
if (t != null) {
log.warn("Error shutting down event loops gracefully", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private Set<String> proxyExtensions = new TreeSet<>();

@FieldContext(
category = CATEGORY_PLUGIN,
doc = "Use a separate ThreadPool for each Proxy Extension"
)
private boolean useSeparateThreadPoolForProxyExtensions = true;

/***** --- WebSocket --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class ProxyService implements Closeable {

private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
private final List<EventLoopGroup> extensionsWorkerGroups = new ArrayList<>();

private Channel listenChannel;
private Channel listenChannelTls;
Expand Down Expand Up @@ -263,6 +266,15 @@ private void startProxyExtension(String extensionName,
ChannelInitializer<SocketChannel> initializer,
ServerBootstrap serverBootstrap) throws IOException {
ServerBootstrap bootstrap = serverBootstrap.clone();
boolean useSeparateThreadPool = proxyConfig.isUseSeparateThreadPoolForProxyExtensions();
if (useSeparateThreadPool) {
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ext-" + extensionName);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(numThreads, false, defaultThreadFactory);
extensionsWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
}
bootstrap.childHandler(initializer);
try {
bootstrap.bind(address).sync();
Expand Down Expand Up @@ -314,6 +326,9 @@ public void close() throws IOException {
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
for (EventLoopGroup group : extensionsWorkerGroups) {
group.shutdownGracefully();
}
if (timer != null) {
timer.stop();
}
Expand Down

0 comments on commit efe50cc

Please sign in to comment.