forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
commit chapter Cookbooks (apache#12815)
Co-authored-by: Anonymitaet <[email protected]>
- Loading branch information
1 parent
77b12fb
commit 56b7d4d
Showing
10 changed files
with
1,510 additions
and
0 deletions.
There are no files selected for viewing
25 changes: 25 additions & 0 deletions
25
site2/website-next/versioned_docs/version-2.7.2/cookbooks-bookkeepermetadata.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
--- | ||
id: cookbooks-bookkeepermetadata | ||
title: BookKeeper Ledger Metadata | ||
original_id: cookbooks-bookkeepermetadata | ||
--- | ||
|
||
import Tabs from '@theme/Tabs'; | ||
import TabItem from '@theme/TabItem'; | ||
|
||
|
||
Pulsar stores data on BookKeeper ledgers, you can understand the contents of a ledger by inspecting the metadata attached to the ledger. | ||
Such metadata are stored on ZooKeeper and they are readable using BookKeeper APIs. | ||
|
||
Description of current metadata: | ||
|
||
| Scope | Metadata name | Metadata value | | ||
| ------------- | ------------- | ------------- | | ||
| All ledgers | application | 'pulsar' | | ||
| All ledgers | component | 'managed-ledger', 'schema', 'compacted-topic' | | ||
| Managed ledgers | pulsar/managed-ledger | name of the ledger | | ||
| Cursor | pulsar/cursor | name of the cursor | | ||
| Compacted topic | pulsar/compactedTopic | name of the original topic | | ||
| Compacted topic | pulsar/compactedTo | id of the last compacted message | | ||
|
||
|
146 changes: 146 additions & 0 deletions
146
site2/website-next/versioned_docs/version-2.7.2/cookbooks-compaction.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
--- | ||
id: cookbooks-compaction | ||
title: Topic compaction | ||
sidebar_label: "Topic compaction" | ||
original_id: cookbooks-compaction | ||
--- | ||
|
||
import Tabs from '@theme/Tabs'; | ||
import TabItem from '@theme/TabItem'; | ||
|
||
|
||
Pulsar's [topic compaction](concepts-topic-compaction.md#compaction) feature enables you to create **compacted** topics in which older, "obscured" entries are pruned from the topic, allowing for faster reads through the topic's history (which messages are deemed obscured/outdated/irrelevant will depend on your use case). | ||
|
||
To use compaction: | ||
|
||
* You need to give messages keys, as topic compaction in Pulsar takes place on a *per-key basis* (i.e. messages are compacted based on their key). For a stock ticker use case, the stock symbol---e.g. `AAPL` or `GOOG`---could serve as the key (more on this [below](#when-should-i-use-compacted-topics)). Messages without keys will be left alone by the compaction process. | ||
* Compaction can be configured to run [automatically](#configuring-compaction-to-run-automatically), or you can manually [trigger](#triggering-compaction-manually) compaction using the Pulsar administrative API. | ||
* Your consumers must be [configured](#consumer-configuration) to read from compacted topics ([Java consumers](#java), for example, have a `readCompacted` setting that must be set to `true`). If this configuration is not set, consumers will still be able to read from the non-compacted topic. | ||
|
||
|
||
> Compaction only works on messages that have keys (as in the stock ticker example the stock symbol serves as the key for each message). Keys can thus be thought of as the axis along which compaction is applied. Messages that don't have keys are simply ignored by compaction. | ||
## When should I use compacted topics? | ||
|
||
The classic example of a topic that could benefit from compaction would be a stock ticker topic through which consumers can access up-to-date values for specific stocks. Imagine a scenario in which messages carrying stock value data use the stock symbol as the key (`GOOG`, `AAPL`, `TWTR`, etc.). Compacting this topic would give consumers on the topic two options: | ||
|
||
* They can read from the "original," non-compacted topic in case they need access to "historical" values, i.e. the entirety of the topic's messages. | ||
* They can read from the compacted topic if they only want to see the most up-to-date messages. | ||
|
||
Thus, if you're using a Pulsar topic called `stock-values`, some consumers could have access to all messages in the topic (perhaps because they're performing some kind of number crunching of all values in the last hour) while the consumers used to power the real-time stock ticker only see the compacted topic (and thus aren't forced to process outdated messages). Which variant of the topic any given consumer pulls messages from is determined by the consumer's [configuration](#consumer-configuration). | ||
|
||
> One of the benefits of compaction in Pulsar is that you aren't forced to choose between compacted and non-compacted topics, as the compaction process leaves the original topic as-is and essentially adds an alternate topic. In other words, you can run compaction on a topic and consumers that need access to the non-compacted version of the topic will not be adversely affected. | ||
|
||
## Configuring compaction to run automatically | ||
|
||
Tenant administrators can configure a policy for compaction at the namespace level. The policy specifies how large the topic backlog can grow before compaction is triggered. | ||
|
||
For example, to trigger compaction when the backlog reaches 100MB: | ||
|
||
```bash | ||
|
||
$ bin/pulsar-admin namespaces set-compaction-threshold \ | ||
--threshold 100M my-tenant/my-namespace | ||
|
||
``` | ||
|
||
Configuring the compaction threshold on a namespace will apply to all topics within that namespace. | ||
|
||
## Triggering compaction manually | ||
|
||
In order to run compaction on a topic, you need to use the [`topics compact`](reference-pulsar-admin.md#topics-compact) command for the [`pulsar-admin`](reference-pulsar-admin) CLI tool. Here's an example: | ||
|
||
```bash | ||
|
||
$ bin/pulsar-admin topics compact \ | ||
persistent://my-tenant/my-namespace/my-topic | ||
|
||
``` | ||
|
||
The `pulsar-admin` tool runs compaction via the Pulsar {@inject: rest:REST:/} API. To run compaction in its own dedicated process, i.e. *not* through the REST API, you can use the [`pulsar compact-topic`](reference-cli-tools.md#pulsar-compact-topic) command. Here's an example: | ||
|
||
```bash | ||
|
||
$ bin/pulsar compact-topic \ | ||
--topic persistent://my-tenant-namespace/my-topic | ||
|
||
``` | ||
|
||
> Running compaction in its own process is recommended when you want to avoid interfering with the broker's performance. Broker performance should only be affected, however, when running compaction on topics with a large keyspace (i.e when there are many keys on the topic). The first phase of the compaction process keeps a copy of each key in the topic, which can create memory pressure as the number of keys grows. Using the `pulsar-admin topics compact` command to run compaction through the REST API should present no issues in the overwhelming majority of cases; using `pulsar compact-topic` should correspondingly be considered an edge case. | ||
The `pulsar compact-topic` command communicates with [ZooKeeper](https://zookeeper.apache.org) directly. In order to establish communication with ZooKeeper, though, the `pulsar` CLI tool will need to have a valid [broker configuration](reference-configuration.md#broker). You can either supply a proper configuration in `conf/broker.conf` or specify a non-default location for the configuration: | ||
|
||
```bash | ||
|
||
$ bin/pulsar compact-topic \ | ||
--broker-conf /path/to/broker.conf \ | ||
--topic persistent://my-tenant/my-namespace/my-topic | ||
|
||
# If the configuration is in conf/broker.conf | ||
$ bin/pulsar compact-topic \ | ||
--topic persistent://my-tenant/my-namespace/my-topic | ||
|
||
``` | ||
|
||
#### When should I trigger compaction? | ||
|
||
How often you [trigger compaction](#triggering-compaction-manually) will vary widely based on the use case. If you want a compacted topic to be extremely speedy on read, then you should run compaction fairly frequently. | ||
|
||
## Consumer configuration | ||
|
||
Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar's language clients. If the | ||
|
||
### Java | ||
|
||
In order to read from a compacted topic using a Java consumer, the `readCompacted` parameter must be set to `true`. Here's an example consumer for a compacted topic: | ||
|
||
```java | ||
|
||
Consumer<byte[]> compactedTopicConsumer = client.newConsumer() | ||
.topic("some-compacted-topic") | ||
.readCompacted(true) | ||
.subscribe(); | ||
|
||
``` | ||
|
||
As mentioned above, topic compaction in Pulsar works on a *per-key basis*. That means that messages that you produce on compacted topics need to have keys (the content of the key will depend on your use case). Messages that don't have keys will be ignored by the compaction process. Here's an example Pulsar message with a key: | ||
|
||
```java | ||
|
||
import org.apache.pulsar.client.api.Message; | ||
import org.apache.pulsar.client.api.MessageBuilder; | ||
|
||
Message<byte[]> msg = MessageBuilder.create() | ||
.setContent(someByteArray) | ||
.setKey("some-key") | ||
.build(); | ||
|
||
``` | ||
|
||
The example below shows a message with a key being produced on a compacted Pulsar topic: | ||
|
||
```java | ||
|
||
import org.apache.pulsar.client.api.Message; | ||
import org.apache.pulsar.client.api.MessageBuilder; | ||
import org.apache.pulsar.client.api.Producer; | ||
import org.apache.pulsar.client.api.PulsarClient; | ||
|
||
PulsarClient client = PulsarClient.builder() | ||
.serviceUrl("pulsar://localhost:6650") | ||
.build(); | ||
|
||
Producer<byte[]> compactedTopicProducer = client.newProducer() | ||
.topic("some-compacted-topic") | ||
.create(); | ||
|
||
Message<byte[]> msg = MessageBuilder.create() | ||
.setContent(someByteArray) | ||
.setKey("some-key") | ||
.build(); | ||
|
||
compactedTopicProducer.send(msg); | ||
|
||
``` | ||
|
159 changes: 159 additions & 0 deletions
159
site2/website-next/versioned_docs/version-2.7.2/cookbooks-deduplication.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
--- | ||
id: cookbooks-deduplication | ||
title: Message deduplication | ||
sidebar_label: "Message deduplication" | ||
original_id: cookbooks-deduplication | ||
--- | ||
|
||
import Tabs from '@theme/Tabs'; | ||
import TabItem from '@theme/TabItem'; | ||
|
||
|
||
When **Message deduplication** is enabled, it ensures that each message produced on Pulsar topics is persisted to disk *only once*, even if the message is produced more than once. Message deduplication is handled automatically on the server side. | ||
|
||
To use message deduplication in Pulsar, you need to configure your Pulsar brokers and clients. | ||
|
||
## How it works | ||
|
||
You can enable or disable message deduplication at the namespace level or the topic level. By default, it is disabled on all namespaces or topics. You can enable it in the following ways: | ||
|
||
* Enable deduplication for all namespaces/topics at the broker-level. | ||
* Enable deduplication for a specific namespace with the `pulsar-admin namespaces` interface. | ||
* Enable deduplication for a specific topic with the `pulsar-admin topics` interface. | ||
|
||
## Configure message deduplication | ||
|
||
You can configure message deduplication in Pulsar using the [`broker.conf`](reference-configuration.md#broker) configuration file. The following deduplication-related parameters are available. | ||
|
||
Parameter | Description | Default | ||
:---------|:------------|:------- | ||
`brokerDeduplicationEnabled` | Sets the default behavior for message deduplication in the Pulsar broker. If it is set to `true`, message deduplication is enabled on all namespaces/topics. If it is set to `false`, you have to enable or disable deduplication at the namespace level or the topic level. | `false` | ||
`brokerDeduplicationMaxNumberOfProducers` | The maximum number of producers for which information is stored for deduplication purposes. | `10000` | ||
`brokerDeduplicationEntriesInterval` | The number of entries after which a deduplication informational snapshot is taken. A larger interval leads to fewer snapshots being taken, though this lengthens the topic recovery time (the time required for entries published after the snapshot to be replayed). | `1000` | ||
`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity (in minutes) after which the broker discards deduplication information related to a disconnected producer. | `360` (6 hours) | ||
|
||
### Set default value at the broker-level | ||
|
||
By default, message deduplication is *disabled* on all Pulsar namespaces/topics. To enable it on all namespaces/topics, set the `brokerDeduplicationEnabled` parameter to `true` and re-start the broker. | ||
|
||
Even if you set the value for `brokerDeduplicationEnabled`, enabling or disabling via Pulsar admin CLI overrides the default settings at the broker-level. | ||
|
||
### Enable message deduplication | ||
|
||
Though message deduplication is disabled by default at the broker level, you can enable message deduplication for a specific namespace or topic using the [`pulsar-admin namespaces set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or the [`pulsar-admin topics set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. You can use the `--enable`/`-e` flag and specify the namespace/topic. | ||
|
||
The following example shows how to enable message deduplication at the namespace level. | ||
|
||
```bash | ||
|
||
$ bin/pulsar-admin namespaces set-deduplication \ | ||
public/default \ | ||
--enable # or just -e | ||
|
||
``` | ||
|
||
### Disable message deduplication | ||
|
||
Even if you enable message deduplication at the broker level, you can disable message deduplication for a specific namespace or topic using the [`pulsar-admin namespace set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or the [`pulsar-admin topics set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. Use the `--disable`/`-d` flag and specify the namespace/topic. | ||
|
||
The following example shows how to disable message deduplication at the namespace level. | ||
|
||
```bash | ||
|
||
$ bin/pulsar-admin namespaces set-deduplication \ | ||
public/default \ | ||
--disable # or just -d | ||
|
||
``` | ||
|
||
## Pulsar clients | ||
|
||
If you enable message deduplication in Pulsar brokers, you need complete the following tasks for your client producers: | ||
|
||
1. Specify a name for the producer. | ||
1. Set the message timeout to `0` (namely, no timeout). | ||
|
||
The instructions for Java, Python, and C++ clients are different. | ||
|
||
<Tabs | ||
defaultValue="Java clients" | ||
values={[ | ||
{ | ||
"label": "Java clients", | ||
"value": "Java clients" | ||
}, | ||
{ | ||
"label": "Python clients", | ||
"value": "Python clients" | ||
}, | ||
{ | ||
"label": "C++ clients", | ||
"value": "C++ clients" | ||
} | ||
]}> | ||
<TabItem value="Java clients"> | ||
|
||
To enable message deduplication on a [Java producer](client-libraries-java.md#producers), set the producer name using the `producerName` setter, and set the timeout to `0` using the `sendTimeout` setter. | ||
|
||
```java | ||
|
||
import org.apache.pulsar.client.api.Producer; | ||
import org.apache.pulsar.client.api.PulsarClient; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
PulsarClient pulsarClient = PulsarClient.builder() | ||
.serviceUrl("pulsar://localhost:6650") | ||
.build(); | ||
Producer producer = pulsarClient.newProducer() | ||
.producerName("producer-1") | ||
.topic("persistent://public/default/topic-1") | ||
.sendTimeout(0, TimeUnit.SECONDS) | ||
.create(); | ||
|
||
``` | ||
|
||
</TabItem> | ||
<TabItem value="Python clients"> | ||
|
||
To enable message deduplication on a [Python producer](client-libraries-python.md#producers), set the producer name using `producer_name`, and set the timeout to `0` using `send_timeout_millis`. | ||
|
||
```python | ||
|
||
import pulsar | ||
|
||
client = pulsar.Client("pulsar://localhost:6650") | ||
producer = client.create_producer( | ||
"persistent://public/default/topic-1", | ||
producer_name="producer-1", | ||
send_timeout_millis=0) | ||
|
||
``` | ||
|
||
</TabItem> | ||
<TabItem value="C++ clients"> | ||
|
||
To enable message deduplication on a [C++ producer](client-libraries-cpp.md#producer), set the producer name using `producer_name`, and set the timeout to `0` using `send_timeout_millis`. | ||
|
||
```cpp | ||
|
||
#include <pulsar/Client.h> | ||
|
||
std::string serviceUrl = "pulsar://localhost:6650"; | ||
std::string topic = "persistent://some-tenant/ns1/topic-1"; | ||
std::string producerName = "producer-1"; | ||
|
||
Client client(serviceUrl); | ||
|
||
ProducerConfiguration producerConfig; | ||
producerConfig.setSendTimeout(0); | ||
producerConfig.setProducerName(producerName); | ||
|
||
Producer producer; | ||
|
||
Result result = client.createProducer(topic, producerConfig, producer); | ||
|
||
``` | ||
</TabItem> | ||
</Tabs> |
Oops, something went wrong.