Skip to content

Commit

Permalink
[FLINK-11779] Update the description of the CLI -m parameter
Browse files Browse the repository at this point in the history
We now document that the CLI ignores -m parameter if high-availability
is ZOOKEEPER

1. Change the description message of -m parameter of `DefaultCLI` and
`FlinkYarnSessionCli`.

2. Change the description message of
"rest.address.port"
  • Loading branch information
guoweiM authored and aljoscha committed Sep 23, 2020
1 parent dad7297 commit 0baa1c4
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import org.apache.commons.cli.CommandLine;
Expand All @@ -33,10 +32,6 @@
import org.apache.commons.cli.Options;
import org.slf4j.Logger;

import java.net.InetSocketAddress;

import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;

/**
* Base class for {@link CustomCommandLine} implementations which specify a JobManager address and
* a ZooKeeper namespace.
Expand All @@ -47,11 +42,6 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
"Namespace to create the Zookeeper sub-paths for high availability mode");


protected final Option addressOption = new Option("m", "jobmanager", true,
"Address of the JobManager to which to connect. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration.");

protected final Configuration configuration;

protected AbstractCustomCommandLine(Configuration configuration) {
Expand All @@ -69,7 +59,6 @@ public void addRunOptions(Options baseOptions) {

@Override
public void addGeneralOptions(Options baseOptions) {
baseOptions.addOption(addressOption);
baseOptions.addOption(zookeeperNamespaceOption);
}

Expand All @@ -78,12 +67,6 @@ public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandL
final Configuration resultingConfiguration = new Configuration(configuration);
resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);

if (commandLine.hasOption(addressOption.getOpt())) {
String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort);
setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);
}

if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) {
String zkNamespace = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt());
resultingConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,30 @@

package org.apache.flink.client.cli;

import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.net.InetSocketAddress;

import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;

/**
* The default CLI which is used for interaction with standalone clusters.
*/
public class DefaultCLI extends AbstractCustomCommandLine {

private static final Option addressOption = new Option("m", "jobmanager", true,
"Address of the JobManager to which to connect. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration. " +
"Attention: This option is respected only if the high-availability configuration is NONE.");

public static final String ID = "default";

public DefaultCLI(Configuration configuration) {
Expand All @@ -40,6 +54,19 @@ public boolean isActive(CommandLine commandLine) {
return true;
}

@Override
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {

final Configuration resultingConfiguration = super.applyCommandLineOptionsToConfiguration(commandLine);
if (commandLine.hasOption(addressOption.getOpt())) {
String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort);
setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);
}
resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);
return resultingConfiguration;
}

@Override
public String getId() {
return ID;
Expand All @@ -48,5 +75,6 @@ public String getId() {
@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
baseOptions.addOption(addressOption);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class RestOptions {
key("rest.address")
.noDefaultValue()
.withFallbackKeys(JobManagerOptions.ADDRESS.key())
.withDescription("The address that should be used by clients to connect to the server.");
.withDescription("The address that should be used by clients to connect to the server. Attention: This option is respected only if the high-availability configuration is NONE.");

/**
* The port that the REST client connects to and the REST server binds to if {@link #BIND_PORT}
Expand All @@ -79,7 +79,7 @@ public class RestOptions {
.withDeprecatedKeys(WebOptions.PORT.key())
.withDescription(
Description.builder()
.text("The port that the client connects to. If %s has not been specified, then the REST server will bind to this port.", text(BIND_PORT.key()))
.text("The port that the client connects to. If %s has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.", text(BIND_PORT.key()))
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.cli;

import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

abstract class AbstractYarnCli extends AbstractCustomCommandLine {

public static final String ID = "yarn-cluster";

protected Option applicationId =
new Option("yid", "yarnapplicationId", true, "Attach to running YARN session");

protected Option addressOption =
new Option("m", "jobmanager", true, "Set to " + ID + " to use YARN execution mode.");

protected AbstractYarnCli(Configuration configuration) {
super(configuration);
}

@Override
public boolean isActive(CommandLine commandLine) {
final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
final boolean yarnJobManager = ID.equals(jobManagerOption);
final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
baseOptions.addOption(applicationId);
baseOptions.addOption(addressOption);
}

@Override
public String getId() {
return ID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,27 @@
package org.apache.flink.yarn.cli;

import org.apache.flink.annotation.Internal;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

/**
* A stub Yarn Command Line to throw an exception with the correct
* message when the {@code HADOOP_CLASSPATH} is not set.
*/
@Internal
public class FallbackYarnSessionCli extends AbstractCustomCommandLine {

public static final String ID = "yarn-cluster";

private final Option applicationId;
public class FallbackYarnSessionCli extends AbstractYarnCli {

public FallbackYarnSessionCli(Configuration configuration) {
super(configuration);
applicationId = new Option("yid", "yarnapplicationId", true, "Attach to running YARN session");
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
baseOptions.addOption(applicationId);
}

@Override
public boolean isActive(CommandLine commandLine) {
if (originalIsActive(commandLine)) {
if (super.isActive(commandLine)) {
throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
}
return false;
}

private boolean originalIsActive(CommandLine commandLine) {
final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
final boolean yarnJobManager = ID.equals(jobManagerOption);
final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}

@Override
public String getId() {
return ID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.yarn.cli;

import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterClientFactory;
Expand Down Expand Up @@ -92,16 +91,12 @@
/**
* Class handling the command line interface to the YARN session.
*/
public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
public class FlinkYarnSessionCli extends AbstractYarnCli {
private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);

//------------------------------------ Constants -------------------------

private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;

/** The id for the CommandLine interface. */
private static final String ID = "yarn-cluster";

// YARN-session related constants
private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
private static final String YARN_APPLICATION_ID_KEY = "applicationID";
Expand All @@ -117,8 +112,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
// the prefix transformation is used by the CliFrontend static constructor.
private final Option query;
// --- or ---
private final Option applicationId;
// --- or ---
private final Option queue;
private final Option shipPath;
private final Option flinkJar;
Expand Down Expand Up @@ -186,7 +179,6 @@ public FlinkYarnSessionCli(
// Create the command line options

query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
applicationId = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
Expand Down Expand Up @@ -297,18 +289,10 @@ private void encodeFilesToShipToCluster(final Configuration configuration, final

@Override
public boolean isActive(CommandLine commandLine) {
final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
final boolean yarnJobManager = ID.equals(jobManagerOption);
final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
return hasYarnExecutor || yarnJobManager || hasYarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}

@Override
public String getId() {
return ID;
if (!super.isActive(commandLine)) {
return (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
return true;
}

@Override
Expand All @@ -320,12 +304,6 @@ public void addRunOptions(Options baseOptions) {
}
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
baseOptions.addOption(applicationId);
}

@Override
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
// we ignore the addressOption because it can only contain "yarn-cluster"
Expand Down

0 comments on commit 0baa1c4

Please sign in to comment.