Skip to content

Commit

Permalink
[FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to l…
Browse files Browse the repository at this point in the history
…ocalhost by default
  • Loading branch information
autophagy authored Feb 16, 2022
1 parent 010891a commit 9bd61e1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 1 deletion.
15 changes: 15 additions & 0 deletions flink-dist/src/main/resources/flink-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,28 @@ jobmanager.rpc.address: localhost

jobmanager.rpc.port: 6123

# The host interface the JobManager will bind to. My default, this is localhost, and will prevent
# the JobManager from communicating outside the machine/container it is running on.
#
# To enable this, set the bind-host address to one that has access to an outside facing network
# interface, such as 0.0.0.0.

jobmanager.bind-host: localhost


# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.

jobmanager.memory.process.size: 1600m

# The host interface the TaskManager will bind to. My default, this is localhost, and will prevent
# the TaskManager from communicating outside the machine/container it is running on.
#
# To enable this, set the bind-host address to one that has access to an outside facing network
# interface, such as 0.0.0.0.

taskmanager.bind-host: localhost

# The total process memory size for the TaskManager.
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ public FlinkContainers build() {
CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
this.conf.set(RestOptions.BIND_ADDRESS, "0.0.0.0");

this.conf.set(JobManagerOptions.BIND_HOST, "0.0.0.0");
this.conf.set(TaskManagerOptions.BIND_HOST, "0.0.0.0");

// Create temporary directory for building Flink image
final Path imageBuildingTempDir;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
Expand Down Expand Up @@ -158,6 +160,8 @@ private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfi
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
return clusterSideConfig.toMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class ConnectionUtils {
* state failed to determine the address.
*/
private enum AddressDetectionState {
LOOPBACK(100),
/** Connect from interface returned by InetAddress.getLocalHost(). * */
LOCAL_HOST(200),
/** Detect own IP address based on the target IP address. Look for common prefix */
Expand Down Expand Up @@ -115,6 +116,7 @@ public static InetAddress findConnectingAddress(
final List<AddressDetectionState> strategies =
Collections.unmodifiableList(
Arrays.asList(
AddressDetectionState.LOOPBACK,
AddressDetectionState.LOCAL_HOST,
AddressDetectionState.ADDRESS,
AddressDetectionState.FAST_CONNECT,
Expand Down Expand Up @@ -225,6 +227,18 @@ private static InetAddress tryLocalHostBeforeReturning(
private static InetAddress findAddressUsingStrategy(
AddressDetectionState strategy, InetSocketAddress targetAddress, boolean logging)
throws IOException {
if (strategy == AddressDetectionState.LOOPBACK) {
InetAddress loopback = InetAddress.getLoopbackAddress();

if (tryToConnect(loopback, targetAddress, strategy.getTimeout(), logging)) {
LOG.debug(
"Using InetAddress.getLoopbackAddress() immediately for connecting address");
return loopback;
} else {
return null;
}
}

// try LOCAL_HOST strategy independent of the network interfaces
if (strategy == AddressDetectionState.LOCAL_HOST) {
InetAddress localhostName;
Expand Down Expand Up @@ -432,7 +446,7 @@ public InetAddress findConnectingAddress(Duration timeout, Duration startLogging
}

if (targetAddress != null) {
AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
AddressDetectionState strategy = AddressDetectionState.LOOPBACK;

boolean logging = elapsedTimeMillis >= startLoggingAfter.toMillis();
if (logging) {
Expand All @@ -449,6 +463,9 @@ public InetAddress findConnectingAddress(Duration timeout, Duration startLogging

// pick the next strategy
switch (strategy) {
case LOOPBACK:
strategy = AddressDetectionState.LOCAL_HOST;
break;
case LOCAL_HOST:
strategy = AddressDetectionState.ADDRESS;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -64,6 +65,8 @@ public static Configuration loadConfiguration(
ApplicationConstants.Environment.NM_HOST.key());

configuration.setString(JobManagerOptions.ADDRESS, hostname);
configuration.removeConfig(JobManagerOptions.BIND_HOST);
configuration.removeConfig(TaskManagerOptions.BIND_HOST);
configuration.setString(RestOptions.ADDRESS, hostname);
configuration.setString(RestOptions.BIND_ADDRESS, hostname);

Expand Down

0 comments on commit 9bd61e1

Please sign in to comment.