[FLINK-25430][runtime] Integrates JobResultStore initialization alongside JobGraphStore initialization

The goal is to recover dirty JobResults from the
JobResultStore alongside the JobGraph recovery. Both are
passed through the call chain up to the Dispatcher.

The *DispatcherLeaderProcess implementations take care of
this recovery and make sure that any JobGraph that is
recovered but has a matching dirty JobResult is not passed
on to the Dispatcher (see the sketch below). A dedicated
Precondition in the Dispatcher constructor verifies this
invariant.
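
A minimal sketch of that filtering step, assuming hypothetical helper and class names (the actual logic lives in the *DispatcherLeaderProcess implementations; Dispatcher#assertRecoveredJobsAndDirtyJobResults in the diff below performs the corresponding check):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative sketch only; not the actual Flink implementation. */
final class DirtyJobResultFilter {

    static Collection<JobGraph> pruneJobGraphsWithDirtyResult(
            Collection<JobGraph> recoveredJobGraphs,
            Collection<JobResult> recoveredDirtyJobResults) {
        // IDs of jobs that already reached a globally-terminal state but whose
        // cleanup has not been confirmed yet (their JobResult is still "dirty").
        final Set<JobID> dirtyJobIds =
                recoveredDirtyJobResults.stream()
                        .map(JobResult::getJobId)
                        .collect(Collectors.toSet());

        // Such jobs must not be handed to the Dispatcher for re-execution; the
        // Dispatcher's constructor Precondition rejects any overlap.
        return recoveredJobGraphs.stream()
                .filter(jobGraph -> !dirtyJobIds.contains(jobGraph.getJobID()))
                .collect(Collectors.toList());
    }
}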
XComp committed Jan 27, 2022
1 parent 6c5ae20 commit 31d9caa
Showing 35 changed files with 968 additions and 253 deletions.
@@ -29,8 +29,10 @@
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayService;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkRuntimeException;

@@ -83,7 +85,9 @@ public ApplicationDispatcherGatewayServiceFactory(
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
Collection<JobResult> recoveredDirtyJobResults,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {

final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

@@ -94,6 +98,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
@@ -103,7 +108,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices, jobGraphWriter));
partialDispatcherServices, jobGraphWriter, jobResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
@@ -26,19 +26,25 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.testjar.BlockingJob;
import org.apache.flink.client.testjar.ErrorHandlingSubmissionJob;
import org.apache.flink.client.testjar.FailingJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -48,6 +54,7 @@
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -153,6 +160,64 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exception {
}
}

@Test
public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
final Deadline deadline = Deadline.fromNow(TIMEOUT);
final Configuration configuration = new Configuration();
configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
final TestingMiniClusterConfiguration clusterConfiguration =
TestingMiniClusterConfiguration.newBuilder()
.setConfiguration(configuration)
.build();

// having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap
// implementation fail to submit the job
final JobResultStore jobResultStore = new EmbeddedJobResultStore();
jobResultStore.createDirtyResult(
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
final EmbeddedHaServicesWithLeadershipControl haServices =
new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {

@Override
public JobResultStore getJobResultStore() {
return jobResultStore;
}
};

final TestingMiniCluster.Builder clusterBuilder =
TestingMiniCluster.newBuilder(clusterConfiguration)
.setHighAvailabilityServicesSupplier(() -> haServices)
.setDispatcherResourceManagerComponentFactorySupplier(
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
clusterConfiguration.getConfiguration(),
ErrorHandlingSubmissionJob.createPackagedProgram()));
try (final MiniCluster cluster = clusterBuilder.build()) {
// start mini cluster and submit the job
cluster.start();

// the cluster should shut down automatically once the application completes
awaitClusterStopped(cluster, deadline);
}

FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
.as(
"The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
.hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);

assertThat(
jobResultStore.hasDirtyJobResultEntry(
ApplicationDispatcherBootstrap.ZERO_JOB_ID))
.isTrue();
assertThat(
jobResultStore.hasCleanJobResultEntry(
ApplicationDispatcherBootstrap.ZERO_JOB_ID))
.isFalse();
}

@Test
public void testSubmitFailedJobOnApplicationError() throws Exception {
final Deadline deadline = Deadline.fromNow(TIMEOUT);
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.testjar;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.util.FlinkException;

import java.io.File;
import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@code ErrorHandlingSubmissionJob} provides a factory method for creating a {@link
* PackagedProgram} that monitors the job submission within the job's {@code main} method.
*/
public class ErrorHandlingSubmissionJob {

private static final AtomicReference<Exception> SUBMISSION_EXCEPTION = new AtomicReference<>();

public static PackagedProgram createPackagedProgram() throws FlinkException {
try {
return PackagedProgram.newBuilder()
.setUserClassPaths(
Collections.singletonList(
new File(CliFrontendTestUtils.getTestJarPath())
.toURI()
.toURL()))
.setEntryPointClassName(ErrorHandlingSubmissionJob.class.getName())
.build();
} catch (ProgramInvocationException | FileNotFoundException | MalformedURLException e) {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}

public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(Arrays.asList(1, 2, 3))
.map(element -> element + 1)
.output(new DiscardingOutputFormat<>());

try {
env.execute();
} catch (Exception e) {
SUBMISSION_EXCEPTION.set(e);
throw e;
}
}

public static Exception getSubmissionException() {
return SUBMISSION_EXCEPTION.get();
}
}
@@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -55,6 +56,10 @@ public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}

public static <T> Set<T> ofNullable(@Nullable T obj) {
return obj == null ? Collections.emptySet() : Collections.singleton(obj);
}

public static <T, R> Stream<R> mapWithIndex(
Collection<T> input, final BiFunction<T, Integer, R> mapper) {
final AtomicInteger count = new AtomicInteger(0);
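For context on the CollectionUtil change above: the new ofNullable helper maps null to an empty set and any non-null value to a singleton set. A small usage sketch (class and variable names are illustrative):

import org.apache.flink.util.CollectionUtil;

import java.util.Set;

final class OfNullableDemo {
    static void demo() {
        // null yields an empty set; a value yields a singleton set.
        final Set<String> empty = CollectionUtil.ofNullable(null);
        final Set<String> single = CollectionUtil.ofNullable("job-1");
        assert empty.isEmpty();
        assert single.size() == 1;
    }
}
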
@@ -18,27 +18,38 @@

package org.apache.flink.util;

import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for java collection utilities. */
@ExtendWith(TestLoggerExtension.class)
public class CollectionUtilTest {

@Test
public void testPartition() {
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Collection<List<Integer>> partitioned = CollectionUtil.partition(list, 4);

Assert.assertEquals(
"List partitioned into the an incorrect number of partitions",
4,
partitioned.size());
for (List<Integer> partition : partitioned) {
Assert.assertEquals("Unexpected number of elements in partition", 1, partition.size());
}
assertThat(partitioned)
.as("List partitioned into the an incorrect number of partitions")
.hasSize(4);
assertThat(partitioned).allSatisfy(partition -> assertThat(partition).hasSize(1));
}

@Test
public void testOfNullableWithNull() {
assertThat(CollectionUtil.ofNullable(null)).isEmpty();
}

@Test
public void testOfNullableWithObject() {
final Object element = new Object();
assertThat(CollectionUtil.ofNullable(element)).singleElement().isEqualTo(element);
}
}
@@ -90,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
@@ -157,11 +158,13 @@ public Dispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobs,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
checkNotNull(dispatcherServices);
assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs);

this.configuration = dispatcherServices.getConfiguration();
this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
@@ -171,6 +174,7 @@ public Dispatcher(
this.blobServer = dispatcherServices.getBlobServer();
this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
this.jobResultStore = dispatcherServices.getJobResultStore();
this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
this.ioExecutor = dispatcherServices.getIoExecutor();
@@ -179,8 +183,6 @@ public Dispatcher(
JobManagerSharedServices.fromConfiguration(
configuration, blobServer, fatalErrorHandler);

this.jobResultStore = highAvailabilityServices.getJobResultStore();

runningJobs = new HashMap<>(16);

this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
@@ -196,6 +198,7 @@ public Dispatcher(
this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);

this.recoveredJobs = new HashSet<>(recoveredJobs);

this.blobServer.retainJobs(
recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()));

@@ -246,6 +249,25 @@ private void startDispatcherServices() throws Exception {
}
}

private static void assertRecoveredJobsAndDirtyJobResults(
Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobResults) {
final Set<JobID> jobIdsOfFinishedJobs =
recoveredDirtyJobResults.stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet());

final boolean noRecoveredJobGraphHasDirtyJobResult =
recoveredJobs.stream()
.noneMatch(
recoveredJobGraph ->
jobIdsOfFinishedJobs.contains(
recoveredJobGraph.getJobID()));

Preconditions.checkArgument(
noRecoveredJobGraphHasDirtyJobResult,
"There should be no overlap between the recovered JobGraphs and the passed dirty JobResults based on their job ID.");
}

private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
runRecoveredJob(recoveredJob);
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;

import java.util.Collection;
@@ -31,6 +32,7 @@ Dispatcher createDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
PartialDispatcherServicesWithJobPersistenceComponents
partialDispatcherServicesWithJobPersistenceComponents)
(Remaining changed files not shown.)