Skip to content

Commit

Permalink
[FLINK-20944][k8s] Do not resolve the rest endpoint address when the …
Browse files Browse the repository at this point in the history
…service exposed type is ClusterIP

This closes apache#14692.
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Jan 21, 2021
1 parent 425baea commit 65da60b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,30 @@ private ClusterClientProvider<String> createClusterClientProvider(String cluster
return new RestClusterClient<>(
configuration,
clusterId,
new StandaloneClientHAServices(
HighAvailabilityServicesUtils.getWebMonitorAddress(
configuration,
HighAvailabilityServicesUtils.AddressResolution
.TRY_ADDRESS_RESOLUTION)));
new StandaloneClientHAServices(getWebMonitorAddress(configuration)));
} catch (Exception e) {
throw new RuntimeException(
new ClusterRetrieveException("Could not create the RestClusterClient.", e));
}
};
}

private String getWebMonitorAddress(Configuration configuration) throws Exception {
HighAvailabilityServicesUtils.AddressResolution resolution =
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
== KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
resolution = HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION;
LOG.warn(
"Please note that Flink client operations(e.g. cancel, list, stop,"
+ " savepoint, etc.) won't work from outside the Kubernetes cluster"
+ " since '{}' has been set to {}.",
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
KubernetesConfigOptions.ServiceExposedType.ClusterIP);
}
return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
}

@Override
public ClusterClientProvider<String> retrieve(String clusterId) {
final ClusterClientProvider<String> clusterClientProvider =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.utils.Constants;
Expand All @@ -46,7 +48,9 @@

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/** Tests for the {@link KubernetesClusterDescriptor}. */
Expand Down Expand Up @@ -186,6 +190,27 @@ public void testDeployApplicationClusterWithMultipleJarsSet() {
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig));
}

@Test
public void testDeployApplicationClusterWithClusterIP() throws Exception {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
flinkConfig.set(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.ClusterIP);

final ClusterClient<String> clusterClient =
descriptor
.deployApplicationCluster(clusterSpecification, appConfig)
.getClusterClient();

final String address = CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX + "." + NAMESPACE;
final int port = flinkConfig.get(RestOptions.PORT);
assertThat(
clusterClient.getWebInterfaceURL(),
is(String.format("http://%s:%d", address, port)));
}

private ClusterClientProvider<String> deploySessionCluster() throws ClusterDeploymentException {
mockExpectedServiceFromServerSide(loadBalancerSvc);
return descriptor.deploySessionCluster(clusterSpecification);
Expand Down

0 comments on commit 65da60b

Please sign in to comment.