Skip to content

Commit

Permalink
[FLINK-32191][netty] support set keepalive options to NettyClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
huwh authored and KarmaGYZ committed Jun 28, 2023
1 parent e35f33f commit 873a563
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@
<td>Integer</td>
<td>The number of Netty client threads.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepCount</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of keepalive probes TCP should send before Netty client dropping the connection. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepIdleSec</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepIntervalSec</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The time (in seconds) between individual keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.num-arenas</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@
<td>Integer</td>
<td>The number of Netty client threads.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepCount</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of keepalive probes TCP should send before Netty client dropping the connection. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepIdleSec</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.tcp.keepIntervalSec</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The time (in seconds) between individual keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.num-arenas</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
+ " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
+ " only available on modern Linux.");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
key("taskmanager.network.netty.client.tcp.keepIdleSec")
.intType()
.noDefaultValue()
.withDescription(
"The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. "
+ "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+ "refer to https://bugs.openjdk.org/browse/JDK-8194298.");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> CLIENT_TCP_KEEP_INTERVAL_SECONDS =
key("taskmanager.network.netty.client.tcp.keepIntervalSec")
.intType()
.noDefaultValue()
.withDescription(
"The time (in seconds) between individual keepalive probes. "
+ "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+ "refer to https://bugs.openjdk.org/browse/JDK-8194298.");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> CLIENT_TCP_KEEP_COUNT =
key("taskmanager.network.netty.client.tcp.keepCount")
.intType()
.noDefaultValue()
.withDescription(
"The maximum number of keepalive probes TCP should send before Netty client dropping the connection. "
+ "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+ "refer to https://bugs.openjdk.org/browse/JDK-8194298.");

// ------------------------------------------------------------------------
// Partition Request Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,44 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;

import jdk.net.ExtendedSocketOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketOption;

import static org.apache.flink.util.Preconditions.checkState;

class NettyClient {

private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);

@VisibleForTesting static final String NIO_TCP_KEEPIDLE_KEY = "TCP_KEEPIDLE";
@VisibleForTesting static final String NIO_TCP_KEEPINTERVAL_KEY = "TCP_KEEPINTERVAL";
@VisibleForTesting static final String NIO_TCP_KEEPCOUNT_KEY = "TCP_KEEPCOUNT";

private final NettyConfig config;

private NettyProtocol protocol;
Expand Down Expand Up @@ -152,6 +163,27 @@ private void initNioBootstrap() {
new NioEventLoopGroup(
config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
bootstrap.group(nioGroup).channel(NioSocketChannel.class);

config.getTcpKeepIdleInSeconds()
.ifPresent(idle -> setNioKeepaliveOptions(NIO_TCP_KEEPIDLE_KEY, idle));
config.getTcpKeepInternalInSeconds()
.ifPresent(interval -> setNioKeepaliveOptions(NIO_TCP_KEEPINTERVAL_KEY, interval));
config.getTcpKeepCount()
.ifPresent(count -> setNioKeepaliveOptions(NIO_TCP_KEEPCOUNT_KEY, count));
}

@SuppressWarnings("unchecked")
private void setNioKeepaliveOptions(String option, int value) {
try {
Field field = ExtendedSocketOptions.class.getField(option);
bootstrap.option(NioChannelOption.of((SocketOption<Integer>) field.get(null)), value);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error(
"Ignore keepalive option {}, this may be due to using netty transport type of nio and an older version of jdk 8,"
+ " refer to https://bugs.openjdk.org/browse/JDK-8194298",
option,
e);
}
}

private void initEpollBootstrap() {
Expand All @@ -163,6 +195,14 @@ private void initEpollBootstrap() {
new EpollEventLoopGroup(
config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
bootstrap.group(epollGroup).channel(EpollSocketChannel.class);

config.getTcpKeepIdleInSeconds()
.ifPresent(idle -> bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, idle));
config.getTcpKeepInternalInSeconds()
.ifPresent(
interval -> bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, interval));
config.getTcpKeepCount()
.ifPresent(count -> bootstrap.option(EpollChannelOption.TCP_KEEPCNT, count));
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;

import java.net.InetAddress;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -142,6 +143,18 @@ public TransportType getTransportType() {
}
}

public Optional<Integer> getTcpKeepIdleInSeconds() {
return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS);
}

public Optional<Integer> getTcpKeepInternalInSeconds() {
return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS);
}

public Optional<Integer> getTcpKeepCount() {
return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT);
}

@Nullable
public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
return getSSLEnabled() ? SSLUtils.createInternalClientSSLEngineFactory(config) : null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;

import jdk.net.ExtendedSocketOptions;
import org.junit.jupiter.api.Test;

import java.net.InetAddress;
import java.util.Map;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/** Tests for {@link NettyClient}. */
public class NettyClientTest {
@Test
void testSetKeepaliveOptionWithNioConfigurable() throws Exception {
assumeThat(keepaliveForNioConfigurable()).isTrue();

final Configuration config = new Configuration();
config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);

try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
final NettyClient client = createNettyClient(config, clientPort);
Map<String, Object> options =
client.getBootstrap().config().options().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
assertThat(options)
.containsEntry(NettyClient.NIO_TCP_KEEPIDLE_KEY, 300)
.containsEntry(NettyClient.NIO_TCP_KEEPINTERVAL_KEY, 10)
.containsEntry(NettyClient.NIO_TCP_KEEPCOUNT_KEY, 8);
}
}

/**
* Test that keepalive options will not take effect when using netty transport type of nio with
* an older version of JDK 8.
*/
@Test
void testSetKeepaliveOptionWithNioNotConfigurable() throws Exception {
assumeThat(keepaliveForNioConfigurable()).isFalse();

final Configuration config = new Configuration();
config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);

try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
final NettyClient client = createNettyClient(config, clientPort);
Map<String, Object> options =
client.getBootstrap().config().options().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
assertThat(options)
.doesNotContainKeys(
NettyClient.NIO_TCP_KEEPIDLE_KEY,
NettyClient.NIO_TCP_KEEPINTERVAL_KEY,
NettyClient.NIO_TCP_KEEPCOUNT_KEY);
}
}

@Test
void testSetKeepaliveOptionWithEpoll() throws Exception {
assumeThat(Epoll.isAvailable()).isTrue();

final Configuration config = new Configuration();
config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "epoll");
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);

try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
final NettyClient client = createNettyClient(config, clientPort);
Map<ChannelOption<?>, Object> options = client.getBootstrap().config().options();
assertThat(options)
.containsEntry(EpollChannelOption.TCP_KEEPIDLE, 300)
.containsEntry(EpollChannelOption.TCP_KEEPINTVL, 10)
.containsEntry(EpollChannelOption.TCP_KEEPCNT, 8);
}
}

private static boolean keepaliveForNioConfigurable() {
try {
ExtendedSocketOptions.class.getField(NettyClient.NIO_TCP_KEEPIDLE_KEY);
} catch (NoSuchFieldException e) {
return false;
}
return true;
}

private static NettyClient createNettyClient(Configuration config, NetUtils.Port port)
throws Exception {

final NettyConfig nettyClientConfig =
new NettyConfig(
InetAddress.getLoopbackAddress(),
port.getPort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
config);

final NettyBufferPool bufferPool = new NettyBufferPool(1);
final NettyProtocol protocol = new NettyProtocol(null, null);

return NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
}
}

0 comments on commit 873a563

Please sign in to comment.