[FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store

In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs. The idea
is that another Dispatcher will then be brought up and retry the recovery. This is better
than simply dropping the jobs that could not be recovered.

This closes apache#5746.
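
To make the change concrete, here is a minimal, self-contained sketch of the fail-fast
recovery pattern this commit applies in the Dispatcher's recoverJobs() method (first hunk
below). The JobStore and FatalErrorHandler interfaces are stand-ins invented for this
sketch rather than Flink's actual types; only the control flow (escalating recovery
failures to a fatal error handler instead of logging and continuing) mirrors the change
shown in the diff.

import java.util.Collection;

// Stand-in types for illustration only; these are not Flink's actual interfaces.
public class FailFastRecoverySketch {

    interface JobStore {
        Collection<String> getJobIds() throws Exception;
        String recoverJobGraph(String jobId) throws Exception;
    }

    interface FatalErrorHandler {
        void onFatalError(Throwable error);
    }

    static void recoverJobs(JobStore store, FatalErrorHandler fatalErrorHandler) {
        final Collection<String> jobIds;
        try {
            jobIds = store.getJobIds();
        } catch (Exception e) {
            // Escalate instead of logging and giving up silently: the failed process is
            // expected to be restarted (or another leader elected) and to retry recovery.
            fatalErrorHandler.onFatalError(new Exception(
                "Could not recover job ids from the job store. Aborting recovery.", e));
            return;
        }

        for (String jobId : jobIds) {
            try {
                String jobGraph = store.recoverJobGraph(jobId);
                System.out.println("Recovered job " + jobGraph);
            } catch (Exception e) {
                // A single unrecoverable job is also treated as fatal rather than dropped.
                fatalErrorHandler.onFatalError(new Exception(
                    "Could not recover the job graph for " + jobId + '.', e));
            }
        }
    }

    public static void main(String[] args) {
        // A store whose job id lookup fails, to exercise the fatal-error path.
        JobStore failingStore = new JobStore() {
            @Override
            public Collection<String> getJobIds() throws Exception {
                throw new Exception("job store unavailable");
            }

            @Override
            public String recoverJobGraph(String jobId) {
                return jobId;
            }
        };

        recoverJobs(failingStore, error -> System.err.println("Fatal error: " + error.getMessage()));
    }
}

Running the main method exercises the failing getJobIds path and shows the error being
handed to the fatal error handler instead of being swallowed by a log statement.
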
tillrohrmann committed Mar 28, 2018
1 parent 6e1d415 commit aa6cf35
Showing 8 changed files with 157 additions and 34 deletions.
@@ -570,7 +570,7 @@ void recoverJobs() {
try {
jobIds = submittedJobGraphStore.getJobIds();
} catch (Exception e) {
log.error("Could not recover job ids from the submitted job graph store. Aborting recovery.", e);
onFatalError(new FlinkException("Could not recover job ids from the submitted job graph store. Aborting recovery.", e));
return;
}

@@ -580,7 +580,7 @@ void recoverJobs() {

runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT));
} catch (Exception e) {
log.error("Could not recover the job graph for " + jobId + '.', e);
onFatalError(new FlinkException("Could not recover the job graph for " + jobId + '.', e));
}
}
});
@@ -18,16 +18,17 @@

package org.apache.flink.runtime.jobmanager;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +178,7 @@ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
success = true;
return null;
} catch (Exception e) {
throw new Exception("Could not retrieve the submitted job graph state handle " +
throw new FlinkException("Could not retrieve the submitted job graph state handle " +
"for " + path + "from the submitted job graph store.", e);
}
SubmittedJobGraph jobGraph;
@@ -40,10 +40,10 @@ import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
import org.apache.flink.runtime.checkpoint._
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager}
import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager}
import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter}
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
@@ -501,7 +501,11 @@ class JobManager(
}
}
} catch {
case t: Throwable => log.warn(s"Failed to recover job $jobId.", t)
case t: Throwable => {
log.error(s"Failed to recover job $jobId.", t)
// stop oneself in order to be restarted and try to recover the jobs again
context.stop(self)
}
}
}(context.dispatcher)

@@ -523,9 +527,12 @@
}
}
} catch {
case e: Exception =>
log.warn("Failed to recover job ids from submitted job graph store. Aborting " +
"recovery.", e)
case e: Exception => {
log.error("Failed to recover job ids from submitted job graph store. Aborting " +
"recovery.", e)
// stop oneself in order to be restarted and try to recover the jobs again
context.stop(self)
}
}
}(context.dispatcher)

@@ -51,6 +51,7 @@
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -66,6 +67,7 @@
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import org.junit.After;
@@ -77,7 +79,6 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.Mockito;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -91,6 +92,7 @@
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -111,9 +113,6 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
* Test for the {@link Dispatcher} component.
@@ -137,7 +136,7 @@ public class DispatcherTest extends TestLogger {

private TestingFatalErrorHandler fatalErrorHandler;

private SubmittedJobGraphStore submittedJobGraphStore;
private InMemorySubmittedJobGraphStore submittedJobGraphStore;

private TestingLeaderElectionService dispatcherLeaderElectionService;

@@ -173,7 +172,7 @@ public void setUp() throws Exception {

fatalErrorHandler = new TestingFatalErrorHandler();
final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore());
submittedJobGraphStore = new InMemorySubmittedJobGraphStore();

dispatcherLeaderElectionService = new TestingLeaderElectionService();
jobMasterLeaderElectionService = new TestingLeaderElectionService();
@@ -197,7 +196,7 @@ public void setUp() throws Exception {
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
configuration,
haServices,
mock(ResourceManagerGateway.class),
new TestingResourceManagerGateway(),
new BlobServer(configuration, new VoidBlobStore()),
heartbeatServices,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
@@ -245,6 +244,13 @@ public void testJobSubmission() throws Exception {
*/
@Test
public void testLeaderElection() throws Exception {
CompletableFuture<Void> jobIdsFuture = new CompletableFuture<>();
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
jobIdsFuture.complete(null);
return jobIds;
});

UUID expectedLeaderSessionId = UUID.randomUUID();

assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
@@ -256,7 +262,8 @@ public void testLeaderElection() throws Exception {

assertEquals(expectedLeaderSessionId, actualLeaderSessionId);

verify(submittedJobGraphStore, Mockito.timeout(TIMEOUT.toMilliseconds()).atLeast(1)).getJobIds();
// wait until we have asked the SubmittedJobGraphStore for the stored jobs
jobIdsFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}

/**
@@ -431,6 +438,72 @@ public void testWaitingForJobMasterLeadership() throws ExecutionException, Inter
assertThat(jobStatusFuture.get(), notNullValue());
}

/**
* Tests that the {@link Dispatcher} terminates if it cannot recover job ids from
* the {@link SubmittedJobGraphStore}. See FLINK-8943.
*/
@Test
public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
final FlinkException testException = new FlinkException("Test exception");
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
throw testException;
});

UUID expectedLeaderSessionId = UUID.randomUUID();

assertNull(dispatcherLeaderElectionService.getConfirmationFuture());

dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);

UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

assertEquals(expectedLeaderSessionId, actualLeaderSessionId);

// we expect that a fatal error occurred
final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));

fatalErrorHandler.clearError();
}

/**
* Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
* the {@link SubmittedJobGraphStore}. See FLINK-8943.
*/
@Test
public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
final FlinkException testException = new FlinkException("Test exception");

final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
submittedJobGraphStore.putJobGraph(submittedJobGraph);

submittedJobGraphStore.setRecoverJobGraphFunction(
(JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
throw testException;
});

UUID expectedLeaderSessionId = UUID.randomUUID();

assertNull(dispatcherLeaderElectionService.getConfirmationFuture());

dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);

UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

assertEquals(expectedLeaderSessionId, actualLeaderSessionId);

// we expect that a fatal error occurred
final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));

fatalErrorHandler.clearError();
}

private static class TestingDispatcher extends Dispatcher {

private final CountDownLatch submitJobLatch = new CountDownLatch(2);
@@ -96,6 +96,7 @@
import akka.pattern.Patterns;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -127,7 +128,8 @@
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -160,7 +162,7 @@ public static void teardown() {
@Test
public void testJobRecoveryWhenLosingLeadership() throws Exception {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
FiniteDuration jobRecoveryTimeout = new FiniteDuration(0, TimeUnit.SECONDS);
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
UUID leaderSessionID = UUID.randomUUID();
@@ -229,7 +231,7 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
testingHighAvailabilityServices,
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("taskmanager"),
Option.<String>apply("taskmanager"),
true,
TestingTaskManager.class);

@@ -341,7 +343,7 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
}

/**
* Tests that a failing job recovery won't cause other job recoveries to fail.
* Tests that a job recovery failure terminates the {@link JobManager}.
*/
@Test
public void testFailingJobRecovery() throws Exception {
@@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception {

jobManager = system.actorOf(jobManagerProps);

final TestProbe testProbe = new TestProbe(system);

testProbe.watch(jobManager);

Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());

Await.ready(started, deadline.timeLeft());

// make the job manager the leader --> this triggers the recovery of all jobs
myLeaderElectionService.isLeader(leaderSessionID);

// check that we have successfully recovered the second job
assertThat(recoveredJobs, containsInAnyOrder(jobId2));
// check that we did not recover any jobs
assertThat(recoveredJobs, is(empty()));

// verify that the JobManager terminated
testProbe.expectTerminated(jobManager, timeout);
} finally {
TestingUtils.stopActor(jobManager);
}
@@ -447,7 +456,7 @@ public TestingFailingHAJobManager(
checkpointRecoveryFactory,
jobRecoveryTimeout,
jobManagerMetricGroup,
Option.empty());
Option.<String>empty());

this.recoveredJobs = recoveredJobs;
}