Skip to content

Commit

Permalink
Support to set listener name for client cli (apache#7621)
Browse files Browse the repository at this point in the history
### Motivation

Currently, CLI tools `pulsar-client` and `pulsar-perf` cann't specify required listener name when using `advertisedListeners`.

### Modifications

Add option `--listener-name` for `pulsar-client` and `pulsar-perf`.
  • Loading branch information
murong00 authored Jul 28, 2020
1 parent 00e3089 commit 4f5b39a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class PulsarClientTool {
@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;

@Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
String listenerName = null;

@Parameter(
names = { "--auth-params" },
description = "Authentication parameters, whose format is determined by the implementation " +
Expand Down Expand Up @@ -115,6 +118,9 @@ private void updateConfig() throws UnsupportedAuthenticationException {
authentication = AuthenticationFactory.create(authPluginClassName, authParams);
clientBuilder.authentication(authentication);
}
if (isNotBlank(this.listenerName)) {
clientBuilder.listenerName(this.listenerName);
}
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.serviceUrl(serviceURL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ static class Arguments {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;

@Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
String listenerName = null;

@Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
private int maxPendingChuckedMessage = 0;

Expand Down Expand Up @@ -267,6 +270,10 @@ public static void main(String[] args) throws Exception {
clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
}

if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}

PulsarClient pulsarClient = clientBuilder.build();

class EncKeyReader implements CryptoKeyReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ static class Arguments {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;

@Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
String listenerName = null;

@Parameter(names = { "-ch",
"--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
private boolean chunkingAllowed = false;
Expand Down Expand Up @@ -428,6 +431,10 @@ private static void runProducer(Arguments arguments,
clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
}

if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}

client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
.sendTimeout(0, TimeUnit.SECONDS) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ static class Arguments {
@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;

@Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
String listenerName = null;

@Parameter(
names = { "--auth-params" },
description = "Authentication parameters, whose format is determined by the implementation " +
Expand Down Expand Up @@ -220,6 +223,10 @@ public static void main(String[] args) throws Exception {
clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection);
}

if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}

PulsarClient pulsarClient = clientBuilder.build();

List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
Expand Down
4 changes: 4 additions & 0 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ Options
|---|---|---|
|`--auth-params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{\"key1\":\"val1\",\"key2\":\"val2\"}"|{"saslJaasClientSectionName":"PulsarClient", "serverType":"broker"}|
|`--auth-plugin`|Authentication plugin class name|org.apache.pulsar.client.impl.auth.AuthenticationSasl|
|`--listener-name`|Listener name for the broker||
|`--url`|Broker URL to which to connect|pulsar://localhost:6650/ </br> ws://localhost:8080 |


Expand Down Expand Up @@ -415,6 +416,7 @@ Options
|---|---|---|
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`--listener-name`|Listener name for the broker||
|`--acks-delay-millis`|Acknowlegments grouping delay in millis|100|
|`-k`, `--encryption-key-name`|The private key name to decrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
Expand Down Expand Up @@ -448,6 +450,7 @@ Options
|---|---|---|
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`--listener-name`|Listener name for the broker||
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
|`--conf-file`|Configuration file||
Expand Down Expand Up @@ -485,6 +488,7 @@ Options
|---|---|---|
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`--listener-name`|Listener name for the broker||
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
Expand Down

0 comments on commit 4f5b39a

Please sign in to comment.