Skip to content

Commit

Permalink
[website][upgrade]feat: website upgrade / docs migration - 2.6.3 / ad…
Browse files Browse the repository at this point in the history
…min/adaptors/cookbooks (apache#12864)

* [website][upgrade]feat: website upgrade / docs migration - 2.6.3 / functions/io/sql

Signed-off-by: LiLi <[email protected]>

* [website][upgrade]feat: website upgrade / docs migration - 2.6.3 / helm/deployment/administration

Signed-off-by: LiLi <[email protected]>

* [website][upgrade]feat: website upgrade / docs migration - 2.6.3 / security/performance/clients

Signed-off-by: LiLi <[email protected]>

* [website][upgrade]feat: website upgrade / docs migration - 2.6.3 / admin/adaptors/cookbooks

Signed-off-by: LiLi <[email protected]>

Co-authored-by: Anonymitaet <[email protected]>
  • Loading branch information
urfreespace and Anonymitaet authored Nov 19, 2021
1 parent 09571b9 commit 1fa1e05
Show file tree
Hide file tree
Showing 24 changed files with 5,985 additions and 0 deletions.
274 changes: 274 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.3/adaptors-kafka.md

Large diffs are not rendered by default.

78 changes: 78 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.3/adaptors-spark.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
id: adaptors-spark
title: Pulsar adaptor for Apache Spark
sidebar_label: "Apache Spark"
original_id: adaptors-spark
---

The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache [Spark Streaming](https://spark.apache.org/streaming/) to receive raw data from Pulsar.

An application can receive data in [Resilient Distributed Dataset](https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDD) format via the Spark Streaming receiver and can process it in a variety of ways.

## Prerequisites

To use the receiver, include a dependency for the `pulsar-spark` library in your Java configuration.

### Maven

If you're using Maven, add this to your `pom.xml`:

```xml

<!-- in your <properties> block -->
<pulsar.version>@pulsar:version@</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>

```

### Gradle

If you're using Gradle, add this to your `build.gradle` file:

```groovy
def pulsarVersion = "@pulsar:version@"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
```

## Usage

Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method in `JavaStreamingContext`:

```java

String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();

Set<String> set = new HashSet();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);

SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());

JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted.
96 changes: 96 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.3/adaptors-storm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
---
id: adaptors-storm
title: Pulsar adaptor for Apache Storm
sidebar_label: "Apache Storm"
original_id: adaptors-storm
---

Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

## Using the Pulsar Storm Adaptor

Include dependency for Pulsar Storm Adaptor:

```xml

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-storm</artifactId>
<version>${pulsar.version}</version>
</dependency>

```

## Pulsar Spout

The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the `MessageToValuesMapper` provided by the client.

The tuples that fail to be processed by the downstream bolts will be re-injected by the spout with an exponential backoff, within a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which it is acknowledged by the consumer. Here's an example construction of a spout:

```java

MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

@Override
public Values toValues(Message msg) {
return new Values(new String(msg.getData()));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
declarer.declare(new Fields("string"));
}
};

// Configure a Pulsar Spout
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);

// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf);

```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java).

## Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the `TupleToMessageMapper` provided by the client.

A partitioned topic can also be used to publish messages on different topics. In the implementation of the `TupleToMessageMapper`, a "key" will need to be provided in the message which will send the messages with the same key to the same topic. Here's an example bolt:

```java

TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
String receivedMessage = tuple.getString(0);
// message processing
String processedMsg = receivedMessage + "-processed";
return msgBuilder.value(processedMsg.getBytes());
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
}
};

// Configure a Pulsar Bolt
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);

// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf);

```

174 changes: 174 additions & 0 deletions site2/website-next/versioned_docs/version-2.6.3/admin-api-brokers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
---
id: admin-api-brokers
title: Managing Brokers
sidebar_label: "Brokers"
original_id: admin-api-brokers
---

Pulsar brokers consist of two components:

1. An HTTP server exposing a {@inject: rest:REST:/} interface administration and [topic](reference-terminology.md#topic) lookup.
2. A dispatcher that handles all Pulsar [message](reference-terminology.md#message) transfers.

[Brokers](reference-terminology.md#broker) can be managed via:

* The [`brokers`](reference-pulsar-admin.md#brokers) command of the [`pulsar-admin`](reference-pulsar-admin) tool
* The `/admin/v2/brokers` endpoint of the admin {@inject: rest:REST:/} API
* The `brokers` method of the {@inject: javadoc:PulsarAdmin:/admin/org/apache/pulsar/client/admin/PulsarAdmin.html} object in the [Java API](client-libraries-java)

In addition to being configurable when you start them up, brokers can also be [dynamically configured](#dynamic-broker-configuration).

> See the [Configuration](reference-configuration.md#broker) page for a full listing of broker-specific configuration parameters.
## Brokers resources

### List active brokers

Fetch all available active brokers that are serving traffic.

#### pulsar-admin

```shell

$ pulsar-admin brokers list use

```

```
broker1.use.org.com:8080
```

###### REST

{@inject: endpoint|GET|/admin/v2/brokers/:cluster|operation/getActiveBrokers?version=@pulsar:version_number@}

###### Java

```java

admin.brokers().getActiveBrokers(clusterName)

```

#### list of namespaces owned by a given broker

It finds all namespaces which are owned and served by a given broker.

###### CLI

```shell

$ pulsar-admin brokers namespaces use \
--url broker1.use.org.com:8080

```

```json

{
"my-property/use/my-ns/0x00000000_0xffffffff": {
"broker_assignment": "shared",
"is_controlled": false,
"is_active": true
}
}

```

###### REST

{@inject: endpoint|GET|/admin/v2/brokers/:cluster/:broker/ownedNamespaces|operation/getOwnedNamespaes?version=@pulsar:version_number@}

###### Java

```java

admin.brokers().getOwnedNamespaces(cluster,brokerUrl);

```

### Dynamic broker configuration

One way to configure a Pulsar [broker](reference-terminology.md#broker) is to supply a [configuration](reference-configuration.md#broker) when the broker is [started up](reference-cli-tools.md#pulsar-broker).

But since all broker configuration in Pulsar is stored in ZooKeeper, configuration values can also be dynamically updated *while the broker is running*. When you update broker configuration dynamically, ZooKeeper will notify the broker of the change and the broker will then override any existing configuration values.

* The [`brokers`](reference-pulsar-admin.md#brokers) command for the [`pulsar-admin`](reference-pulsar-admin) tool has a variety of subcommands that enable you to manipulate a broker's configuration dynamically, enabling you to [update config values](#update-dynamic-configuration) and more.
* In the Pulsar admin {@inject: rest:REST:/} API, dynamic configuration is managed through the `/admin/v2/brokers/configuration` endpoint.

### Update dynamic configuration

#### pulsar-admin

The [`update-dynamic-config`](reference-pulsar-admin.md#brokers-update-dynamic-config) subcommand will update existing configuration. It takes two arguments: the name of the parameter and the new value using the `config` and `value` flag respectively. Here's an example for the [`brokerShutdownTimeoutMs`](reference-configuration.md#broker-brokerShutdownTimeoutMs) parameter:

```shell

$ pulsar-admin brokers update-dynamic-config --config brokerShutdownTimeoutMs --value 100

```

#### REST API

{@inject: endpoint|POST|/admin/v2/brokers/configuration/:configName/:configValue|operation/updateDynamicConfiguration?version=@pulsar:version_number@}

#### Java

```java

admin.brokers().updateDynamicConfiguration(configName, configValue);

```

### List updated values

Fetch a list of all potentially updatable configuration parameters.

#### pulsar-admin

```shell

$ pulsar-admin brokers list-dynamic-config
brokerShutdownTimeoutMs

```

#### REST API

{@inject: endpoint|GET|/admin/v2/brokers/configuration|operation/getDynamicConfigurationName?version=@pulsar:version_number@}

#### Java

```java

admin.brokers().getDynamicConfigurationNames();

```

### List all

Fetch a list of all parameters that have been dynamically updated.

#### pulsar-admin

```shell

$ pulsar-admin brokers get-all-dynamic-config
brokerShutdownTimeoutMs:100

```

#### REST API

{@inject: endpoint|GET|/admin/v2/brokers/configuration/values|operation/getAllDynamicConfigurations?version=@pulsar:version_number@}

#### Java

```java

admin.brokers().getAllDynamicConfigurations();

```

Loading

0 comments on commit 1fa1e05

Please sign in to comment.