[website][upgrade]feat: website upgrade / docs migration - 2.6.2 /Adaptors/ Cookbooks/Development/Reference (apache#12879)

* commit the first part

* commit part 2

* commit part3

* commit part4

* commit part5

Co-authored-by: Anonymitaet <[email protected]>
zeo1995 and Anonymitaet authored Nov 19, 2021
1 parent 7c906ad commit 9e6f31f
Showing 23 changed files with 8,289 additions and 0 deletions.
274 changes: 274 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.2/adaptors-kafka.md


79 changes: 79 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.2/adaptors-spark.md
@@ -0,0 +1,79 @@
---
id: adaptors-spark
title: Pulsar adaptor for Apache Spark
sidebar_label: "Apache Spark"
original_id: adaptors-spark
---

The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache [Spark Streaming](https://spark.apache.org/streaming/) to receive raw data from Pulsar.

An application can receive data in [Resilient Distributed Dataset](https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDD) format via the Spark Streaming receiver and can process it in a variety of ways.

## Prerequisites

To use the receiver, add a dependency on the `pulsar-spark` library to your build configuration.

### Maven

If you're using Maven, add this to your `pom.xml`:

```xml

<!-- in your <properties> block -->
<pulsar.version>@pulsar:version@</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-spark</artifactId>
  <version>${pulsar.version}</version>
</dependency>

```

### Gradle

If you're using Gradle, add this to your `build.gradle` file:

```groovy
def pulsarVersion = "@pulsar:version@"

dependencies {
    compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
```

## Usage

Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method in `JavaStreamingContext`:

```java

import java.util.HashSet;
import java.util.Set;

import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");

// Use a 60-second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

// Consumer configuration: the topic(s) to read and the subscription to use
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);

SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
    serviceUrl,
    pulsarConf,
    new AuthenticationDisabled());

JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). The example counts the number of received messages that contain the string "Pulsar".
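
As a rough sketch of that counting step (assuming the `jsc` and `lineDStream` variables from the snippet above; the filtering logic here is illustrative, not the exact code from the linked example), you could filter and count the stream like this:

```java

import org.apache.spark.streaming.api.java.JavaDStream;

// Keep only messages whose payload contains "Pulsar", then count them per batch
JavaDStream<byte[]> matches = lineDStream.filter(bytes -> new String(bytes).contains("Pulsar"));
matches.count().print();

jsc.start();
jsc.awaitTermination();

```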

96 changes: 96 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.2/adaptors-storm.md
@@ -0,0 +1,96 @@
---
id: adaptors-storm
title: Pulsar adaptor for Apache Storm
sidebar_label: "Apache Storm"
original_id: adaptors-storm
---

Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

## Using the Pulsar Storm Adaptor

Include a dependency for the Pulsar Storm adaptor:

```xml

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-storm</artifactId>
  <version>${pulsar.version}</version>
</dependency>

```

## Pulsar Spout

The Pulsar Spout allows data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the `MessageToValuesMapper` provided by the client.

Tuples that fail to be processed by downstream bolts are re-injected by the spout with an exponential backoff, up to a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which the message is acknowledged by the consumer. Here's an example construction of a spout:

```java

MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

    @Override
    public Values toValues(Message msg) {
        return new Values(new String(msg.getData()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
        declarer.declare(new Fields("string"));
    }
};

// Configure a Pulsar Spout
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);

// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf);

```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java).

## Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the `TupleToMessageMapper` provided by the client.

A partitioned topic can also be used to publish messages on different partitions. In your implementation of `TupleToMessageMapper`, provide a "key" in the message; messages with the same key are routed to the same partition. Here's an example bolt:

```java

TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

    @Override
    public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
        String receivedMessage = tuple.getString(0);
        // message processing
        String processedMsg = receivedMessage + "-processed";
        return msgBuilder.value(processedMsg.getBytes());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
    }
};

// Configure a Pulsar Bolt
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);

// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf);

```
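
As a minimal, hedged sketch of wiring the spout and bolt together (the component names, parallelism, and local-mode run are illustrative; `spout` and `bolt` are the instances constructed above):

```java

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();

// Read from Pulsar via the spout and hand tuples to the bolt
builder.setSpout("pulsar-spout", spout, 1);
builder.setBolt("pulsar-bolt", bolt, 1).shuffleGrouping("pulsar-spout");

Config conf = new Config();

// Local mode for testing; on a real cluster, use StormSubmitter.submitTopology(...)
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("pulsar-topology", conf, builder.createTopology());
Thread.sleep(10_000); // let the topology run briefly
cluster.shutdown();

```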

21 changes: 21 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.2/cookbooks-bookkeepermetadata.md
@@ -0,0 +1,21 @@
---
id: cookbooks-bookkeepermetadata
title: BookKeeper Ledger Metadata
original_id: cookbooks-bookkeepermetadata
---

Pulsar stores data in BookKeeper ledgers. You can understand the contents of a ledger by inspecting the metadata attached to it.
This metadata is stored in ZooKeeper and is readable using the BookKeeper APIs.

Description of current metadata:

| Scope | Metadata name | Metadata value |
| ------------- | ------------- | ------------- |
| All ledgers | application | 'pulsar' |
| All ledgers | component | 'managed-ledger', 'schema', 'compacted-topic' |
| Managed ledgers | pulsar/managed-ledger | name of the ledger |
| Cursor | pulsar/cursor | name of the cursor |
| Compacted topic | pulsar/compactedTopic | name of the original topic |
| Compacted topic | pulsar/compactedTo | id of the last compacted message |
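
As a hedged illustration of reading this metadata with the BookKeeper admin API (the ZooKeeper address and ledger id below are placeholders; this sketch assumes `BookKeeperAdmin#openLedgerNoRecovery` and `LedgerHandle#getCustomMetadata`):

```java

import java.util.Map;

import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;

// Placeholders: your ZooKeeper ensemble and the ledger to inspect
BookKeeperAdmin admin = new BookKeeperAdmin("zk-host:2181");
long ledgerId = 12345L;

// Open without recovery so inspection does not interfere with the ledger
LedgerHandle lh = admin.openLedgerNoRecovery(ledgerId);

// Custom metadata is a map from names to raw bytes (see the table above)
Map<String, byte[]> metadata = lh.getCustomMetadata();
metadata.forEach((name, value) ->
        System.out.println(name + " = " + new String(value)));

lh.close();
admin.close();

```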


142 changes: 142 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.2/cookbooks-compaction.md
@@ -0,0 +1,142 @@
---
id: cookbooks-compaction
title: Topic compaction
sidebar_label: "Topic compaction"
original_id: cookbooks-compaction
---

Pulsar's [topic compaction](concepts-topic-compaction.md#compaction) feature enables you to create **compacted** topics in which older, "obscured" entries are pruned from the topic, allowing for faster reads through the topic's history (which messages are deemed obscured/outdated/irrelevant will depend on your use case).

To use compaction:

* You need to give messages keys, as topic compaction in Pulsar takes place on a *per-key basis* (i.e. messages are compacted based on their key). For a stock ticker use case, the stock symbol---e.g. `AAPL` or `GOOG`---could serve as the key (more on this [below](#when-should-i-use-compacted-topics)). Messages without keys will be left alone by the compaction process.
* Compaction can be configured to run [automatically](#configuring-compaction-to-run-automatically), or you can manually [trigger](#trigger) compaction using the Pulsar administrative API.
* Your consumers must be [configured](#consumer-configuration) to read from compacted topics ([Java consumers](#java), for example, have a `readCompacted` setting that must be set to `true`). If this configuration is not set, consumers will still be able to read from the non-compacted topic.


> Compaction only works on messages that have keys (as in the stock ticker example the stock symbol serves as the key for each message). Keys can thus be thought of as the axis along which compaction is applied. Messages that don't have keys are simply ignored by compaction.

## When should I use compacted topics?

The classic example of a topic that could benefit from compaction would be a stock ticker topic through which consumers can access up-to-date values for specific stocks. Imagine a scenario in which messages carrying stock value data use the stock symbol as the key (`GOOG`, `AAPL`, `TWTR`, etc.). Compacting this topic would give consumers on the topic two options:

* They can read from the "original," non-compacted topic in case they need access to "historical" values, i.e. the entirety of the topic's messages.
* They can read from the compacted topic if they only want to see the most up-to-date messages.

Thus, if you're using a Pulsar topic called `stock-values`, some consumers could have access to all messages in the topic (perhaps because they're performing some kind of number crunching of all values in the last hour) while the consumers used to power the real-time stock ticker only see the compacted topic (and thus aren't forced to process outdated messages). Which variant of the topic any given consumer pulls messages from is determined by the consumer's [configuration](#consumer-configuration).

> One of the benefits of compaction in Pulsar is that you aren't forced to choose between compacted and non-compacted topics, as the compaction process leaves the original topic as-is and essentially adds an alternate topic. In other words, you can run compaction on a topic and consumers that need access to the non-compacted version of the topic will not be adversely affected.

## Configuring compaction to run automatically

Tenant administrators can configure a policy for compaction at the namespace level. The policy specifies how large the topic backlog can grow before compaction is triggered.

For example, to trigger compaction when the backlog reaches 100MB:

```bash

$ bin/pulsar-admin namespaces set-compaction-threshold \
--threshold 100M my-tenant/my-namespace

```

Configuring the compaction threshold on a namespace will apply to all topics within that namespace.
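
To verify the policy afterwards, you can read it back (a hedged example; `get-compaction-threshold` is the matching `pulsar-admin` subcommand):

```bash

$ bin/pulsar-admin namespaces get-compaction-threshold my-tenant/my-namespace

```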

## Triggering compaction manually

In order to run compaction on a topic, you need to use the [`topics compact`](reference-pulsar-admin.md#topics-compact) command of the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool. Here's an example:

```bash

$ bin/pulsar-admin topics compact \
persistent://my-tenant/my-namespace/my-topic

```
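
You can then check on the progress of a compaction run (a hedged example using the `topics compaction-status` subcommand):

```bash

$ bin/pulsar-admin topics compaction-status \
  persistent://my-tenant/my-namespace/my-topic

```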

The `pulsar-admin` tool runs compaction via the Pulsar {@inject: rest:REST:/} API. To run compaction in its own dedicated process, i.e. *not* through the REST API, you can use the [`pulsar compact-topic`](reference-cli-tools.md#pulsar-compact-topic) command. Here's an example:

```bash

$ bin/pulsar compact-topic \
--topic persistent://my-tenant/my-namespace/my-topic

```

> Running compaction in its own process is recommended when you want to avoid interfering with the broker's performance. Broker performance should only be affected, however, when running compaction on topics with a large keyspace (i.e. when there are many keys on the topic). The first phase of the compaction process keeps a copy of each key in the topic, which can create memory pressure as the number of keys grows. Using the `pulsar-admin topics compact` command to run compaction through the REST API should present no issues in the overwhelming majority of cases; using `pulsar compact-topic` should correspondingly be considered an edge case.

The `pulsar compact-topic` command communicates with [ZooKeeper](https://zookeeper.apache.org) directly. In order to establish communication with ZooKeeper, though, the `pulsar` CLI tool will need to have a valid [broker configuration](reference-configuration.md#broker). You can either supply a proper configuration in `conf/broker.conf` or specify a non-default location for the configuration:

```bash

$ bin/pulsar compact-topic \
--broker-conf /path/to/broker.conf \
--topic persistent://my-tenant/my-namespace/my-topic

# If the configuration is in conf/broker.conf
$ bin/pulsar compact-topic \
--topic persistent://my-tenant/my-namespace/my-topic

```

### When should I trigger compaction?

How often you [trigger compaction](#triggering-compaction-manually) will vary widely based on the use case. If you want a compacted topic to be fast to read, run compaction fairly frequently.

## Consumer configuration

Pulsar consumers and readers need to be configured to read from compacted topics. The sections below show you how to enable compacted topic reads for Pulsar's language clients.

### Java

In order to read from a compacted topic using a Java consumer, the `readCompacted` parameter must be set to `true`. Here's an example consumer for a compacted topic:

```java

Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
.topic("some-compacted-topic")
.readCompacted(true)
.subscribe();

```
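
Readers have the same option; as a hedged sketch (assuming the same `client` instance as above), the reader builder also exposes `readCompacted`:

```java

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

Reader<byte[]> compactedTopicReader = client.newReader()
    .topic("some-compacted-topic")
    .startMessageId(MessageId.earliest)
    .readCompacted(true)
    .create();

```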

As mentioned above, topic compaction in Pulsar works on a *per-key basis*. That means that messages that you produce on compacted topics need to have keys (the content of the key will depend on your use case). Messages that don't have keys will be ignored by the compaction process. Here's an example Pulsar message with a key:

```java

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;

// Assumes an existing Producer<byte[]> named `producer`;
// keys are set through the TypedMessageBuilder returned by newMessage()
TypedMessageBuilder<byte[]> msg = producer.newMessage()
        .key("some-key")
        .value(someByteArray);

```

The example below shows a message with a key being produced on a compacted Pulsar topic:

```java

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer<byte[]> compactedTopicProducer = client.newProducer()
        .topic("some-compacted-topic")
        .create();

// Build and send a keyed message; compaction keeps the latest value per key
compactedTopicProducer.newMessage()
        .key("some-key")
        .value(someByteArray)
        .send();

```

