Some minor fixes in Kafka-IO and Client-Cli. (apache#3418)
murong00 authored and merlimat committed Jan 29, 2019
1 parent 8b74640 commit 252ba22
Showing 4 changed files with 36 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public class CmdProduce {
@Parameter(description = "TopicName", required = true)
private List<String> mainOptions;

@Parameter(names = { "-m", "--messages" }, description = "Comma separted string messages to send, "
@Parameter(names = { "-m", "--messages" }, description = "Comma separated string messages to send, "
+ "either -m or -f must be specified.")
private List<String> messages = Lists.newArrayList();

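The corrected help text says the `-m`/`--messages` flag takes a comma-separated list. As a hedged illustration (Python, not the actual Java `CmdProduce` code), such a flag value splits into individual messages like this:

```python
def split_messages(raw: str) -> list[str]:
    """Split a comma-separated --messages value into individual messages.

    Illustrative sketch only; CmdProduce itself is Java and may handle
    quoting or escaping differently.
    """
    return raw.split(",")

print(split_messages("hello,world"))  # prints: ['hello', 'world']
```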
@@ -55,12 +55,12 @@ public class KafkaSinkConfig implements Serializable {
+ "before considering a request complete. This controls the durability of records that are sent.")
private String acks;
@FieldDoc(
defaultValue = "16384",
defaultValue = "16384L",
help =
"The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
private long batchSize = 16384L;
@FieldDoc(
defaultValue = "16384",
defaultValue = "1048576L",
help =
"The maximum size of a Kafka request in bytes.")
private long maxRequestSize = 1048576L;
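After this fix, the `defaultValue` strings agree with the field initializers: 16384 bytes (16 KiB) for `batchSize` and 1048576 bytes (1 MiB) for `maxRequestSize`. A quick arithmetic check of those figures:

```python
# The corrected KafkaSinkConfig defaults, expressed as plain integers.
batch_size = 16384          # defaultValue = "16384L"   -> 16 KiB
max_request_size = 1048576  # defaultValue = "1048576L" -> 1 MiB

assert batch_size == 16 * 1024        # 16 KiB
assert max_request_size == 1024 * 1024  # 1 MiB
print(batch_size, max_request_size)   # prints: 16384 1048576
```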
27 changes: 14 additions & 13 deletions site2/docs/io-kafka.md
@@ -15,14 +15,14 @@ to a Pulsar topic.
|------|----------|---------|-------------|
| bootstrapServers | `true` | `null` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
| groupId | `true` | `null` | A unique string that identifies the consumer group this consumer belongs to. |
| fetchMinBytes | `false` | `null` | Minimum bytes expected for each fetch response. |
| autoCommitEnabled | `false` | `false` | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. |
| autoCommitIntervalMs | `false` | `null` | The frequency in ms that the consumer offsets are committed to zookeeper. |
| fetchMinBytes | `false` | `1` | Minimum bytes expected for each fetch response. |
| autoCommitEnabled | `false` | `true` | If true, the consumer's offset will be periodically committed in the background. This committed offset will be used when the process fails as the position from which the new consumer will begin. |
| autoCommitIntervalMs | `false` | `5000` | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| heartbeatIntervalMs | `false` | `3000` | The interval between heartbeats to the consumer when using Kafka's group management facilities. |
| sessionTimeoutMs | `false` | `null` | The timeout used to detect consumer failures when using Kafka's group management facility. |
| topic | `true` | `null` | Topic name to receive records from Kafka |
| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. |
| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
| sessionTimeoutMs | `false` | `30000` | The timeout used to detect consumer failures when using Kafka's group management facility. |
| topic | `true` | `null` | Topic name to receive records from Kafka. |
| keyDeserializationClass | `false` | `org.apache.kafka.common.serialization.StringDeserializer` | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface. |
| valueDeserializationClass | `false` | `org.apache.kafka.common.serialization.ByteArrayDeserializer` | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. |
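Taken together, a minimal Kafka source connector configuration using the parameters documented above might look like the following sketch (the server address, group id, and topic name are illustrative assumptions, not values from this commit):

```yaml
# Hypothetical Kafka source connector config; all values are examples.
configs:
  bootstrapServers: "localhost:9092"
  groupId: "test-pulsar-io"
  topic: "my-kafka-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: false
```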

## Sink

@@ -33,9 +33,10 @@ to a Kafka topic.

| Name | Required | Default | Description |
|------|----------|---------|-------------|
| acks | `true` | `null` | The kafka producer acks mode |
| batchSize | `true` | `null` | The kafka producer batch size. |
| maxRequestSize | `true` | `null` | The maximum size of a request in bytes. |
| topic | `true` | `null` | Topic name to receive records from Kafka |
| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
| bootstrapServers | `true` | `null` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
| acks | `true` | `null` | The kafka producer acks mode. |
| batchSize | `false` | `16384` | The kafka producer batch size. |
| maxRequestSize | `false` | `1048576` | The maximum size of a request in bytes. |
| topic | `true` | `null` | Topic name to receive records from Kafka. |
| keySerializerClass | `false` | `org.apache.kafka.common.serialization.StringSerializer` | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. |
| valueSerializerClass | `false` | `org.apache.kafka.common.serialization.ByteArraySerializer` | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
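Likewise, a minimal Kafka sink connector configuration using the parameters above might look like this sketch (the server address and topic name are illustrative assumptions):

```yaml
# Hypothetical Kafka sink connector config; all values are examples.
configs:
  bootstrapServers: "localhost:9092"
  topic: "my-kafka-topic"
  acks: "all"
  batchSize: 16384
  maxRequestSize: 1048576
```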
30 changes: 19 additions & 11 deletions site2/docs/reference-cli-tools.md
@@ -323,7 +323,7 @@ Options
|---|---|---|
|`--hex`|Display binary messages in hexadecimal format.|false|
|`-n`, `--num-messages`|Number of messages to consume, 0 means to consume forever.|0|
|`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
|`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 means to consume messages as fast as possible|0.0|
|`-s`, `--subscription-name`|Subscription name||
|`-t`, `--subscription-type`|The type of the subscription. Possible values: Exclusive, Shared, Failover.|Exclusive|

@@ -383,6 +383,7 @@ Commands
* `simulation-controller`

Environment variables

The table below lists the environment variables that you can use to configure the pulsar-perf tool.

|Variable|Description|Default|
@@ -407,18 +408,21 @@ Options
|`--auth_params`|Authentication parameters in the form of key1:val1,key2:val2||
|`--auth_plugin`|Authentication plugin class name||
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
|`--acks-delay-millis`|Acknowledgments grouping delay in millis|100|
|`-k`, `--encryption-key-name`|The private key name to decrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
|`--conf-file`|Configuration file||
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|0|
|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
|`-n`, `--num-producers`|The number of producers (per topic)|1|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from a file instead of an empty buffer||
|`-r`, `--rate`|Publish rate msg/s across topics|100|
|`-r`, `--rate`|Simulate a slow message consumer (rate in msg/s)|0|
|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
|`-u`, `--service-url`|Pulsar service URL||
|`-s`, `--size`|Message size (in bytes)|1024|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
|`-st`, `--subscription-type`|The type of the subscription. Possible values are Exclusive, Shared, Failover.|Exclusive|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--use-tls`|Use TLS encryption on the connection|false|


### `produce`
@@ -437,7 +441,7 @@ Options
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB or ZSTD.||
|`--conf-file`|Configuration file||
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|0|
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
|`-n`, `--num-producers`|The number of producers (per topic)|1|
@@ -448,6 +452,9 @@ Options
|`-s`, `--size`|Message size (in bytes)|1024|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--use-tls`|Use TLS encryption on the connection|false|
|`--warmup-time`|Warm-up time in seconds |1|



@@ -507,6 +514,7 @@ Commands


Environment variables

The table below lists the environment variables that you can use to configure the bookkeeper tool.

|Variable|Description|Default|
