Skip to content

Commit

Permalink
Add 2.7.3 docs for repo (apache#10666)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfstudy authored May 26, 2021
1 parent c48b285 commit e0fef93
Show file tree
Hide file tree
Showing 159 changed files with 38,973 additions and 0 deletions.
265 changes: 265 additions & 0 deletions site2/website/versioned_docs/version-2.7.3/adaptors-kafka.md

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions site2/website/versioned_docs/version-2.7.3/adaptors-spark.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
id: version-2.7.3-adaptors-spark
title: Pulsar adaptor for Apache Spark
sidebar_label: Apache Spark
original_id: adaptors-spark
---

The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache [Spark Streaming](https://spark.apache.org/streaming/) to receive data from Pulsar.

An application can receive data in [Resilient Distributed Dataset](https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) (RDD) format via the Spark Streaming Pulsar receiver and can process it in a variety of ways.

## Prerequisites

To use the receiver, include a dependency for the `pulsar-spark` library in your Java configuration.

### Maven

If you're using Maven, add this to your `pom.xml`:

```xml
<!-- in your <properties> block -->
<pulsar.version>{{pulsar:version}}</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
```

### Gradle

If you're using Gradle, add this to your `build.gradle` file:

```groovy
def pulsarVersion = "{{pulsar:version}}"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
```

## Usage

Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` method in `JavaStreamingContext`:

```java
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();

Set<String> set = new HashSet<>();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);

SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());

JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted.

89 changes: 89 additions & 0 deletions site2/website/versioned_docs/version-2.7.3/adaptors-storm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
id: version-2.7.3-adaptors-storm
title: Pulsar adaptor for Apache Storm
sidebar_label: Apache Storm
original_id: adaptors-storm
---

Pulsar Storm is an adaptor for integrating with [Apache Storm](http://storm.apache.org/) topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

## Using the Pulsar Storm Adaptor

Include dependency for Pulsar Storm Adaptor:

```xml
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-storm</artifactId>
<version>${pulsar.version}</version>
</dependency>
```

## Pulsar Spout

The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the `MessageToValuesMapper` provided by the client.

The tuples that fail to be processed by the downstream bolts will be re-injected by the spout with an exponential backoff, within a configurable timeout (the default is 60 seconds) or a configurable number of retries, whichever comes first, after which it is acknowledged by the consumer. Here's an example construction of a spout:

```java
MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

@Override
public Values toValues(Message msg) {
return new Values(new String(msg.getData()));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
declarer.declare(new Fields("string"));
}
};

// Configure a Pulsar Spout
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);

// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf);
```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java).

## Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the `TupleToMessageMapper` provided by the client.

A partitioned topic can also be used to publish messages on different topics. In the implementation of the `TupleToMessageMapper`, a "key" will need to be provided in the message which will send the messages with the same key to the same topic. Here's an example bolt:

```java
TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

@Override
public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
String receivedMessage = tuple.getString(0);
// message processing
String processedMsg = receivedMessage + "-processed";
return msgBuilder.value(processedMsg.getBytes());
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declare the output fields
}
};

// Configure a Pulsar Bolt
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);

// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf);
```
158 changes: 158 additions & 0 deletions site2/website/versioned_docs/version-2.7.3/admin-api-brokers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
---
id: version-2.7.3-admin-api-brokers
title: Managing Brokers
sidebar_label: Brokers
original_id: admin-api-brokers
---

Pulsar brokers consist of two components:

1. An HTTP server exposing a {@inject: rest:REST:/} interface administration and [topic](reference-terminology.md#topic) lookup.
2. A dispatcher that handles all Pulsar [message](reference-terminology.md#message) transfers.

[Brokers](reference-terminology.md#broker) can be managed via:

* The [`brokers`](reference-pulsar-admin.md#brokers) command of the [`pulsar-admin`](reference-pulsar-admin.md) tool
* The `/admin/v2/brokers` endpoint of the admin {@inject: rest:REST:/} API
* The `brokers` method of the {@inject: javadoc:PulsarAdmin:/admin/org/apache/pulsar/client/admin/PulsarAdmin.html} object in the [Java API](client-libraries-java.md)

In addition to being configurable when you start them up, brokers can also be [dynamically configured](#dynamic-broker-configuration).

> See the [Configuration](reference-configuration.md#broker) page for a full listing of broker-specific configuration parameters.
## Brokers resources

### List active brokers

Fetch all available active brokers that are serving traffic.

<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

```shell
$ pulsar-admin brokers list use
```

```
broker1.use.org.com:8080
```

<!--REST API-->

{@inject: endpoint|GET|/admin/v2/brokers/:cluster|operation/getActiveBrokers?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().getActiveBrokers(clusterName)
```

<!--END_DOCUSAURUS_CODE_TABS-->

#### list of namespaces owned by a given broker

It finds all namespaces which are owned and served by a given broker.

<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

```shell
$ pulsar-admin brokers namespaces use \
--url broker1.use.org.com:8080
```

```json
{
"my-property/use/my-ns/0x00000000_0xffffffff": {
"broker_assignment": "shared",
"is_controlled": false,
"is_active": true
}
}
```
<!--REST API-->

{@inject: endpoint|GET|/admin/v2/brokers/:cluster/:broker/ownedNamespaces|operation/getOwnedNamespaes?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().getOwnedNamespaces(cluster,brokerUrl);
```
<!--END_DOCUSAURUS_CODE_TABS-->

### Dynamic broker configuration

One way to configure a Pulsar [broker](reference-terminology.md#broker) is to supply a [configuration](reference-configuration.md#broker) when the broker is [started up](reference-cli-tools.md#pulsar-broker).

But since all broker configuration in Pulsar is stored in ZooKeeper, configuration values can also be dynamically updated *while the broker is running*. When you update broker configuration dynamically, ZooKeeper will notify the broker of the change and the broker will then override any existing configuration values.

* The [`brokers`](reference-pulsar-admin.md#brokers) command for the [`pulsar-admin`](reference-pulsar-admin.md) tool has a variety of subcommands that enable you to manipulate a broker's configuration dynamically, enabling you to [update config values](#update-dynamic-configuration) and more.
* In the Pulsar admin {@inject: rest:REST:/} API, dynamic configuration is managed through the `/admin/v2/brokers/configuration` endpoint.

### Update dynamic configuration

<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

The [`update-dynamic-config`](reference-pulsar-admin.md#brokers-update-dynamic-config) subcommand will update existing configuration. It takes two arguments: the name of the parameter and the new value using the `config` and `value` flag respectively. Here's an example for the [`brokerShutdownTimeoutMs`](reference-configuration.md#broker-brokerShutdownTimeoutMs) parameter:

```shell
$ pulsar-admin brokers update-dynamic-config --config brokerShutdownTimeoutMs --value 100
```

<!--REST API-->

{@inject: endpoint|POST|/admin/v2/brokers/configuration/:configName/:configValue|operation/updateDynamicConfiguration?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().updateDynamicConfiguration(configName, configValue);
```
<!--END_DOCUSAURUS_CODE_TABS-->

### List updated values

Fetch a list of all potentially updatable configuration parameters.
<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

```shell
$ pulsar-admin brokers list-dynamic-config
brokerShutdownTimeoutMs
```

<!--REST API-->

{@inject: endpoint|GET|/admin/v2/brokers/configuration|operation/getDynamicConfigurationName?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().getDynamicConfigurationNames();
```
<!--END_DOCUSAURUS_CODE_TABS-->

### List all

Fetch a list of all parameters that have been dynamically updated.

<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

```shell
$ pulsar-admin brokers get-all-dynamic-config
brokerShutdownTimeoutMs:100
```

<!--REST API-->

{@inject: endpoint|GET|/admin/v2/brokers/configuration/values|operation/getAllDynamicConfigurations?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().getAllDynamicConfigurations();
```
<!--END_DOCUSAURUS_CODE_TABS-->
Loading

0 comments on commit e0fef93

Please sign in to comment.