Skip to content

Commit

Permalink
[FLINK-25266][e2e] Support job jar submission with FlinkContainers
Browse files Browse the repository at this point in the history
  • Loading branch information
fapaul committed Jan 11, 2022
1 parent b5ade69 commit eb1ee99
Showing 1 changed file with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.tests.util.flink.container;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.tests.util.flink.JobSubmission;
import org.apache.flink.tests.util.flink.SQLJobSubmission;
import org.apache.flink.util.function.RunnableWithException;

Expand Down Expand Up @@ -57,6 +59,8 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -258,6 +262,55 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException, InterruptedEx
}
}

/**
* Submits the given job to the cluster.
*
* @param job job to submit
*/
public JobID submitJob(JobSubmission job) throws IOException, InterruptedException {
final List<String> commands = new ArrayList<>();
commands.add("flink/bin/flink");
commands.add("run");

if (job.isDetached()) {
commands.add("-d");
}
if (job.getParallelism() > 0) {
commands.add("-p");
commands.add(String.valueOf(job.getParallelism()));
}
job.getMainClass()
.ifPresent(
mainClass -> {
commands.add("--class");
commands.add(mainClass);
});
final Path jobJar = job.getJar();
final String containerPath = "/tmp/" + jobJar.getFileName();
commands.add(containerPath);
jobManager.copyFileToContainer(
MountableFile.forHostPath(jobJar.toAbsolutePath()), containerPath);
commands.addAll(job.getArguments());

LOG.info("Running {}.", commands.stream().collect(Collectors.joining(" ")));

// Execute command in JobManager
Container.ExecResult execResult =
jobManager.execInContainer("bash", "-c", String.join(" ", commands));

final Pattern pattern =
job.isDetached()
? Pattern.compile("Job has been submitted with JobID (.*)")
: Pattern.compile("Job with JobID (.*) has finished.");

final String stdout = execResult.getStdout();
LOG.info(stdout);
LOG.error(execResult.getStderr());
final Matcher matcher = pattern.matcher(stdout);
checkState(matcher.find(), "Cannot extract JobID from stdout.");
return JobID.fromHexString(matcher.group(1));
}

// ------------------------ JUnit 5 lifecycle management ------------------------
@Override
public void beforeAll(ExtensionContext context) throws Exception {
Expand Down

0 comments on commit eb1ee99

Please sign in to comment.