
[FLINK-14762][client] ClusterClient#submitJob returns CompletableFuture of JobID
tisonkun committed Nov 29, 2019
1 parent c927e17 commit 1266c88
Showing 10 changed files with 17 additions and 40 deletions.
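The change turns ClusterClient#submitJob from a future of JobSubmissionResult into a future of JobID, so call sites no longer unwrap the submission result themselves. A minimal before/after sketch (illustrative only, not part of the diff below; it assumes a ClusterClient<?> named client and a JobGraph named jobGraph are already in scope):

    // Before this commit: extract the JobID from the JobSubmissionResult.
    JobID jobId = client.submitJob(jobGraph)
        .thenApply(JobSubmissionResult::getJobID)
        .get();

    // After this commit: the returned future already carries the JobID.
    JobID jobId = client.submitJob(jobGraph).get();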
@@ -108,7 +108,6 @@ public static JobExecutionResult submitJob(
         try {
             return client
                 .submitJob(jobGraph)
-                .thenApply(JobSubmissionResult::getJobID)
                 .thenApply(DetachedJobExecutionResult::new)
                 .get();
         } catch (InterruptedException | ExecutionException e) {
@@ -130,7 +129,6 @@ public static JobExecutionResult submitJobAndWaitForResult(
         try {
             jobResult = client
                 .submitJob(jobGraph)
-                .thenApply(JobSubmissionResult::getJobID)
                 .thenCompose(client::requestJobResult)
                 .get();
         } catch (InterruptedException | ExecutionException e) {
@@ -19,7 +19,6 @@
 package org.apache.flink.client.deployment;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -60,7 +59,6 @@ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @N
     final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
     return clusterClient
         .submitJob(jobGraph)
-        .thenApply(JobSubmissionResult::getJobID)
         .thenApply(jobID -> new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) {
             @Override
             protected void doClose() {
@@ -19,7 +19,6 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -29,7 +28,6 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
@@ -92,9 +90,9 @@ default void close() {
      * Submit the given {@link JobGraph} to the cluster.
      *
      * @param jobGraph to submit
-     * @return Future which is completed with the {@link JobSubmissionResult}
+     * @return {@link JobID} of the submitted job
      */
-    CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
+    CompletableFuture<JobID> submitJob(JobGraph jobGraph);
 
     /**
      * Requests the {@link JobStatus} of the job with the given {@link JobID}.
@@ -107,7 +105,7 @@ default void close() {
      * @param jobId for which to request the {@link JobResult}
      * @return Future which is completed with the {@link JobResult}
      */
-    CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
+    CompletableFuture<JobResult> requestJobResult(JobID jobId);
 
     /**
      * Requests and returns the accumulators for the given job identifier. Accumulators can be
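For the updated interface, a short usage sketch (illustrative; it mirrors the ClientUtils change above and assumes the same client and jobGraph variables) shows how the two futures compose once submitJob yields a JobID:

    // Submit the job, then wait for its final result by chaining requestJobResult.
    JobResult jobResult = client.submitJob(jobGraph)
        .thenCompose(client::requestJobResult)
        .get();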
@@ -66,8 +66,8 @@ public Configuration getFlinkConfiguration() {
     }
 
     @Override
-    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
-        return miniCluster.submitJob(jobGraph);
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
+        return miniCluster.submitJob(jobGraph).thenApply(JobSubmissionResult::getJobID);
     }
 
     @Override
@@ -20,7 +20,6 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.time.Time;
@@ -263,14 +262,8 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
         });
     }
 
-    /**
-     * Submits the given {@link JobGraph} to the dispatcher.
-     *
-     * @param jobGraph to submit
-     * @return Future which is completed with the submission response
-     */
     @Override
-    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
         CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
             try {
                 final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
@@ -328,8 +321,7 @@ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGra
         });
 
         return submissionFuture
-            .thenApply(
-                (JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
+            .thenApply(ignore -> jobGraph.getJobID())
             .exceptionally(
                 (Throwable throwable) -> {
                     throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
@@ -19,7 +19,6 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -95,7 +94,7 @@ public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
     }
 
     @Override
-    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
         throw new UnsupportedOperationException();
     }
 
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -68,7 +67,7 @@ public void testPortForwarding() throws Exception {
             .build();
 
         RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
-        when(mockedClient.submitJob(any())).thenReturn(CompletableFuture.completedFuture(new JobSubmissionResult(jobID)));
+        when(mockedClient.submitJob(any())).thenReturn(CompletableFuture.completedFuture(jobID));
         when(mockedClient.requestJobResult(any())).thenReturn(CompletableFuture.completedFuture(jobResult));
 
         PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation) -> {
@@ -106,7 +105,7 @@ public void testRemoteExecutionWithSavepoint() throws Exception {
         RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
 
         PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenReturn(mockedClient);
-        when(mockedClient.submitJob(any())).thenReturn(CompletableFuture.completedFuture(new JobSubmissionResult(jobID)));
+        when(mockedClient.submitJob(any())).thenReturn(CompletableFuture.completedFuture(jobID));
         when(mockedClient.requestJobResult(eq(jobID))).thenReturn(CompletableFuture.completedFuture(jobResult));
 
         JobExecutionResult actualResult = env.execute("fakeJobName");
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -99,11 +99,10 @@ public void testSequentialReading() throws Exception {
 
         final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
         final JobGraph jobGraph = createJobGraph();
-        final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
         // wait for the submission to succeed
-        final JobSubmissionResult result = submitFuture.get();
+        final JobID jobID = client.submitJob(jobGraph).get();
 
-        final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
+        final CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobID);
         final JobResult jobResult = resultFuture.get();
 
         assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
@@ -19,7 +19,7 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -118,12 +118,11 @@ private void executeSchedulingTest(Configuration configuration) throws Exception
         MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
 
         JobGraph jobGraph = createJobGraph(slotIdleTimeout << 1, parallelism);
-        CompletableFuture<JobSubmissionResult> submissionFuture = miniClusterClient.submitJob(jobGraph);
 
         // wait for the submission to succeed
-        JobSubmissionResult jobSubmissionResult = submissionFuture.get();
+        JobID jobID = miniClusterClient.submitJob(jobGraph).get();
 
-        CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobSubmissionResult.getJobID());
+        CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobID);
 
         JobResult jobResult = resultFuture.get();
 
@@ -19,7 +19,6 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
@@ -301,11 +300,7 @@ private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescrip
     }
 
     private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient) throws InterruptedException, java.util.concurrent.ExecutionException {
-        final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture =
-            restClusterClient.submitJob(job);
-
-        final JobSubmissionResult jobSubmissionResult = jobSubmissionResultCompletableFuture.get();
-        return jobSubmissionResult.getJobID();
+        return restClusterClient.submitJob(job).get();
     }
 
     private void killApplicationMaster(final String processName) throws IOException, InterruptedException {
