Skip to content

Commit

Permalink
[FLINK-32811][runtime] Add port range support for "taskmanager.data.b…
Browse files Browse the repository at this point in the history
…ind-port"
  • Loading branch information
ferenc-csaky authored Aug 29, 2023
1 parent c23a300 commit de01f02
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
<td>Long</td>
<td>Time we wait for the timers in milliseconds to finish all pending timer threads when the stream task is cancelled.</td>
</tr>
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Integer</td>
<td>The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.</td>
</tr>
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
<td style="word-wrap: break-word;">0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
<tr>
<td><h5>taskmanager.data.bind-port</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The task manager's bind port used for data exchange operations. If not configured, 'taskmanager.data.port' will be used.</td>
<td>String</td>
<td>The task manager's bind port used for data exchange operations. Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. If not configured, 'taskmanager.data.port' will be used.</td>
</tr>
<tr>
<td><h5>taskmanager.data.port</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ public class NettyShuffleEnvironmentOptions {
"The task manager’s external port used for data exchange operations.");

/** The local network port that the task manager listen at for data exchange. */
public static final ConfigOption<Integer> DATA_BIND_PORT =
@Documentation.Section({
Documentation.Sections.COMMON_HOST_PORT,
Documentation.Sections.ALL_TASK_MANAGER
})
public static final ConfigOption<String> DATA_BIND_PORT =
key("taskmanager.data.bind-port")
.intType()
.stringType()
.noDefaultValue()
.withDescription(
"The task manager's bind port used for data exchange operations. If not configured, '"
"The task manager's bind port used for data exchange operations."
+ " Also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both."
+ " If not configured, '"
+ DATA_PORT.key()
+ "' will be used.");

Expand Down
18 changes: 13 additions & 5 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition)
int dashIdx = range.indexOf('-');
if (dashIdx == -1) {
// only one port in range:
final int port = Integer.valueOf(range);
final int port = Integer.parseInt(range);
if (!isValidHostPort(port)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
Expand All @@ -415,22 +415,30 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition)
rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
} else {
// evaluate range
final int start = Integer.valueOf(range.substring(0, dashIdx));
final int start = Integer.parseInt(range.substring(0, dashIdx));
if (!isValidHostPort(start)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
+ "and 65535, but was "
+ "and 65535, but range start was "
+ start
+ ".");
}
final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
final int end = Integer.parseInt(range.substring(dashIdx + 1));
if (!isValidHostPort(end)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
+ "and 65535, but was "
+ "and 65535, but range end was "
+ end
+ ".");
}
if (start >= end) {
throw new IllegalConfigurationException(
"Invalid port configuration."
+ " Port range end must be bigger than port range start."
+ " If you wish to use single port please provide the value directly, not as a range."
+ " Given range: "
+ range);
}
rangeIterator =
new Iterator<Integer>() {
int i = start;
Expand Down
34 changes: 34 additions & 0 deletions flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.util;

import org.apache.flink.configuration.IllegalConfigurationException;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -258,6 +260,38 @@ public void testFreePortRangeUtility() {
error = t;
}
Assert.assertTrue(error instanceof NumberFormatException);

// invalid port
try {
NetUtils.getPortRangeFromString("70000");
} catch (Throwable t) {
error = t;
}
Assert.assertTrue(error instanceof IllegalConfigurationException);

// invalid start
try {
NetUtils.getPortRangeFromString("70000-70001");
} catch (Throwable t) {
error = t;
}
Assert.assertTrue(error instanceof IllegalConfigurationException);

// invalid end
try {
NetUtils.getPortRangeFromString("0-70000");
} catch (Throwable t) {
error = t;
}
Assert.assertTrue(error instanceof IllegalConfigurationException);

// same range
try {
NetUtils.getPortRangeFromString("5-5");
} catch (Throwable t) {
error = t;
}
Assert.assertTrue(error instanceof IllegalConfigurationException);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ void shutdown() {
private void initNioBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple clients running on the same host.
String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

NioEventLoopGroup nioGroup =
new NioEventLoopGroup(
Expand Down Expand Up @@ -189,7 +190,8 @@ private void setNioKeepaliveOptions(String option, int value) {
private void initEpollBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple clients running on the same host.
String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

EpollEventLoopGroup epollGroup =
new EpollEventLoopGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.runtime.util.PortRange;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -51,7 +51,7 @@ enum TransportType {

private final InetAddress serverAddress;

private final int serverPort;
private final PortRange serverPortRange;

private final int memorySegmentSize;

Expand All @@ -65,11 +65,18 @@ public NettyConfig(
int memorySegmentSize,
int numberOfSlots,
Configuration config) {
this(serverAddress, new PortRange(serverPort), memorySegmentSize, numberOfSlots, config);
}

this.serverAddress = checkNotNull(serverAddress);
public NettyConfig(
InetAddress serverAddress,
PortRange serverPortRange,
int memorySegmentSize,
int numberOfSlots,
Configuration config) {

checkArgument(NetUtils.isValidHostPort(serverPort), "Invalid port number.");
this.serverPort = serverPort;
this.serverAddress = checkNotNull(serverAddress);
this.serverPortRange = serverPortRange;

checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
this.memorySegmentSize = memorySegmentSize;
Expand All @@ -86,8 +93,8 @@ InetAddress getServerAddress() {
return serverAddress;
}

int getServerPort() {
return serverPort;
PortRange getServerPortRange() {
return serverPortRange;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -179,7 +186,7 @@ public String toString() {
String format =
"NettyConfig ["
+ "server address: %s, "
+ "server port: %d, "
+ "server port range: %s, "
+ "ssl enabled: %s, "
+ "memory segment size (bytes): %d, "
+ "transport type: %s, "
Expand All @@ -195,7 +202,7 @@ public String toString() {
return String.format(
format,
serverAddress,
serverPort,
serverPortRange,
getSSLEnabled() ? "true" : "false",
memorySegmentSize,
getTransportType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

Expand Down Expand Up @@ -109,9 +111,6 @@ int init(
// Configuration
// --------------------------------------------------------------------

// Server bind address
bootstrap.localAddress(config.getServerAddress(), config.getServerPort());

// Pooled allocators for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool);
bootstrap.childOption(ChannelOption.ALLOCATOR, nettyBufferPool);
Expand Down Expand Up @@ -145,7 +144,35 @@ int init(
// Start Server
// --------------------------------------------------------------------

bindFuture = bootstrap.bind().syncUninterruptibly();
LOG.debug(
"Trying to initialize Netty server on address: {} and port range {}",
config.getServerAddress(),
config.getServerPortRange());

Iterator<Integer> portsIterator = config.getServerPortRange().getPortsIterator();
while (portsIterator.hasNext() && bindFuture == null) {
Integer port = portsIterator.next();
LOG.debug("Trying to bind Netty server to port: {}", port);

bootstrap.localAddress(config.getServerAddress(), port);
try {
bindFuture = bootstrap.bind().syncUninterruptibly();
} catch (Exception e) {
LOG.debug("Failed to bind Netty server", e);
// syncUninterruptibly() throws checked exceptions via Unsafe
// continue if the exception is due to the port being in use, fail early
// otherwise
if (!(e instanceof java.net.BindException)) {
throw e;
}
}
}

if (bindFuture == null) {
throw new BindException(
"Could not start rest endpoint on any port in port range "
+ config.getServerPortRange());
}

localAddress = (InetSocketAddress) bindFuture.channel().localAddress();

Expand All @@ -166,6 +193,10 @@ ServerBootstrap getBootstrap() {
return bootstrap;
}

Integer getListeningPort() {
return localAddress == null ? null : localAddress.getPort();
}

void shutdown() {
final long start = System.nanoTime();
if (bindFuture != null) {
Expand All @@ -186,7 +217,8 @@ void shutdown() {
private void initNioBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple servers running on the same host.
String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

NioEventLoopGroup nioGroup =
new NioEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name));
Expand All @@ -196,7 +228,8 @@ private void initNioBootstrap() {
private void initEpollBootstrap() {
// Add the server port number to the name in order to distinguish
// multiple servers running on the same host.
String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
String name =
NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPortRange() + ")";

EpollEventLoopGroup epollGroup =
new EpollEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ public static TaskManagerServices fromConfiguration(
ioExecutor);
final int listeningDataPort = shuffleEnvironment.start();

LOG.info(
"TaskManager data connection initialized successfully; listening internally on port: {}",
listeningDataPort);

final KvStateService kvStateService =
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();
Expand Down
Loading

0 comments on commit de01f02

Please sign in to comment.