Skip to content

Commit

Permalink
[FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based…
Browse files Browse the repository at this point in the history
… HA services

Remove client less factory methods from ZooKeeperUtils

Introduce default job id

This closes apache#3781.
  • Loading branch information
tillrohrmann committed May 5, 2017
1 parent a0bb99c commit ddd6a99
Show file tree
Hide file tree
Showing 135 changed files with 3,474 additions and 2,131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ protected int run(String[] args) {
}
finally {
if (client != null) {
client.shutdown();
try {
client.shutdown();
} catch (Exception e) {
LOG.warn("Could not properly shut down the cluster client.", e);
}
}
if (program != null) {
program.deleteExtractedLibraries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ ClusterType retrieveCluster(
* @param config The Flink config to use
* @param userJarFiles User jar files to include in the classpath of the cluster.
* @return The client to communicate with the cluster which the CustomCommandLine brought up.
* @throws UnsupportedOperationException if the operation is not supported
* @throws Exception if the cluster could not be created
*/
ClusterType createCluster(
String applicationName,
CommandLine commandLine,
Configuration config,
List<URL> userJarFiles) throws UnsupportedOperationException;
List<URL> userJarFiles) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobRetrievalException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -72,7 +75,6 @@
import java.util.List;
import java.util.Map;


/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
*/
Expand All @@ -95,6 +97,9 @@ public abstract class ClusterClient {
/** Lookup timeout for the job manager retrieval service */
private final FiniteDuration lookupTimeout;

/** Service factory for high availability */
protected final HighAvailabilityServices highAvailabilityServices;

/** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;

Expand All @@ -119,17 +124,34 @@ public abstract class ClusterClient {
*
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
*
* @throws java.io.IOException Thrown, if the client's actor system could not be started.
* @throws Exception if we cannot create the high availability services
*/
public ClusterClient(Configuration flinkConfig) throws IOException {
public ClusterClient(Configuration flinkConfig) throws Exception {
this(flinkConfig,
HighAvailabilityServicesUtils.createHighAvailabilityServices(
flinkConfig,
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
}

/**
* Creates an instance that submits the programs to the JobManager defined in the
* configuration. This method will try to resolve the JobManager hostname and throw an exception
* if that is not possible.
*
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
*/
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices) {
// Reject a null configuration up front; the same object is handed to the optimizer,
// the timeout lookups and the actor system loader below.
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);

// Both the client timeout and the lookup timeout are read from the configuration.
this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);

// NOTE(review): the loader's name suggests the actor system is only started on first use — confirm.
this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);

// The caller-supplied services are stored as given (null is rejected) and are used by this
// client to obtain the JobManager leader retriever.
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -202,12 +224,16 @@ public ActorSystem get() {
/**
* Shuts down the client. This stops the internal actor system and actors.
*/
public void shutdown() {
public void shutdown() throws Exception {
synchronized (this) {
try {
finalizeCluster();
} finally {
this.actorSystemLoader.shutdown();
actorSystemLoader.shutdown();
}

if (highAvailabilityServices != null) {
highAvailabilityServices.closeAndCleanupAllData();
}
}
}
Expand Down Expand Up @@ -241,7 +267,8 @@ public InetSocketAddress getJobManagerAddress() {
try {
LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true), timeout);
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout);

return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
} catch (Exception e) {
Expand Down Expand Up @@ -411,17 +438,17 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws

waitForClusterToBeReady();

final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new ProgramInvocationException("Could not create the leader retrieval service", e);
}

try {
logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
this.lastJobExecutionResult = JobClient.submitJobAndWait(
actorSystemLoader.get(),
flinkConfig,
highAvailabilityServices,
jobGraph,
timeout,
printStatusDuringExecution,
classLoader);

return this.lastJobExecutionResult;
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
Expand Down Expand Up @@ -462,13 +489,6 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade
* @throws JobExecutionException if an error occurs during monitoring the job execution
*/
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
}

ActorGateway jobManagerGateway;
try {
jobManagerGateway = getJobManagerGateway();
Expand All @@ -477,13 +497,13 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
}

final JobListeningContext listeningContext = JobClient.attachToRunningJob(
jobID,
jobManagerGateway,
flinkConfig,
actorSystemLoader.get(),
leaderRetrievalService,
timeout,
printStatusDuringExecution);
jobID,
jobManagerGateway,
flinkConfig,
actorSystemLoader.get(),
highAvailabilityServices,
timeout,
printStatusDuringExecution);

return JobClient.awaitJobResult(listeningContext);
}
Expand All @@ -496,13 +516,6 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
* @throws JobExecutionException if an error occurs during monitoring the job execution
*/
public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
}

ActorGateway jobManagerGateway;
try {
jobManagerGateway = getJobManagerGateway();
Expand All @@ -515,7 +528,7 @@ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionExceptio
jobManagerGateway,
flinkConfig,
actorSystemLoader.get(),
leaderRetrievalService,
highAvailabilityServices,
timeout,
printStatusDuringExecution);
}
Expand Down Expand Up @@ -721,7 +734,7 @@ private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> cl
public ActorGateway getJobManagerGateway() throws Exception {
LOG.debug("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true),
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
actorSystemLoader.get(),
lookupTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import scala.concurrent.Await;
import scala.concurrent.Future;

import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
Expand All @@ -38,10 +38,14 @@
*/
public class StandaloneClusterClient extends ClusterClient {

public StandaloneClusterClient(Configuration config) throws IOException {
public StandaloneClusterClient(Configuration config) throws Exception {
super(config);
}

public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices) {
super(config, highAvailabilityServices);
}

@Override
public void waitForClusterToBeReady() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
Expand All @@ -33,7 +32,6 @@
import java.net.UnknownHostException;
import java.util.Collections;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

Expand All @@ -48,26 +46,20 @@ public static void check() {
}

@Test
public void testUnresolvableHostname1() {
public void testUnresolvableHostname1() throws Exception {

RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
try {
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
catch (ProgramInvocationException e) {
catch (UnknownHostException ignored) {
// that is what we want!
assertTrue(e.getCause() instanceof UnknownHostException);
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testUnresolvableHostname2() {
public void testUnresolvableHostname2() throws Exception {

InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
Expand All @@ -76,14 +68,8 @@ public void testUnresolvableHostname2() {
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
catch (ProgramInvocationException e) {
catch (UnknownHostException ignored) {
// that is what we want!
assertTrue(e.getCause() instanceof UnknownHostException);
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.UUID;
Expand Down Expand Up @@ -199,7 +198,7 @@ public void testDetachedMode() throws Exception{
* This test verifies correct job submission messaging logic and plan translation calls.
*/
@Test
public void shouldSubmitToJobClient() throws IOException, ProgramInvocationException {
public void shouldSubmitToJobClient() throws Exception {
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
Expand All @@ -217,7 +216,7 @@ public void shouldSubmitToJobClient() throws IOException, ProgramInvocationExcep
* This test verifies that the correct exception is thrown when the job submission fails.
*/
@Test
public void shouldSubmitToJobClientFails() throws IOException {
public void shouldSubmitToJobClientFails() throws Exception {
jobManagerSystem.actorOf(
Props.create(FailureReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -32,7 +34,7 @@
import static org.junit.Assume.assumeTrue;

/**
* Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
* Tests that verify that the LeaderRetrievalService correctly handles non-resolvable host names
* and does not fail with another exception
*/
public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
Expand All @@ -48,21 +50,16 @@ public static void check() {
* Tests that the StandaloneLeaderRetrievalService resolves host names if specified.
*/
@Test
public void testUnresolvableHostname1() {
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
Configuration config = new Configuration();

try {
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
}
catch (Exception e) {
System.err.println("Shouldn't throw an exception!");
e.printStackTrace();
fail(e.getMessage());
}
StandaloneUtils.createLeaderRetrievalService(
config,
false,
JobMaster.JOB_MANAGER_NAME);
}

/*
Expand All @@ -77,7 +74,10 @@ public void testUnresolvableHostname2() throws Exception {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
StandaloneUtils.createLeaderRetrievalService(
config,
true,
JobMaster.JOB_MANAGER_NAME);
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
Expand Down
Loading

0 comments on commit ddd6a99

Please sign in to comment.