Skip to content

Commit

Permalink
[FLINK-11533] [container] Make jobClassName argument optional
Browse files Browse the repository at this point in the history
[pr-review] Address comments
  • Loading branch information
uce committed Mar 1, 2019
1 parent b54fb2e commit 34aa3f5
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

Expand All @@ -39,8 +43,10 @@
*/
class ClassPathJobGraphRetriever implements JobGraphRetriever {

@Nonnull
private final String jobClassName;
private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class);
private static final String JAVA_CLASS_PATH = "java.class.path";
private static final String PATH_SEPARATOR = "path.separator";
private static final String DEFAULT_PATH_SEPARATOR = ":";

@Nonnull
private final JobID jobId;
Expand All @@ -51,15 +57,18 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
@Nonnull
private final String[] programArguments;

@Nullable
private final String jobClassName;

ClassPathJobGraphRetriever(
@Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments) {
this.jobClassName = requireNonNull(jobClassName, "jobClassName");
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
}

@Override
Expand Down Expand Up @@ -89,4 +98,5 @@ private PackagedProgram createPackagedProgram() throws FlinkException {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,28 @@
*/
final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {

@Nonnull
private final String jobClassName;

@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

@Nonnull
private final JobID jobId;

@Nullable
private final String jobClassName;

StandaloneJobClusterConfiguration(
@Nonnull String configDir,
@Nonnull Properties dynamicProperties,
@Nonnull String[] args,
@Nullable String hostname,
int restPort,
@Nonnull String jobClassName,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull JobID jobId) {
@Nonnull JobID jobId,
@Nullable String jobClassName) {
super(configDir, dynamicProperties, args, hostname, restPort);
this.jobClassName = requireNonNull(jobClassName, "jobClassName");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.jobId = requireNonNull(jobId, "jobId");
}

@Nonnull
String getJobClassName() {
return jobClassName;
this.jobClassName = jobClassName;
}

@Nonnull
Expand All @@ -72,4 +67,9 @@ SavepointRestoreSettings getSavepointRestoreSettings() {
JobID getJobId() {
return jobId;
}

@Nullable
String getJobClassName() {
return jobClassName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes

private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
.longOpt("job-classname")
.required(true)
.required(false)
.hasArg(true)
.argName("job class name")
.desc("Class name of the job to run.")
Expand Down Expand Up @@ -81,27 +81,27 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final int restPort = getRestPort(commandLine);
final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
final JobID jobId = getJobId(commandLine);
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());

return new StandaloneJobClusterConfiguration(
configDir,
dynamicProperties,
commandLine.getArgs(),
hostname,
restPort,
jobClassName,
savepointRestoreSettings,
jobId);
jobId,
jobClassName);
}

private int getRestPort(CommandLine commandLine) throws FlinkParseException {
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
try {
return Integer.parseInt(restPortString);
} catch (NumberFormatException e) {
throw flinkParseException(REST_PORT_OPTION, e);
throw createFlinkParseException(REST_PORT_OPTION, e);
}
}

Expand All @@ -113,11 +113,11 @@ private static JobID getJobId(CommandLine commandLine) throws FlinkParseExceptio
try {
return JobID.fromHexString(jobId);
} catch (IllegalArgumentException e) {
throw flinkParseException(JOB_ID_OPTION, e);
throw createFlinkParseException(JOB_ID_OPTION, e);
}
}

private static FlinkParseException flinkParseException(Option option, Exception cause) {
private static FlinkParseException createFlinkParseException(Option option, Exception cause) {
return new FlinkParseException(String.format("Failed to parse '--%s' option", option.getLongOpt()), cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.util.SignalHandler;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

Expand All @@ -41,9 +42,6 @@
*/
public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {

@Nonnull
private final String jobClassName;

@Nonnull
private final JobID jobId;

Expand All @@ -53,24 +51,27 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
@Nonnull
private final String[] programArguments;

@Nullable
private final String jobClassName;

private StandaloneJobClusterEntryPoint(
Configuration configuration,
@Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments) {
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
super(configuration);
this.jobClassName = requireNonNull(jobClassName, "jobClassName");
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobClassName, jobId, savepointRestoreSettings, programArguments));
new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
}

public static void main(String[] args) {
Expand All @@ -96,10 +97,10 @@ public static void main(String[] args) {

StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
configuration,
clusterConfiguration.getJobClassName(),
clusterConfiguration.getJobId(),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs());
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public void testJobGraphRetrieval() throws FlinkException {
final JobID jobId = new JobID();

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
jobId,
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS);
PROGRAM_ARGUMENTS,
TestJob.class.getCanonicalName());

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

Expand All @@ -67,10 +67,10 @@ public void testSavepointRestoreSettings() throws FlinkException {
final JobID jobId = new JobID();

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
jobId,
savepointRestoreSettings,
PROGRAM_ARGUMENTS);
PROGRAM_ARGUMENTS,
TestJob.class.getCanonicalName());

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -77,14 +79,18 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
@Test
public void testOnlyRequiredArguments() throws FlinkParseException {
final String configDir = "/foo/bar";
final String jobClassName = "foobar";
final String[] args = {"--configDir", configDir, "--job-classname", jobClassName};
final String[] args = {"--configDir", configDir};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties())));
assertThat(clusterConfiguration.getArgs(), is(new String[0]));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
assertThat(clusterConfiguration.getHostname(), is(nullValue()));
assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(not(nullValue())));
assertThat(clusterConfiguration.getJobClassName(), is(nullValue()));
}

@Test(expected = FlinkParseException.class)
Expand Down

0 comments on commit 34aa3f5

Please sign in to comment.