Skip to content

Commit

Permalink
Message deduplication documentation (apache#1271)
Browse files Browse the repository at this point in the history
* add deduplication diagram

* add CLI docs to YAML file

* add docs for new broker.conf settings

* add link to streamlio blog post and remove errant TODOs

* add more to theory section

* add producer idempotency section

* add message deduplication cookbook

* add multiple new sections to cookbook

* add note about dedup and namespaces

* add new badge to new docs

* update description of configs

* finish admin section of doc

* fix minor markdown error

* fix merge conflicts in gemfile.lock
  • Loading branch information
lucperkins authored and merlimat committed Apr 6, 2018
1 parent b5781a1 commit ec210cb
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 12 deletions.
6 changes: 3 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ brokerDeduplicationEnabled=false
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to less snapshots being taken though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
# A larger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time when the entries published after the
# snapshot need to be replayed.
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public interface ProducerBuilder<T> extends Serializable, Cloneable {
* Set the send timeout <i>(default: 30 seconds)</i>
* <p>
* If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
* Setting the timeout to zero, for example <code>setTimeout(0, TimeUnit.SECONDS)</code> will set the timeout
* to infinity, which can be useful when using Pulsar's message deduplication feature.
*
* @param sendTimeout
* the send timeout
Expand Down
2 changes: 1 addition & 1 deletion site/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ GEM
rouge (3.1.1)
ruby_dep (1.5.0)
safe_yaml (1.0.4)
sass (3.5.5)
sass (3.5.6)
sass-listen (~> 4.0.0)
sass-listen (4.0.0)
rb-fsevent (~> 0.9, >= 0.9.4)
Expand Down
10 changes: 10 additions & 0 deletions site/_data/cli/pulsar-admin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,16 @@ commands:
- name: delete
description: Deletes a namespace
argument: property/cluster/namespace
- name: set-deduplication
description: Enable or disable message deduplication on a namespace
argument: property/cluster/namespace
options:
- flags: --enable, -e
description: Enable message deduplication on the specified namespace
default: 'false'
- flags: --disable, -d
description: Disable message deduplication on the specified namespace
default: 'false'
- name: permissions
description: Get the permissions on a namespace
argument: property/cluster/namespace
Expand Down
14 changes: 13 additions & 1 deletion site/_data/config/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,22 @@ configs:
description: Hostname or IP address the service binds on, default is 0.0.0.0.
- name: advertisedAddress
default: ''
description: Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
description: Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used.
- name: clusterName
default: ''
description: Name of the cluster to which this broker belongs to
- name: brokerDeduplicationEnabled
default: 'false'
description: Sets the default behavior for message deduplication in the broker. If enabled, the broker will reject messages that were already stored in the topic. This setting can be overridden on a per-namespace basis.
- name: brokerDeduplicationMaxNumberOfProducers
default: '10000'
description: The maximum number of producers for which information will be stored for deduplication purposes.
- name: brokerDeduplicationEntriesInterval
default: '1000'
description: The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed).
- name: brokerDeduplicationProducerInactivityTimeoutMinutes
default: '360'
description: The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer.
- name: zooKeeperSessionTimeoutMillis
default: '30000'
description: Zookeeper session timeout in milliseconds
Expand Down
6 changes: 4 additions & 2 deletions site/_data/sidebar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ groups:
- title: Apache Storm
endpoint: PulsarStorm

- title: Advanced
dir: advanced
- title: Cookbooks
dir: cookbooks
docs:
- title: Message deduplication
endpoint: message-deduplication
- title: Partitioned topics
endpoint: PartitionedTopics
- title: Retention and expiry
Expand Down
2 changes: 1 addition & 1 deletion site/docs/latest/admin/ZooKeeperBookKeeper.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Configuration for global ZooKeeper is handled by the [`conf/global-zookeeper.con
{% popover BookKeeper %} is responsible for all durable message storage in Pulsar. BookKeeper is a distributed [write-ahead log](https://en.wikipedia.org/wiki/Write-ahead_logging) WAL system that guarantees read consistency of independent message logs called {% popover ledgers %}. Individual BookKeeper servers are also called *bookies*.

{% include admonition.html type="info" content="
For a guide to managing message persistence, retention, and expiry in Pulsar, see [this guide](../../advanced/RetentionExpiry).
For a guide to managing message persistence, retention, and expiry in Pulsar, see [this cookbook](../../cookbooks/RetentionExpiry).
" %}

### Deploying BookKeeper
Expand Down
2 changes: 1 addition & 1 deletion site/docs/latest/clients/Java.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Producer producer = client.newProducer()

### Message routing

When using {% popover partitioned topics %}, you can specify the routing mode whenever you publish messages using a {% popover producer %}. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](../../advanced/PartitionedTopics) guide.
When using {% popover partitioned topics %}, you can specify the routing mode whenever you publish messages using a {% popover producer %}. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](../../cookbooks/PartitionedTopics) cookbook.

### Async send

Expand Down
File renamed without changes.
121 changes: 121 additions & 0 deletions site/docs/latest/cookbooks/message-deduplication.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
---
title: Message deduplication
tags: [admin, deduplication, cookbook]
new: true
---

**Message deduplication** is a feature of Pulsar that, when enabled, ensures that each message produced on Pulsar {% popover topics %} is persisted to disk *only once*, even if the message is produced more than once. Message deduplication essentially unburdens Pulsar applications of the responsibility of ensuring deduplication and instead handles it automatically on the server side.

Using message deduplication in Pulsar involves making some [configuration changes](#configuration) to your Pulsar brokers as well as some minor changes to the behavior of Pulsar [clients](#clients).

{% include admonition.html type="info" content="For a more thorough theoretical explanation of message deduplication, see the [Concepts and Architecture](../../getting-started/ConceptsAndArchitecture#message-deduplication) document." %}

## How it works

Message deduplication can be enabled and disabled on a per-{% popover namespace %} basis. By default, it is *disabled* on all namespaces and can enabled in the following ways:

* Using the [`pulsar-admin namespaces`](#enabling) interface
* As a {% popover broker %}-level [default](#default) for all namespaces

## Configuration for message deduplication {#configuration}

You can configure message deduplication in Pulsar using the [`broker.conf`](../../reference/Configuration#broker) configuration file. The following deduplication-related parameters are available:

Parameter | Description | Default
:---------|:------------|:-------
`brokerDeduplicationEnabled` | Sets the default behavior for message deduplication in the Pulsar {% popover broker %}. If set to `true`, message deduplication will be enabled by default on all namespaces; if set to `false` (the default), deduplication will have to be [enabled](#enabling) and [disabled](#disabling) on a per-namespace basis. | `false`
`brokerDeduplicationMaxNumberOfProducers` | The maximum number of producers for which information will be stored for deduplication purposes. | `10000`
`brokerDeduplicationEntriesInterval` | The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed). | `1000`
`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer. | `360` (6 hours)

Any configuration changes you make won't take effect until you re-start the broker.

### Setting the broker-level default {#default}

By default, message deduplication is *disabled* on all Pulsar namespaces. To enable it by default on all namespaces, set the `brokerDeduplicationEnabled` parameter to `true` and re-start the broker.

Regardless of the value of `brokerDeduplicationEnabled`, [enabling](#enabling) and [disabling](#disabling) via the CLI will override the broker-level default.

### Enabling message deduplication {#enabling}

You can enable message deduplication on specific namespaces, regardless of the the [default](#default) for the broker, using the [`pulsar-admin namespace set-deduplication`](../../CliTools#pulsar-admin-namespace-set-deduplication) command. You can use the `--enable`/`-e` flag and specify the namespace. Here's an example:

```bash
$ bin/pulsar-admin namespaces set-deduplication \
persistent://sample/standalone/ns1/topic-1 \
--enable # or just -e
```

### Disabling message deduplication {#disabling}

You can disable message deduplication on a specific namespace using the same method shown [above](#enabling), except using the `--disable`/`-d` flag instead. Here's an example:

```bash
$ bin/pulsar-admin namespaces set-deduplication \
persistent://sample/standalone/ns1/topic-1 \
--disable # or just -d
```

## Message deduplication and Pulsar clients {#clients}

If you enable message deduplication in your Pulsar {% popover brokers %}, you won't need to make any major changes to your Pulsar clients. There are, however, two settings that you need to provide for your client {% popover producers %}:

1. The producer must be given a name
1. The message send timeout needs to be set to infinity (i.e. no timeout)

Instructions for [Java](#java), [Python](#python), and [C++](#cpp) clients can be found below.

### Java clients {#java}

To enable message deduplication on a [Java producer](../../clients/Java#producers), set the producer name using the `producerName` setter and set the timeout to 0 using the `sendTimeout` setter. Here's an example:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.concurrent.TimeUnit;

PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = pulsarClient.newProducer()
.producerName("producer-1")
.topic("persistent://sample/standalone/ns1/topic-1")
.sendTimeout(0, TimeUnit.SECONDS)
.create();
```

### Python clients {#python}

To enable message deduplication on a [Python producer](../../clients/Python#producers), set the producer name using `producer_name` and the timeout to 0 using `send_timeout_millis`. Here's an example:

```python
import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer(
"persistent://sample/standalone/ns1/topic-1",
producer_name="producer-1",
send_timeout_millis=0)
```

## C++ clients {#cpp}

To enable message deduplication on a [C++ producer](../../clients/Cpp#producer), set the producer name using `producer_name` and the timeout to 0 using `send_timeout_millis`. Here's an example:

```cpp
#include <pulsar/Client.h>

std::string serviceUrl = "pulsar://localhost:6650";
std::string topic = "persistent://prop/unit/ns1/topic-1";
std::string producerName = "producer-1";

Client client(serviceUrl);

ProducerConfiguration producerConfig;
producerConfig.setSendTimeout(0);
producerConfig.setProducerName(producerName);

Producer producer;

Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producerConfig, producer);
```
33 changes: 30 additions & 3 deletions site/docs/latest/getting-started/ConceptsAndArchitecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ title: Pulsar concepts and architecture
lead: A high-level overview of Pulsar's moving pieces
tags:
- architecture
- deduplication
---

<!--
Expand Down Expand Up @@ -315,7 +316,7 @@ Pulsar has two features, however, that enable you to override this default behav
* Message **retention** enables you to store messages that have been acknowledged by a consumer
* Message **expiry** enables you to set a time to live (TTL) for messages that have not yet been acknowledged

{% include admonition.html type="info" content='All message retention and expiry is managed at the [namespace](#namespaces) level. For a how-to, see the [Message retention and expiry](../../advanced/RetentionExpiry) admin documentation.' %}
{% include admonition.html type="info" content='All message retention and expiry is managed at the [namespace](#namespaces) level. For a how-to, see the [Message retention and expiry](../../cookbooks/RetentionExpiry) cookbook.' %}

The diagram below illustrates both concepts:

Expand All @@ -333,9 +334,35 @@ For an in-depth look at Pulsar Functions, see the [Pulsar Functions overview](..

Pulsar enables messages to be produced and consumed in different geo-locations. For instance, your application may be publishing data in one region or market and you would like to process it for consumption in other regions or markets. [Geo-replication](../../admin/GeoReplication) in Pulsar enables you to do that.

## Message deduplication

Message **duplication** occurs when a message is [persisted](#persistent-storage) by Pulsar more than once. Message ***de*duplication** is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, *even if the message is received more than once*.

The following diagram illustrates what happens when message deduplication is disabled vs. enabled:

{% img /img/message-deduplication.png 75 %}

Message deduplication is disabled in the scenario shown at the top. Here, a producer publishes message 1 on a topic; the message reaches a Pulsar {% popover broker %} and is [persisted](#persistent-storage) to BookKeeper. The producer then sends message 1 again (in this case due to some retry logic), and the message is received by the broker and stored in BookKeeper again, which means that duplication has occurred.

In the second scenario at the bottom, the producer publishes message 1, which is received by the broker and persisted, as in the first scenario. When the producer attempts to publish the message again, however, the broker knows that it has already seen message 1 and thus does not persist the message.

{% include admonition.html type="info" content='Message deduplication is handled at the namespace level. For more instructions, see the [message deduplication cookbook](../../cookbooks/message-deduplication).' %}

### Producer idempotency

The other available approach to message deduplication is to ensure that each message is *only produced once*. This approach is typically called **producer idempotency**. The drawback of this approach is that it defers the work of message deduplication to the application. In Pulsar, this is handled at the {% popover broker %} level, which means that you don't need to modify your Pulsar client code. Instead, you only need to make administrative changes (see the [Managing message deduplication](../../cookbooks/message-deduplication) cookbook for a guide).

### Deduplication and effectively-once semantics

Message deduplication makes Pulsar an ideal messaging system to be used in conjunction with stream processing engines (SPEs) and other systems seeking to provide [effectively-once](https://blog.streaml.io/exactly-once/) processing semantics. Messaging systems that don't offer automatic message deduplication require the SPE or other system to guarantee deduplication, which means that strict message ordering comes at the cost of burdening the application with the responsibility of deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.

{% include admonition.html type="info" content='
More in-depth information can be found in [this post](https://blog.streaml.io/pulsar-effectively-once/) on the [Streamlio blog](https://blog.streaml.io).
' %}

## Multi-tenancy

Pulsar was created from the ground up as a {% popover multi-tenant %} system. To support multi-tenancy, Pulsar has a concept of {% popover properties %}. Properties can be spread across {% popover clusters %} and can each have their own [authentication and authorization](../../admin/Authz) scheme applied to them. They are also the administrative unit at which [storage quotas](TODO), [message TTL](../../advanced/RetentionExpiry#time-to-live-ttl), and isolation policies can be managed.
Pulsar was created from the ground up as a {% popover multi-tenant %} system. To support multi-tenancy, Pulsar has a concept of {% popover properties %}. Properties can be spread across {% popover clusters %} and can each have their own [authentication and authorization](../../admin/Authz) scheme applied to them. They are also the administrative unit at which [storage quotas](TODO), [message TTL](../../cookbooks/RetentionExpiry#time-to-live-ttl), and isolation policies can be managed.

The multi-tenant nature of Pulsar is reflected mostly visibly in topic URLs, which have this structure:

Expand Down Expand Up @@ -411,7 +438,7 @@ The **reader interface** for Pulsar enables applications to manually manage curs
* The **latest** available message in the topic
* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.

The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://streaml.io/blog/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://blog.streaml.io/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.

<img src="/img/pulsar-reader-consumer-interfaces.png" alt="The Pulsar consumer and reader interfaces" width="80%">

Expand Down
Binary file added site/img/message-deduplication.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit ec210cb

Please sign in to comment.