Skip to content

Commit

Permalink
Make all cli arguments follow consistent - notation instead of camelc…
Browse files Browse the repository at this point in the history
…ase (apache#2432)
  • Loading branch information
srkukarni authored Aug 23, 2018
1 parent d6a0ce0 commit 225eeb7
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String DEPRECATED_userConfigString;
@Parameter(names = "--user-config", description = "User-defined config key/values")
protected String userConfigString;
@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order")
@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order", hidden = true)
protected Boolean DEPRECATED_retainOrdering;
@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order")
protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)")
protected Integer parallelism;
Expand Down Expand Up @@ -329,8 +331,9 @@ private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString;

if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile;
if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees;
if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString;
if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering;
if (DEPRECATED_windowLengthCount != null) windowLengthCount = DEPRECATED_windowLengthCount;
if (DEPRECATED_windowLengthDurationMs != null) windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount = DEPRECATED_slidingIntervalCount;
Expand Down Expand Up @@ -825,6 +828,7 @@ private void mergeArgs() {

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();
CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(),
instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
Expand Down Expand Up @@ -1023,6 +1027,7 @@ public void mergeArgs() {

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();
if (triggerFile == null && triggerValue == null) {
throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified");
Expand Down Expand Up @@ -1057,6 +1062,7 @@ private void mergeArgs() {

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();
if (StringUtils.isBlank(sourceFile)) {
throw new ParameterException("--source-file needs to be specified");
Expand Down Expand Up @@ -1091,6 +1097,7 @@ private void mergeArgs() {

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();
if (StringUtils.isBlank(destinationFile)) {
throw new ParameterException("--destination-file needs to be specified");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,56 @@ void processArguments() throws Exception {
@Parameters(commandDescription = "Run a Pulsar IO sink connector locally (rather than deploying it to the Pulsar cluster)")
protected class LocalSinkRunner extends CreateSink {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String DEPRECATED_brokerServiceUrl;
@Parameter(names = "--broker-service-url", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
protected String DEPRECATED_clientAuthPlugin;
@Parameter(names = "--client-auth-plugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String DEPRECATED_clientAuthParams;
@Parameter(names = "--client-auth-params", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
@Parameter(names = "--use_tls", description = "Use tls connection", hidden = true)
protected Boolean DEPRECATED_useTls;
@Parameter(names = "--use-tls", description = "Use tls connection")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection", hidden = true)
protected Boolean DEPRECATED_tlsAllowInsecureConnection;
@Parameter(names = "--tls-allow-insecure", description = "Allow insecure tls connection")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification", hidden = true)
protected Boolean DEPRECATED_tlsHostNameVerificationEnabled;
@Parameter(names = "--hostname-verification-enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path", hidden = true)
protected String DEPRECATED_tlsTrustCertFilePath;
@Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
}

@Override
void runCmd() throws Exception {
// merge deprecated args with new args
mergeArgs();

CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(),
0, brokerServiceUrl, null,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
Expand Down Expand Up @@ -212,35 +239,50 @@ abstract class SinkCommand extends BaseCommand {
"--inputs" }, description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)")
protected String inputs;

@Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)")
@Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)", hidden = true)
protected String DEPRECATED_topicsPattern;
@Parameter(names = "--topics-pattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)")
protected String topicsPattern;

@Parameter(names = { "-st",
"--schema-type" }, description = "The builtin schema type (eg: 'avro', 'json', etc..) or the class name for a Schema implementation")
protected String schemaType = "";

@Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
@Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer", hidden = true)
protected String DEPRECATED_subsName;
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
protected String subsName;

@Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)")
@Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
protected String DEPRECATED_customSerdeInputString;
@Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)")
protected String customSerdeInputString;

@Parameter(names = "--customSchemaInputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
protected String customSchemaInputString;


@Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink")
@Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink", hidden = true)
protected FunctionConfig.ProcessingGuarantees DEPRECATED_processingGuarantees;
@Parameter(names = "--processing-guarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order")
@Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order", hidden = true)
protected Boolean DEPRECATED_retainOrdering;
@Parameter(names = "--retain-ordering", description = "Sink consumes and sinks messages in order")
protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class)
protected String archive;
@Parameter(names = "--className", description = "The sink's class name if archive is file-url-path (file://)")
@Parameter(names = "--className", description = "The sink's class name if archive is file-url-path (file://)", hidden = true)
protected String DEPRECATED_className;
@Parameter(names = "--classname", description = "The sink's class name if archive is file-url-path (file://)")
protected String className;

@Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the "
+ "sink's configuration", hidden = true)
protected String DEPRECATED_sinkConfigFile;
@Parameter(names = "--sink-config-file", description = "The path to a YAML config file specifying the "
+ "sink's configuration")
protected String sinkConfigFile;
@Parameter(names = "--cpu", description = "The CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime)")
Expand All @@ -249,14 +291,29 @@ abstract class SinkCommand extends BaseCommand {
protected Long ram;
@Parameter(names = "--disk", description = "The disk (in bytes) that need to be allocated per sink instance (applicable only to Docker runtime)")
protected Long disk;
@Parameter(names = "--sinkConfig", description = "User defined configs key/values")
@Parameter(names = "--sinkConfig", description = "User defined configs key/values", hidden = true)
protected String DEPRECATED_sinkConfigString;
@Parameter(names = "--sink-config", description = "User defined configs key/values")
protected String sinkConfigString;

protected SinkConfig sinkConfig;

private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName;
if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern;
if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString;
if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering;
if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
if (!StringUtils.isBlank(DEPRECATED_sinkConfigFile)) sinkConfigFile = DEPRECATED_sinkConfigFile;
if (!StringUtils.isBlank(DEPRECATED_sinkConfigString)) sinkConfigString = DEPRECATED_sinkConfigString;
}

@Override
void processArguments() throws Exception {
super.processArguments();
// merge deprecated args with new args
mergeArgs();

if (null != sinkConfigFile) {
this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
Expand Down
Loading

0 comments on commit 225eeb7

Please sign in to comment.