Skip to content

Commit

Permalink
[FLINK-23087] Make AddressResolution a top-level class
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 24, 2021
1 parent 81df551 commit 2da9760
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -122,11 +123,10 @@ private ClusterClientProvider<String> createClusterClientProvider(String cluster
}

private String getWebMonitorAddress(Configuration configuration) throws Exception {
HighAvailabilityServicesUtils.AddressResolution resolution =
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION;
AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
== KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
resolution = HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION;
resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
LOG.warn(
"Please note that Flink client operations(e.g. cancel, list, stop,"
+ " savepoint, etc.) won't work from outside the Kubernetes cluster"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
Expand Down Expand Up @@ -351,9 +352,7 @@ protected String getRPCPortRange(Configuration configuration) {
protected HighAvailabilityServices createHaServices(
Configuration configuration, Executor executor) throws Exception {
return HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
configuration, executor, AddressResolution.NO_ADDRESS_RESOLUTION);
}

protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
Expand Down Expand Up @@ -191,15 +192,14 @@ public static Tuple2<String, Integer> getJobManagerAddress(Configuration configu
* @return Address of WebMonitor.
*/
public static String getWebMonitorAddress(
Configuration configuration, HighAvailabilityServicesUtils.AddressResolution resolution)
throws UnknownHostException {
Configuration configuration, AddressResolution resolution) throws UnknownHostException {
final String address =
checkNotNull(
configuration.getString(RestOptions.ADDRESS),
"%s must be set",
RestOptions.ADDRESS.key());

if (resolution == HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION) {
if (resolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
// Fail fast if the hostname cannot be resolved
//noinspection ResultOfMethodCallIgnored
InetAddress.getByName(address);
Expand Down Expand Up @@ -299,13 +299,4 @@ private static ClientHighAvailabilityServices createCustomClientHAServices(Confi
e);
}
}

/**
* Enum specifying whether address resolution should be tried or not when creating the {@link
* HighAvailabilityServices}.
*/
public enum AddressResolution {
TRY_ADDRESS_RESOLUTION,
NO_ADDRESS_RESOLUTION
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc;

/** Enum specifying whether address resolution should be tried. */
public enum AddressResolution {
TRY_ADDRESS_RESOLUTION,
NO_ADDRESS_RESOLUTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -122,7 +121,7 @@ public static String getRpcUrl(
String hostname,
int port,
String endpointName,
HighAvailabilityServicesUtils.AddressResolution addressResolution,
AddressResolution addressResolution,
Configuration config)
throws UnknownHostException {

Expand Down Expand Up @@ -153,7 +152,7 @@ public static String getRpcUrl(
String hostname,
int port,
String endpointName,
HighAvailabilityServicesUtils.AddressResolution addressResolution,
AddressResolution addressResolution,
AkkaProtocol akkaProtocol)
throws UnknownHostException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
Expand Down Expand Up @@ -141,9 +142,7 @@ public TaskManagerRunner(

highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
configuration, executor, AddressResolution.NO_ADDRESS_RESOLUTION);

JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;

Expand Down Expand Up @@ -63,9 +64,7 @@ public void testCreateCustomHAServices() throws Exception {
// when
actualHaServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
config, executor, AddressResolution.NO_ADDRESS_RESOLUTION);
// then
assertSame(haServices, actualHaServices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exc
"1.1.1.1",
1234,
"foobar",
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
AddressResolution.NO_ADDRESS_RESOLUTION,
config);

try {
Expand All @@ -140,7 +140,7 @@ public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exc
localHost.getHostName(),
correctInetSocketAddress.getPort(),
JobMaster.JOB_MANAGER_NAME,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
AddressResolution.NO_ADDRESS_RESOLUTION,
config);

faultyLeaderElectionService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -236,9 +237,7 @@ private static Configuration createFlinkConfigWithJobManagerPort(final int port)
private HighAvailabilityServices createHighAvailabilityServices(final Configuration config)
throws Exception {
return HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
config, Executors.directExecutor(), AddressResolution.NO_ADDRESS_RESOLUTION);
}

private static ServerSocket openServerSocket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -32,7 +32,7 @@
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

/** Unit tests for respecting {@link HighAvailabilityServicesUtils.AddressResolution}. */
/** Unit tests for respecting {@link AddressResolution}. */
public class AddressResolutionTest extends TestLogger {

private static final String ENDPOINT_NAME = "endpoint";
Expand Down Expand Up @@ -67,7 +67,7 @@ public void testNoAddressResolution() throws UnknownHostException {
NON_EXISTING_HOSTNAME,
PORT,
ENDPOINT_NAME,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
AddressResolution.NO_ADDRESS_RESOLUTION,
new Configuration());
}

Expand All @@ -78,7 +78,7 @@ public void testTryAddressResolution() {
NON_EXISTING_HOSTNAME,
PORT,
ENDPOINT_NAME,
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION,
AddressResolution.TRY_ADDRESS_RESOLUTION,
new Configuration());
fail("This should fail with an UnknownHostException");
} catch (UnknownHostException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.flink.runtime.akka

import java.net.{InetAddress, InetSocketAddress}
import java.util.Collections

import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.rpc.AddressResolution
import org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.FixedThreadPoolExecutorConfiguration
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
Expand Down Expand Up @@ -126,9 +127,7 @@ public void testCancelingOnProcessFailure() throws Throwable {
final ScheduledExecutorService ioExecutor = TestingUtils.defaultExecutor();
final HighAvailabilityServices haServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
ioExecutor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
config, ioExecutor, AddressResolution.NO_ADDRESS_RESOLUTION);

final AtomicReference<Throwable> programException = new AtomicReference<>();

Expand Down

0 comments on commit 2da9760

Please sign in to comment.