Skip to content

Commit

Permalink
[FLINK-24603][e2e] Add support for setting job main class
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 5, 2021
1 parent 24fd6d5 commit 295a1e3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ public JobID submitJob(final JobSubmission jobSubmission, Duration timeout) thro
commands.add("-p");
commands.add(String.valueOf(jobSubmission.getParallelism()));
}
jobSubmission
.getMainClass()
.ifPresent(
mainClass -> {
commands.add("--class");
commands.add(mainClass);
});
commands.add(jobSubmission.getJar().toAbsolutePath().toString());
commands.addAll(jobSubmission.getArguments());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,32 @@

import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Programmatic definition of a job-submission. */
public class JobSubmission {

private final Path jar;

private final String mainClass;
private final int parallelism;
private final boolean detached;
private final List<String> arguments;

JobSubmission(
final Path jar,
@Nullable final String mainClass,
final int parallelism,
final boolean detached,
final List<String> arguments) {
this.jar = jar;
this.mainClass = mainClass;
this.parallelism = parallelism;
this.detached = detached;
this.arguments = Collections.unmodifiableList(arguments);
Expand All @@ -60,19 +67,35 @@ public Path getJar() {
return jar;
}

public Optional<String> getMainClass() {
return Optional.ofNullable(mainClass);
}

/** Builder for the {@link JobSubmission}. */
public static class JobSubmissionBuilder {
private final Path jar;
private int parallelism = 0;
private final List<String> arguments = new ArrayList<>(2);
private boolean detached = false;
private String mainClass = null;

public JobSubmissionBuilder(final Path jar) {
Preconditions.checkNotNull(jar);
Preconditions.checkArgument(jar.isAbsolute(), "Jar path must be absolute.");
this.jar = jar;
}

/**
* Sets the main class for the job.
*
* @param mainClass main class for the job
* @return the modified builder
*/
public JobSubmissionBuilder setMainClass(final String mainClass) {
this.mainClass = mainClass;
return this;
}

/**
* Sets the parallelism for the job.
*
Expand Down Expand Up @@ -122,7 +145,7 @@ public JobSubmissionBuilder addArgument(final String key, final String value) {
}

public JobSubmission build() {
return new JobSubmission(jar, parallelism, detached, arguments);
return new JobSubmission(jar, mainClass, parallelism, detached, arguments);
}
}
}

0 comments on commit 295a1e3

Please sign in to comment.