[Doc] Add 4 metadatastore parameters and a section for ZK batching operations; Improve content for chunking; Bug fixes and more (apache#13751)
momo-jun authored Feb 1, 2022
1 parent e7dca35 commit a65c887
Showing 4 changed files with 53 additions and 39 deletions.
26 changes: 9 additions & 17 deletions site2/docs/administration-zk-bk.md
@@ -133,27 +133,19 @@ $ bin/pulsar-daemon start configuration-store

### ZooKeeper configuration

In Pulsar, ZooKeeper configuration is handled by two separate configuration files in the `conf` directory of your Pulsar installation: `conf/zookeeper.conf` for [local ZooKeeper](#local-zookeeper) and `conf/global-zookeeper.conf` for [configuration store](#configuration-store).
In Pulsar, ZooKeeper configuration is handled by two separate configuration files in the `conf` directory of your Pulsar installation:
* The `conf/zookeeper.conf` file handles the configuration for local ZooKeeper.
* The `conf/global-zookeeper.conf` file handles the configuration for the configuration store.
See [parameters](reference-configuration.md#zookeeper) for more details.

#### Local ZooKeeper
#### Configure batching operations
Using batching operations reduces the remote procedure call (RPC) traffic between the ZooKeeper client and servers. It also reduces the number of write transactions, because each batching operation corresponds to a single ZooKeeper transaction that contains multiple read and write operations.

The [`conf/zookeeper.conf`](reference-configuration.md#zookeeper) file handles the configuration for local ZooKeeper. The table below shows the available parameters:
The following figure shows a basic benchmark of the number of batched read/write operations that can be requested of ZooKeeper in one second:

|Name|Description|Default|
|---|---|---|
|tickTime| The tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick. |2000|
|initLimit| The maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter. |10|
|syncLimit| The maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter. |5|
|dataDir| The location where ZooKeeper stores in-memory database snapshots as well as the transaction log of updates to the database. |data/zookeeper|
|clientPort| The port on which the ZooKeeper server listens for connections. |2181|
|autopurge.snapRetainCount| In ZooKeeper, auto purge determines how many recent snapshots of the database stored in dataDir to retain within the time interval specified by autopurge.purgeInterval (while deleting the rest). |3|
|autopurge.purgeInterval| The time interval, in hours, which triggers the ZooKeeper database purge task. Setting to a non-zero number enables auto purge; setting to 0 disables. Read this guide before enabling auto purge. |1|
|maxClientCnxns| The maximum number of client connections. Increase this if you need to handle more ZooKeeper clients. |60|
![ZooKeeper batching benchmark](assets/zookeeper-batching.png)


#### Configuration Store

The [`conf/global-zookeeper.conf`](reference-configuration.md#configuration-store) file handles the configuration for configuration store. The table below shows the available parameters:
To enable batching operations, set the [`metadataStoreBatchingEnabled`](reference-configuration.md#broker) parameter to `true` on the broker side.
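
For illustration, a minimal sketch of the relevant `conf/broker.conf` entries. Only `metadataStoreBatchingEnabled` is named above; the other three batching parameters and the default values shown are assumptions based on the broker reference and may vary by version:

```conf
# Enable batching of metadata store (ZooKeeper) read/write operations.
metadataStoreBatchingEnabled=true
# Assumption: maximum delay, in milliseconds, to wait while filling a batch.
metadataStoreBatchingMaxDelayMillis=5
# Assumption: maximum number of operations allowed in a single batch.
metadataStoreBatchingMaxOperations=1000
# Assumption: maximum total size, in KB, of a single batch.
metadataStoreBatchingMaxSizeKb=128
```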


## BookKeeper
Binary file added site2/docs/assets/zookeeper-batching.png
39 changes: 27 additions & 12 deletions site2/docs/concepts-messaging.md
@@ -96,29 +96,44 @@ To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar i
By default, batch index acknowledgement is disabled (`acknowledgmentAtBatchIndexLevelEnabled=false`). You can enable batch index acknowledgement by setting the `acknowledgmentAtBatchIndexLevelEnabled` parameter to `true` on the broker side. Note that enabling batch index acknowledgement increases memory overhead.
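
As a sketch of the client side, the Java consumer builder exposes a matching switch; `enableBatchIndexAcknowledgment` is assumed here to be the `ConsumerBuilder` method, and the service URL, topic, and subscription names are placeholders:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// Acknowledge at the batch-index level instead of the whole batch
// (requires acknowledgmentAtBatchIndexLevelEnabled=true on the broker).
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .enableBatchIndexAcknowledgment(true)
        .subscribe();
```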

### Chunking
Before you enable chunking, read the following instructions.
- Batching and chunking cannot be enabled simultaneously. To enable chunking, you must disable batching in advance.
- Chunking is only supported for persisted topics.
- Chunking is only supported for Exclusive and Failover subscription types.
Message chunking enables Pulsar to process large payload messages by splitting the message into chunks on the producer side and aggregating the chunked messages on the consumer side.

When chunking is enabled (`chunkingEnabled=true`), if the message size is greater than the allowed maximum publish-payload size, the producer splits the original message into chunked messages and publishes them with chunked metadata to the broker separately and in order. On the broker side, the chunked messages are stored in the managed-ledger in the same way as ordinary messages. The only difference is that the consumer needs to buffer the chunked messages and combine them into the real message once all chunked messages have been collected. The chunked messages in the managed-ledger can be interwoven with ordinary messages. If the producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks when it does not receive all of them within the expiration time. By default, the expiration time is one minute.
With message chunking enabled, when the size of a message exceeds the allowed maximum payload size (the `maxMessageSize` parameter of the broker; see the sketch after the following steps), the workflow of messaging is as follows:
1. The producer splits the original message into chunked messages and publishes them with chunked metadata to the broker separately and in order.
2. The broker stores the chunked messages in one managed-ledger in the same way as ordinary messages, and uses the `chunkedMessageRate` parameter to record the chunked message rate on the topic.
3. The consumer buffers the chunked messages and, once it has received all the chunks of a message, aggregates them into one message in the receiver queue.
4. The client consumes the aggregated message from the receiver queue.
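
For reference, a sketch of the broker setting that defines the threshold in step 1; the 5 MB value shown is the assumed default:

```conf
# Maximum size, in bytes, of a single message payload; messages larger than
# this must be chunked by the producer (assumed default shown).
maxMessageSize=5242880
```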

The consumer consumes the chunked messages and buffers them until the consumer receives all the chunks of a message. And then the consumer stitches chunked messages together and places them into the receiver-queue. Clients consume messages from the receiver-queue. Once the consumer consumes the entire large message and acknowledges it, the consumer internally sends acknowledgement of all the chunk messages associated to that large message. You can set the `maxPendingChunkedMessage` parameter on the consumer. When the threshold is reached, the consumer drops the unchunked messages by silently acknowledging them or asking the broker to redeliver them later by marking them unacknowledged.
**Limitations:**
- Chunking is only available for persisted topics.
- Chunking is only available for the exclusive and failover subscription types.
- Chunking cannot be enabled simultaneously with batching.

The broker does not require any changes to support chunking for non-shared subscriptions. The broker only uses `chunkedMessageRate` to record the chunked message rate on the topic.
#### Handle consecutive chunked messages with one ordered consumer

#### Handle chunked messages with one producer and one ordered consumer

As shown in the following figure, a topic has one producer that publishes a large message payload in chunked messages along with regular non-chunked messages. The producer publishes message M1 in three chunks M1-C1, M1-C2, and M1-C3. The broker stores all three chunked messages in the managed-ledger and dispatches them to the ordered (exclusive/failover) consumer in the same order. The consumer buffers all the chunked messages in memory until it receives all of them, combines them into one message, and then hands the original message M1 over to the client.
The following figure shows a topic with one producer that publishes a large message payload in chunked messages along with regular non-chunked messages. The producer publishes message M1 in three chunks labeled M1-C1, M1-C2, and M1-C3. The broker stores all three chunked messages in the managed-ledger and dispatches them to the ordered (exclusive/failover) consumer in the same order. The consumer buffers all the chunked messages in memory until it receives all of them, aggregates them into one message, and then hands the original message M1 over to the client.

![](assets/chunking-01.png)

#### Handle chunked messages with multiple producers and one ordered consumer
#### Handle interwoven chunked messages with one ordered consumer

When multiple publishers publish chunked messages into a single topic, the broker stores all the chunked messages coming from different publishers in the same managed-ledger. As shown below, Producer 1 publishes message M1 in three chunks M1-C1, M1-C2, and M1-C3. Producer 2 publishes message M2 in three chunks M2-C1, M2-C2, and M2-C3. All chunked messages of a specific message are still in order but might not be consecutive in the managed-ledger. This puts some memory pressure on the consumer because the consumer keeps a separate buffer for each large message in order to aggregate all of its chunks and combine them into one message.
When multiple producers publish chunked messages into a single topic, the broker stores all the chunked messages coming from different producers in the same managed-ledger. The chunked messages in the managed-ledger can be interwoven with each other. As shown below, Producer 1 publishes message M1 in three chunks M1-C1, M1-C2, and M1-C3. Producer 2 publishes message M2 in three chunks M2-C1, M2-C2, and M2-C3. All chunked messages of a specific message are still in order but might not be consecutive in the managed-ledger.

![](assets/chunking-02.png)

> **Note**
> In this case, interwoven chunked messages may put some memory pressure on the consumer because the consumer keeps a separate buffer for each large message to aggregate all of its chunks into one message. You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring the `maxPendingChunkedMessage` parameter. When the threshold is reached, the consumer drops pending messages either by silently acknowledging them or by asking the broker to redeliver them later, optimizing memory utilization.
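
A hedged Java sketch of these buffer controls, reusing the client from the earlier consumer example; `maxPendingChunkedMessage` and `autoAckOldestChunkedMessageOnQueueFull` are assumed `ConsumerBuilder` options:

```java
import org.apache.pulsar.client.api.Consumer;

// Bound the number of partially received large messages buffered at once.
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .maxPendingChunkedMessage(100)
        // On reaching the limit, silently acknowledge and drop the oldest
        // pending chunks; set to false to have the broker redeliver them later.
        .autoAckOldestChunkedMessageOnQueueFull(true)
        .subscribe();
```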

#### Enable Message Chunking

**Prerequisite:** Disable batching by setting the `enableBatching` parameter to `false`.

The message chunking feature is OFF by default.
To enable message chunking, set the `chunkingEnabled` parameter to `true` when creating a producer.
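
A minimal Java sketch under the prerequisite above, assuming `enableChunking` is the producer-builder counterpart of the `chunkingEnabled` parameter:

```java
import org.apache.pulsar.client.api.Producer;

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .enableBatching(false)  // prerequisite: chunking and batching are mutually exclusive
        .enableChunking(true)   // split messages larger than maxMessageSize into chunks
        .create();
```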

> **Note**
> If the consumer fails to receive all chunks of a message within a specified time period, it expires incomplete chunks. The default value is 1 minute. For more information about the `expireTimeOfIncompleteChunkedMessage` parameter, refer to [org.apache.pulsar.client.api](https://pulsar.apache.org/api/client/).
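
A sketch of tuning that window on the consumer, assuming the builder method below matches the `expireTimeOfIncompleteChunkedMessage` parameter:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;

Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        // Drop a partially received message if its remaining chunks do not
        // arrive within 1 minute (the documented default).
        .expireTimeOfIncompleteChunkedMessage(1, TimeUnit.MINUTES)
        .subscribe();
```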

## Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages.