Skip to content

Commit

Permalink
[FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices
Browse files Browse the repository at this point in the history
This PR introduces a standalone high availability services implementation which can be used
in a distributed setting with no HA guarantees. Additionally, it introduces a common base
class which is also used by the EmbeddedHaServices. This base class instantiates the
standalone variants of the checkpoint recovery factory, submitted job graphs store, running
jobs registry and blob store.

The StandaloneHaServices are instantiated with a fixed address for the Job- and
ResourceManager. This address and the HighAvailability.DEFAULT_LEADER_ID is returned by
the corresponding LeaderRetrievalServices when being started.

This closes apache#3622.
  • Loading branch information
tillrohrmann committed May 5, 2017
1 parent 43fa507 commit a0bb99c
Show file tree
Hide file tree
Showing 62 changed files with 897 additions and 581 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public InetSocketAddress getJobManagerAddress() {
try {
LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true), timeout);

return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
} catch (Exception e) {
Expand Down Expand Up @@ -464,7 +464,7 @@ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoade
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
}
Expand Down Expand Up @@ -498,7 +498,7 @@ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException
public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
}
Expand Down Expand Up @@ -721,7 +721,7 @@ private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> cl
public ActorGateway getJobManagerGateway() throws Exception {
LOG.debug("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true),
actorSystemLoader.get(),
lookupTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -36,7 +37,7 @@
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

public class RemoteExecutorHostnameResolutionTest {
public class RemoteExecutorHostnameResolutionTest extends TestLogger {

private static final String nonExistingHostname = "foo.bar.com.invalid";
private static final int port = 14451;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;
Expand All @@ -67,7 +67,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


/**
* Simple and maybe stupid test to check the {@link ClusterClient} class.
*/
Expand Down Expand Up @@ -129,7 +128,9 @@ public void shutDownActorSystem() {
*/
@Test
public void testDetachedMode() throws Exception{
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
ClusterClient out = new StandaloneClusterClient(config);
out.setDetached(true);

Expand Down Expand Up @@ -198,30 +199,28 @@ public void testDetachedMode() throws Exception{
* This test verifies correct job submission messaging logic and plan translation calls.
*/
@Test
public void shouldSubmitToJobClient() {
try {
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
public void shouldSubmitToJobClient() throws IOException, ProgramInvocationException {
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);

ClusterClient out = new StandaloneClusterClient(config);
out.setDetached(true);
JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
ClusterClient out = new StandaloneClusterClient(config);
out.setDetached(true);
JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);

assertNotNull(result);
assertNotNull(result);

program.deleteExtractedLibraries();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
program.deleteExtractedLibraries();
}

/**
* This test verifies correct that the correct exception is thrown when the job submission fails.
*/
@Test
public void shouldSubmitToJobClientFails() throws IOException {
jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
jobManagerSystem.actorOf(
Props.create(FailureReturningActor.class),
JobMaster.JOB_MANAGER_NAME);

ClusterClient out = new StandaloneClusterClient(config);
out.setDetached(true);
Expand All @@ -245,7 +244,9 @@ public void shouldSubmitToJobClientFails() throws IOException {
@Test
public void tryLocalExecution() {
try {
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);

PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
Expand Down Expand Up @@ -276,7 +277,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testGetExecutionPlan() {
try {
jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
jobManagerSystem.actorOf(
Props.create(FailureReturningActor.class),
JobMaster.JOB_MANAGER_NAME);

PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
assertNotNull(prg.getPreviewPlan());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testUnresolvableHostname1() {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config);
LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
}
catch (Exception e) {
System.err.println("Shouldn't throw an exception!");
Expand All @@ -69,7 +69,7 @@ public void testUnresolvableHostname1() {
* Tests that the StandaloneLeaderRetrievalService does not resolve host names by default.
*/
@Test
public void testUnresolvableHostname2() {
public void testUnresolvableHostname2() throws Exception {

try {
Configuration config = new Configuration();
Expand All @@ -78,16 +78,11 @@ public void testUnresolvableHostname2() {
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
fail("This should fail with an IllegalConfigurationException");
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

private static void checkPreconditions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
Expand All @@ -43,12 +46,10 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
Expand Down Expand Up @@ -326,9 +327,14 @@ private ActorRef getJobManager() throws IOException {
throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
}

return JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
NetUtils.unresolvedHostAndPortToNormalizedString(this.jobManagerHost, this.jobManagerPort),
actorSystem, AkkaUtils.getLookupTimeout(configuration));
final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
jobManagerHost,
jobManagerPort,
JobMaster.JOB_MANAGER_NAME,
AddressResolution.TRY_ADDRESS_RESOLUTION,
configuration);

return AkkaUtils.getActorRef(jobManagerAkkaUrl, actorSystem, AkkaUtils.getLookupTimeout(configuration));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.exceptions;
package org.apache.flink.util;

/**
* Exception which occures when creating a configuration object fails.
* Exception which occurs when creating a configuration object fails.
*/
public class ConfigurationException extends Exception {
public class ConfigurationException extends FlinkException {
private static final long serialVersionUID = 3971647332059381556L;

public ConfigurationException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
Expand All @@ -65,6 +66,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Option;
import scala.Some;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

Expand Down Expand Up @@ -301,8 +304,8 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
actorSystem,
futureExecutor,
ioExecutor,
new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
scala.Option.<String>empty(),
new Some<>(JobMaster.JOB_MANAGER_NAME),
Option.<String>empty(),
getJobManagerClass(),
getArchivistClass())._1();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

import java.util.regex.Matcher;
Expand Down Expand Up @@ -62,7 +61,7 @@ public static String getRedirectAddress(
final String jobManagerName = localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);

if (!localJobManagerAddress.equals(leaderAddress) &&
!leaderAddress.equals(JobManager.getLocalJobManagerAkkaURL(Option.apply(jobManagerName)))) {
!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
// We are not the leader and need to redirect
Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,7 +84,7 @@ static BlobStore createBlobStoreFromConfig(Configuration config) throws IOExcept
if (highAvailabilityMode == HighAvailabilityMode.NONE) {
return new VoidBlobStore();
} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
return ZookeeperHaServices.createBlobStore(config);
return ZooKeeperHaServices.createBlobStore(config);
} else {
throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public ClassLoader getClassLoader() throws JobRetrievalException {
private ActorGateway getJobManager() throws JobRetrievalException {
try {
return LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true),
actorSystem,
AkkaUtils.getLookupTimeout(configuration));
} catch (Exception e) {
Expand Down

This file was deleted.

Loading

0 comments on commit a0bb99c

Please sign in to comment.