Skip to content

Commit

Permalink
[FLINK-23507] Use IP address of a kubernetes node when constructing n…
Browse files Browse the repository at this point in the history
…ode port connection string for the REST gateway.

This closes apache#16720.
  • Loading branch information
dmvk authored and tillrohrmann committed Aug 13, 2021
1 parent 20b137e commit d9830ca
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ $ kubectl port-forward service/<ServiceName> 8081

- **NodePort**: Exposes the service on each Node’s IP at a static port (the `NodePort`).
`<NodeIP>:<NodePort>` can be used to contact the JobManager service.
`NodeIP` can also be replaced with the Kubernetes ApiServer address.
You can find its address in your kube config file.

- **LoadBalancer**: Exposes the service externally using a cloud provider’s load balancer.
Since the cloud provider and Kubernetes needs some time to prepare the load balancer, you may get a `NodePort` JobManager Web Interface in the client log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ $ kubectl port-forward service/<ServiceName> 8081

- **NodePort**: Exposes the service on each Node’s IP at a static port (the `NodePort`).
`<NodeIP>:<NodePort>` can be used to contact the JobManager service.
`NodeIP` can also be replaced with the Kubernetes ApiServer address.
You can find its address in your kube config file.

- **LoadBalancer**: Exposes the service externally using a cloud provider’s load balancer.
Since the cloud provider and Kubernetes needs some time to prepare the load balancer, you may get a `NodePort` JobManager Web Interface in the client log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@
<td>Map</td>
<td>The user-specified annotations that are set to the rest Service. The value should be in the form of a1:v1,a2:v2</td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.exposed.node-port-address-type</h5></td>
<td style="word-wrap: break-word;">InternalIP</td>
<td><p>Enum</p></td>
<td>The user-specified <a href="https://kubernetes.io/docs/concepts/architecture/nodes/#addresses">address type</a> that is used for filtering node IPs when constructing a <a href="https://kubernetes.io/docs/concepts/services-networking/service/#nodeport">node port</a> connection string. This option is only considered when 'kubernetes.rest-service.exposed.type' is set to 'NodePort'.<br /><br />Possible values:<ul><li>"InternalIP"</li><li>"ExternalIP"</li></ul></td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.exposed.type</h5></td>
<td style="word-wrap: break-word;">LoadBalancer</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ private ClusterClientProvider<String> createClusterClientProvider(String cluster

try {
// Flink client will always use Kubernetes service to contact with jobmanager. So we
// have a pre-configured web
// monitor address. Using StandaloneClientHAServices to create RestClusterClient is
// reasonable.
// have a pre-configured web monitor address. Using StandaloneClientHAServices to
// create RestClusterClient is reasonable.
return new RestClusterClient<>(
configuration,
clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/** This class holds configuration constants used by Flink's kubernetes runners. */
@PublicEvolving
Expand All @@ -59,6 +60,25 @@ public class KubernetesConfigOptions {
"The exposed type of the rest service. "
+ "The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.");

public static final ConfigOption<NodePortAddressType>
REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE =
key("kubernetes.rest-service.exposed.node-port-address-type")
.enumType(NodePortAddressType.class)
.defaultValue(NodePortAddressType.InternalIP)
.withDescription(
Description.builder()
.text(
"The user-specified %s that is used for filtering node IPs when constructing a %s connection string. This option is only considered when '%s' is set to '%s'.",
link(
"https://kubernetes.io/docs/concepts/architecture/nodes/#addresses",
"address type"),
link(
"https://kubernetes.io/docs/concepts/services-networking/service/#nodeport",
"node port"),
text(REST_SERVICE_EXPOSED_TYPE.key()),
text(ServiceExposedType.NodePort.name()))
.build());

public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.jobmanager.service-account")
.stringType()
Expand Down Expand Up @@ -455,6 +475,12 @@ public enum ServiceExposedType {
LoadBalancer
}

/** The flink rest service exposed type. */
public enum NodePortAddressType {
InternalIP,
ExternalIP,
}

/** The container image pull policy. */
public enum ImagePullPolicy {
IfNotPresent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
import io.fabric8.kubernetes.api.model.NodeAddress;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
Expand Down Expand Up @@ -72,26 +73,27 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {

private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);

private final NamespacedKubernetesClient internalClient;
private final String clusterId;
private final String namespace;
private final int maxRetryAttempts;
private final KubernetesConfigOptions.NodePortAddressType nodePortAddressType;

private final NamespacedKubernetesClient internalClient;
private final ExecutorService kubeClientExecutorService;

public Fabric8FlinkKubeClient(
Configuration flinkConfig,
NamespacedKubernetesClient client,
ExecutorService executorService) {
this.internalClient = checkNotNull(client);
this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));

this.namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);

this.maxRetryAttempts =
flinkConfig.getInteger(
KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES);

this.nodePortAddressType =
flinkConfig.get(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE);
this.internalClient = checkNotNull(client);
this.kubeClientExecutorService = checkNotNull(executorService);
}

Expand Down Expand Up @@ -433,8 +435,26 @@ private Optional<Endpoint> getLoadBalancerRestEndpoint(
address = loadBalancer.getIngress().get(0).getHostname();
}
} else {
// Use node port
address = this.internalClient.getMasterUrl().getHost();
// Use node port. Node port is accessible on any node within kubernetes cluster. We'll
// only consider IPs with the configured address type.
address =
internalClient.nodes().list().getItems().stream()
.flatMap(node -> node.getStatus().getAddresses().stream())
.filter(
nodeAddress ->
nodePortAddressType
.name()
.equals(nodeAddress.getType()))
.map(NodeAddress::getAddress)
.filter(ip -> !ip.isEmpty())
.findAny()
.orElse(null);
if (address == null) {
LOG.warn(
"Unable to find any node ip with type [{}]. Please see [{}] config option for more details.",
nodePortAddressType,
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE.key());
}
}
boolean noAddress = address == null || address.isEmpty();
return noAddress ? Optional.empty() : Optional.of(new Endpoint(address, restPort));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.Preconditions;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.NodeAddressBuilder;
import io.fabric8.kubernetes.api.model.NodeBuilder;
import io.fabric8.kubernetes.api.model.NodeListBuilder;
import io.fabric8.kubernetes.api.model.NodeStatusBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.ServicePort;
Expand All @@ -41,6 +47,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
Expand All @@ -52,6 +59,32 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
protected static final int REST_PORT = 9021;
protected static final int NODE_PORT = 31234;

protected void mockExpectedNodesFromServerSide(List<String> addresses) {
final List<Node> nodes = new ArrayList<>();
Collections.shuffle(addresses);
for (String address : addresses) {
final String[] parts = address.split(":");
Preconditions.checkState(
parts.length == 2, "Address should be in format \"<type>:<ip>\".");
nodes.add(
new NodeBuilder()
.withStatus(
new NodeStatusBuilder()
.withAddresses(
new NodeAddressBuilder()
.withType(parts[0])
.withAddress(parts[1])
.build())
.build())
.build());
}
server.expect()
.get()
.withPath("/api/v1/nodes")
.andReturn(200, new NodeListBuilder().withItems(nodes).build())
.always();
}

protected void mockExpectedServiceFromServerSide(Service expectedService) {
final String serviceName = expectedService.getMetadata().getName();
final String path =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -58,12 +60,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -246,12 +251,61 @@ public void testServiceLoadBalancerNullHostAndIP() {
}

@Test
public void testNodePortService() {
public void testNodePortServiceWithInternalIP() {
testNodePortService(KubernetesConfigOptions.NodePortAddressType.InternalIP);
}

@Test
public void testNodePortServiceWithExternalIP() {
testNodePortService(KubernetesConfigOptions.NodePortAddressType.ExternalIP);
}

private void testNodePortService(KubernetesConfigOptions.NodePortAddressType addressType) {
flinkConfig.set(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE, addressType);
final List<String> internalAddresses =
Arrays.asList("InternalIP:10.0.0.1", "InternalIP:10.0.0.2", "InternalIP:10.0.0.3");
final List<String> externalAddresses =
Arrays.asList("ExternalIP:7.7.7.7", "ExternalIP:8.8.8.8", "ExternalIP:9.9.9.9");
final List<String> addresses = new ArrayList<>();
addresses.addAll(internalAddresses);
addresses.addAll(externalAddresses);
mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
mockExpectedNodesFromServerSide(addresses);
try (final Fabric8FlinkKubeClient localClient =
new Fabric8FlinkKubeClient(
flinkConfig,
kubeClient,
org.apache.flink.util.concurrent.Executors.newDirectExecutorService())) {
final Optional<Endpoint> resultEndpoint = localClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(true));
final List<String> expectedIps;
switch (addressType) {
case InternalIP:
expectedIps =
internalAddresses.stream()
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
case ExternalIP:
expectedIps =
externalAddresses.stream()
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
default:
throw new IllegalArgumentException(
String.format("Unexpected address type %s.", addressType));
}
assertThat(resultEndpoint.get().getAddress(), isIn(expectedIps));
assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
}
}

final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(true));
assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
@Test
public void testNodePortServiceWithNoMatchingIP() {
mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
assertFalse(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent());
}

@Test
Expand Down

0 comments on commit d9830ca

Please sign in to comment.