diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-bookkeepermetadata.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-bookkeepermetadata.md new file mode 100644 index 0000000000000..ee8df2ba553dc --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-bookkeepermetadata.md @@ -0,0 +1,25 @@ +--- +id: cookbooks-bookkeepermetadata +title: BookKeeper Ledger Metadata +original_id: cookbooks-bookkeepermetadata +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Pulsar stores data on BookKeeper ledgers, you can understand the contents of a ledger by inspecting the metadata attached to the ledger. +Such metadata are stored on ZooKeeper and they are readable using BookKeeper APIs. + +Description of current metadata: + +| Scope | Metadata name | Metadata value | +| ------------- | ------------- | ------------- | +| All ledgers | application | 'pulsar' | +| All ledgers | component | 'managed-ledger', 'schema', 'compacted-topic' | +| Managed ledgers | pulsar/managed-ledger | name of the ledger | +| Cursor | pulsar/cursor | name of the cursor | +| Compacted topic | pulsar/compactedTopic | name of the original topic | +| Compacted topic | pulsar/compactedTo | id of the last compacted message | + + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-compaction.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-compaction.md new file mode 100644 index 0000000000000..ebcf4ba3237a7 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-compaction.md @@ -0,0 +1,146 @@ +--- +id: cookbooks-compaction +title: Topic compaction +sidebar_label: "Topic compaction" +original_id: cookbooks-compaction +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Pulsar's [topic compaction](concepts-topic-compaction.md#compaction) feature enables you to create **compacted** topics in which older, "obscured" entries are pruned from 
the topic, allowing for faster reads through the topic's history (which messages are deemed obscured/outdated/irrelevant will depend on your use case). + +To use compaction: + +* You need to give messages keys, as topic compaction in Pulsar takes place on a *per-key basis* (i.e. messages are compacted based on their key). For a stock ticker use case, the stock symbol---e.g. `AAPL` or `GOOG`---could serve as the key (more on this [below](#when-should-i-use-compacted-topics)). Messages without keys will be left alone by the compaction process. +* Compaction can be configured to run [automatically](#configuring-compaction-to-run-automatically), or you can manually [trigger](#triggering-compaction-manually) compaction using the Pulsar administrative API. +* Your consumers must be [configured](#consumer-configuration) to read from compacted topics ([Java consumers](#java), for example, have a `readCompacted` setting that must be set to `true`). If this configuration is not set, consumers will still be able to read from the non-compacted topic. + + +> Compaction only works on messages that have keys (as in the stock ticker example the stock symbol serves as the key for each message). Keys can thus be thought of as the axis along which compaction is applied. Messages that don't have keys are simply ignored by compaction. + +## When should I use compacted topics? + +The classic example of a topic that could benefit from compaction would be a stock ticker topic through which consumers can access up-to-date values for specific stocks. Imagine a scenario in which messages carrying stock value data use the stock symbol as the key (`GOOG`, `AAPL`, `TWTR`, etc.). Compacting this topic would give consumers on the topic two options: + +* They can read from the "original," non-compacted topic in case they need access to "historical" values, i.e. the entirety of the topic's messages. +* They can read from the compacted topic if they only want to see the most up-to-date messages. 
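The difference between the two views listed above can be pictured with a small, self-contained sketch. It models only the outcome of per-key compaction and is not how Pulsar implements it; the `CompactionSketch` class, the `Msg` type, and the `compact` helper are hypothetical names introduced for illustration. Keyed messages are reduced to the latest message per key, and messages without keys are left alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: models the result of compaction, not Pulsar's implementation.
class CompactionSketch {

    // Hypothetical stand-in for a message; key is null for messages without a key.
    static final class Msg {
        final String key;
        final String value;

        Msg(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Returns the compacted view: the latest message for each key plus every
    // unkeyed message, all in their original relative order.
    static List<Msg> compact(List<Msg> topic) {
        // First pass: remember the position of the last message seen for each key.
        Map<String, Integer> lastIndexForKey = new HashMap<>();
        for (int i = 0; i < topic.size(); i++) {
            String key = topic.get(i).key;
            if (key != null) {
                lastIndexForKey.put(key, i);
            }
        }
        // Second pass: keep unkeyed messages and the last message of each key.
        List<Msg> compacted = new ArrayList<>();
        for (int i = 0; i < topic.size(); i++) {
            Msg m = topic.get(i);
            if (m.key == null || lastIndexForKey.get(m.key) == i) {
                compacted.add(m);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Msg> topic = new ArrayList<>();
        topic.add(new Msg("GOOG", "100"));
        topic.add(new Msg("AAPL", "200"));
        topic.add(new Msg("GOOG", "105"));
        topic.add(new Msg(null, "heartbeat"));
        for (Msg m : compact(topic)) {
            System.out.println(m.key + " -> " + m.value);
        }
    }
}
```

In this model the original list plays the role of the non-compacted topic (all four messages), while the returned list plays the role of the compacted view (the older `GOOG` entry is dropped).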
+ +Thus, if you're using a Pulsar topic called `stock-values`, some consumers could have access to all messages in the topic (perhaps because they're performing some kind of number crunching of all values in the last hour) while the consumers used to power the real-time stock ticker only see the compacted topic (and thus aren't forced to process outdated messages). Which variant of the topic any given consumer pulls messages from is determined by the consumer's [configuration](#consumer-configuration). + +> One of the benefits of compaction in Pulsar is that you aren't forced to choose between compacted and non-compacted topics, as the compaction process leaves the original topic as-is and essentially adds an alternate topic. In other words, you can run compaction on a topic and consumers that need access to the non-compacted version of the topic will not be adversely affected. + + +## Configuring compaction to run automatically + +Tenant administrators can configure a policy for compaction at the namespace level. The policy specifies how large the topic backlog can grow before compaction is triggered. + +For example, to trigger compaction when the backlog reaches 100MB: + +```bash + +$ bin/pulsar-admin namespaces set-compaction-threshold \ + --threshold 100M my-tenant/my-namespace + +``` + +Configuring the compaction threshold on a namespace will apply to all topics within that namespace. + +## Triggering compaction manually + +In order to run compaction on a topic, you need to use the [`topics compact`](reference-pulsar-admin.md#topics-compact) command for the [`pulsar-admin`](reference-pulsar-admin) CLI tool. Here's an example: + +```bash + +$ bin/pulsar-admin topics compact \ + persistent://my-tenant/my-namespace/my-topic + +``` + +The `pulsar-admin` tool runs compaction via the Pulsar {@inject: rest:REST:/} API. To run compaction in its own dedicated process, i.e. 
*not* through the REST API, you can use the [`pulsar compact-topic`](reference-cli-tools.md#pulsar-compact-topic) command. Here's an example: + +```bash + +$ bin/pulsar compact-topic \ + --topic persistent://my-tenant/my-namespace/my-topic + +``` + +> Running compaction in its own process is recommended when you want to avoid interfering with the broker's performance. Broker performance should only be affected, however, when running compaction on topics with a large keyspace (i.e. when there are many keys on the topic). The first phase of the compaction process keeps a copy of each key in the topic, which can create memory pressure as the number of keys grows. Using the `pulsar-admin topics compact` command to run compaction through the REST API should present no issues in the overwhelming majority of cases; using `pulsar compact-topic` should correspondingly be considered an edge case. + +The `pulsar compact-topic` command communicates with [ZooKeeper](https://zookeeper.apache.org) directly. In order to establish communication with ZooKeeper, though, the `pulsar` CLI tool will need to have a valid [broker configuration](reference-configuration.md#broker). You can either supply a proper configuration in `conf/broker.conf` or specify a non-default location for the configuration: + +```bash + +$ bin/pulsar compact-topic \ + --broker-conf /path/to/broker.conf \ + --topic persistent://my-tenant/my-namespace/my-topic + +# If the configuration is in conf/broker.conf +$ bin/pulsar compact-topic \ + --topic persistent://my-tenant/my-namespace/my-topic + +``` + +#### When should I trigger compaction? + +How often you [trigger compaction](#triggering-compaction-manually) will vary widely based on the use case. If you want a compacted topic to be extremely speedy on read, then you should run compaction fairly frequently. + +## Consumer configuration + +Pulsar consumers and readers need to be configured to read from compacted topics. 
The sections below show you how to enable compacted topic reads for Pulsar's language clients. + +### Java + +In order to read from a compacted topic using a Java consumer, the `readCompacted` parameter must be set to `true`. Here's an example consumer for a compacted topic: + +```java + +Consumer<byte[]> compactedTopicConsumer = client.newConsumer() + .topic("some-compacted-topic") + .subscriptionName("some-subscription") + .readCompacted(true) + .subscribe(); + +``` + +As mentioned above, topic compaction in Pulsar works on a *per-key basis*. That means that messages that you produce on compacted topics need to have keys (the content of the key will depend on your use case). Messages that don't have keys will be ignored by the compaction process. Here's an example of building a Pulsar message with a key, assuming an existing `producer`: + +```java + +import org.apache.pulsar.client.api.TypedMessageBuilder; + +// In 2.x clients, messages are built from the producer via newMessage() +TypedMessageBuilder<byte[]> msg = producer.newMessage() + .key("some-key") + .value(someByteArray); + +``` + +The example below shows a message with a key being produced on a compacted Pulsar topic: + +```java + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; + +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .build(); + +Producer<byte[]> compactedTopicProducer = client.newProducer() + .topic("some-compacted-topic") + .create(); + +// Build and send a keyed message in a single chain +compactedTopicProducer.newMessage() + .key("some-key") + .value(someByteArray) + .send(); + +``` + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-deduplication.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-deduplication.md new file mode 100644 index 0000000000000..f6dc69a52ab2c --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-deduplication.md @@ -0,0 +1,159 @@ +--- +id: 
cookbooks-deduplication +title: Message deduplication +sidebar_label: "Message deduplication" +original_id: cookbooks-deduplication +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +When **Message deduplication** is enabled, it ensures that each message produced on Pulsar topics is persisted to disk *only once*, even if the message is produced more than once. Message deduplication is handled automatically on the server side. + +To use message deduplication in Pulsar, you need to configure your Pulsar brokers and clients. + +## How it works + +You can enable or disable message deduplication at the namespace level or the topic level. By default, it is disabled on all namespaces or topics. You can enable it in the following ways: + +* Enable deduplication for all namespaces/topics at the broker-level. +* Enable deduplication for a specific namespace with the `pulsar-admin namespaces` interface. +* Enable deduplication for a specific topic with the `pulsar-admin topics` interface. + +## Configure message deduplication + +You can configure message deduplication in Pulsar using the [`broker.conf`](reference-configuration.md#broker) configuration file. The following deduplication-related parameters are available. + +Parameter | Description | Default +:---------|:------------|:------- +`brokerDeduplicationEnabled` | Sets the default behavior for message deduplication in the Pulsar broker. If it is set to `true`, message deduplication is enabled on all namespaces/topics. If it is set to `false`, you have to enable or disable deduplication at the namespace level or the topic level. | `false` +`brokerDeduplicationMaxNumberOfProducers` | The maximum number of producers for which information is stored for deduplication purposes. | `10000` +`brokerDeduplicationEntriesInterval` | The number of entries after which a deduplication informational snapshot is taken. 
A larger interval leads to fewer snapshots being taken, though this lengthens the topic recovery time (the time required for entries published after the snapshot to be replayed). | `1000` +`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity (in minutes) after which the broker discards deduplication information related to a disconnected producer. | `360` (6 hours) + +### Set default value at the broker-level + +By default, message deduplication is *disabled* on all Pulsar namespaces/topics. To enable it on all namespaces/topics, set the `brokerDeduplicationEnabled` parameter to `true` and restart the broker. + +Whatever value you set for `brokerDeduplicationEnabled`, enabling or disabling deduplication via the Pulsar admin CLI overrides the default setting at the broker-level. + +### Enable message deduplication + +Though message deduplication is disabled by default at the broker level, you can enable message deduplication for a specific namespace or topic using the [`pulsar-admin namespaces set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or the [`pulsar-admin topics set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. You can use the `--enable`/`-e` flag and specify the namespace/topic. + +The following example shows how to enable message deduplication at the namespace level. + +```bash + +$ bin/pulsar-admin namespaces set-deduplication \ + public/default \ + --enable # or just -e + +``` + +### Disable message deduplication + +Even if you enable message deduplication at the broker level, you can disable message deduplication for a specific namespace or topic using the [`pulsar-admin namespaces set-deduplication`](reference-pulsar-admin.md#namespace-set-deduplication) or the [`pulsar-admin topics set-deduplication`](reference-pulsar-admin.md#topic-set-deduplication) command. Use the `--disable`/`-d` flag and specify the namespace/topic. 
+ +The following example shows how to disable message deduplication at the namespace level. + +```bash + +$ bin/pulsar-admin namespaces set-deduplication \ + public/default \ + --disable # or just -d + +``` + +## Pulsar clients + +If you enable message deduplication in Pulsar brokers, you need to complete the following tasks for your client producers: + +1. Specify a name for the producer. +1. Set the message timeout to `0` (namely, no timeout). + +The instructions for Java, Python, and C++ clients are different. + + + + +To enable message deduplication on a [Java producer](client-libraries-java.md#producers), set the producer name using the `producerName` setter, and set the timeout to `0` using the `sendTimeout` setter. + +```java + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import java.util.concurrent.TimeUnit; + +PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .build(); +Producer producer = pulsarClient.newProducer() + .producerName("producer-1") + .topic("persistent://public/default/topic-1") + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + +``` + + + + +To enable message deduplication on a [Python producer](client-libraries-python.md#producers), set the producer name using `producer_name`, and set the timeout to `0` using `send_timeout_millis`. + +```python + +import pulsar + +client = pulsar.Client("pulsar://localhost:6650") +producer = client.create_producer( + "persistent://public/default/topic-1", + producer_name="producer-1", + send_timeout_millis=0) + +``` + + + + +To enable message deduplication on a [C++ producer](client-libraries-cpp.md#producer), set the producer name using `setProducerName`, and set the timeout to `0` using `setSendTimeout`. 
+ +```cpp + +#include <pulsar/Client.h> + +using namespace pulsar; + +std::string serviceUrl = "pulsar://localhost:6650"; +std::string topic = "persistent://some-tenant/ns1/topic-1"; +std::string producerName = "producer-1"; + +Client client(serviceUrl); + +ProducerConfiguration producerConfig; +producerConfig.setSendTimeout(0); +producerConfig.setProducerName(producerName); + +Producer producer; + +Result result = client.createProducer(topic, producerConfig, producer); + +``` + + + + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-encryption.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-encryption.md new file mode 100644 index 0000000000000..c782e69ddd047 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-encryption.md @@ -0,0 +1,188 @@ +--- +id: cookbooks-encryption +title: Pulsar Encryption +sidebar_label: "Encryption" +original_id: cookbooks-encryption +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Pulsar encryption allows applications to encrypt messages at the producer and decrypt at the consumer. Encryption is performed using the public/private key pair configured by the application. Encrypted messages can only be decrypted by consumers with a valid key. + +## Asymmetric and symmetric encryption + +Pulsar uses a dynamically generated symmetric AES key to encrypt messages (data). The AES key (data key) is encrypted using an application-provided ECDSA/RSA key pair, so there is no need to share the secret with everyone. + +The key is a public/private key pair used for encryption/decryption. The producer key is the public key, and the consumer key is the private key of the key pair. + +The application configures the producer with the public key. This key is used to encrypt the AES data key. The encrypted data key is sent as part of the message header. Only entities with the private key (in this case the consumer) will be able to decrypt the data key which is used to decrypt the message. 
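The flow described above is a form of envelope encryption. The sketch below illustrates it using only the JDK's `java.security` and `javax.crypto` classes. It is not Pulsar's internal implementation: the `EnvelopeSketch` class, the `Envelope` container, and the cipher choices (AES-GCM with a 128-bit key for the payload, RSA-OAEP for wrapping the data key) are assumptions made purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Conceptual sketch of envelope encryption; not Pulsar's internal implementation.
class EnvelopeSketch {
    // Assumed cipher choices, for illustration only.
    static final String DATA_CIPHER = "AES/GCM/NoPadding";
    static final String KEY_CIPHER = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";

    // Hypothetical container: the wrapped data key travels alongside the ciphertext.
    static final class Envelope {
        final byte[] wrappedDataKey;
        final byte[] iv;
        final byte[] ciphertext;

        Envelope(byte[] wrappedDataKey, byte[] iv, byte[] ciphertext) {
            this.wrappedDataKey = wrappedDataKey;
            this.iv = iv;
            this.ciphertext = ciphertext;
        }
    }

    static KeyPair newRsaKeyPair() {
        try {
            KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
            gen.initialize(2048);
            return gen.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Producer side: encrypt the payload with a fresh AES data key,
    // then encrypt (wrap) that data key with the public key.
    static Envelope encrypt(byte[] payload, PublicKey publicKey) {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance("AES");
            keyGen.init(128);
            SecretKey dataKey = keyGen.generateKey();

            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher aes = Cipher.getInstance(DATA_CIPHER);
            aes.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(128, iv));
            byte[] ciphertext = aes.doFinal(payload);

            Cipher rsa = Cipher.getInstance(KEY_CIPHER);
            rsa.init(Cipher.ENCRYPT_MODE, publicKey);
            return new Envelope(rsa.doFinal(dataKey.getEncoded()), iv, ciphertext);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Consumer side: recover the data key with the private key, then decrypt the payload.
    static byte[] decrypt(Envelope env, PrivateKey privateKey) {
        try {
            Cipher rsa = Cipher.getInstance(KEY_CIPHER);
            rsa.init(Cipher.DECRYPT_MODE, privateKey);
            SecretKey dataKey = new SecretKeySpec(rsa.doFinal(env.wrappedDataKey), "AES");

            Cipher aes = Cipher.getInstance(DATA_CIPHER);
            aes.init(Cipher.DECRYPT_MODE, dataKey, new GCMParameterSpec(128, env.iv));
            return aes.doFinal(env.ciphertext);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair keys = newRsaKeyPair();
        Envelope env = encrypt("my-message".getBytes(StandardCharsets.UTF_8), keys.getPublic());
        System.out.println(new String(decrypt(env, keys.getPrivate()), StandardCharsets.UTF_8));
    }
}
```

Only the holder of the private key can recover the data key, which is why the data key itself never has to be shared out of band.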
+ +A message can be encrypted with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message + +Pulsar does not store the encryption key anywhere in the pulsar service. If you lose/delete the private key, your message is irretrievably lost, and is unrecoverable + +## Producer +![alt text](/assets/pulsar-encryption-producer.jpg "Pulsar Encryption Producer") + +## Consumer +![alt text](/assets/pulsar-encryption-consumer.jpg "Pulsar Encryption Consumer") + +## Here are the steps to get started: + +1. Create your ECDSA or RSA public/private key pair. + +```shell + +openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem +openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out test_ecdsa_pubkey.pem + +``` + +2. Add the public and private key to the key management and configure your producers to retrieve public keys and consumers clients to retrieve private keys. +3. Implement CryptoKeyReader::getPublicKey() interface from producer and CryptoKeyReader::getPrivateKey() interface from consumer, which will be invoked by Pulsar client to load the key. +4. Add encryption key to producer configuration: conf.addEncryptionKey("myapp.key") +5. Add CryptoKeyReader implementation to producer/consumer config: conf.setCryptoKeyReader(keyReader) +6. 
Sample producer application: + +```java + +class RawFileKeyReader implements CryptoKeyReader { + + String publicKeyFile = ""; + String privateKeyFile = ""; + + RawFileKeyReader(String pubKeyFile, String privKeyFile) { + publicKeyFile = pubKeyFile; + privateKeyFile = privKeyFile; + } + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile))); + } catch (IOException e) { + System.out.println("ERROR: Failed to read public key from file " + publicKeyFile); + e.printStackTrace(); + } + return keyInfo; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile))); + } catch (IOException e) { + System.out.println("ERROR: Failed to read private key from file " + privateKeyFile); + e.printStackTrace(); + } + return keyInfo; + } +} +PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080"); + +ProducerConfiguration prodConf = new ProducerConfiguration(); +prodConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")); +prodConf.addEncryptionKey("myappkey"); + +Producer producer = pulsarClient.createProducer("persistent://my-tenant/my-ns/my-topic", prodConf); + +for (int i = 0; i < 10; i++) { + producer.send("my-message".getBytes()); +} + +pulsarClient.close(); + +``` + +7. 
Sample Consumer Application: + +```java + +class RawFileKeyReader implements CryptoKeyReader { + + String publicKeyFile = ""; + String privateKeyFile = ""; + + RawFileKeyReader(String pubKeyFile, String privKeyFile) { + publicKeyFile = pubKeyFile; + privateKeyFile = privKeyFile; + } + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile))); + } catch (IOException e) { + System.out.println("ERROR: Failed to read public key from file " + publicKeyFile); + e.printStackTrace(); + } + return keyInfo; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile))); + } catch (IOException e) { + System.out.println("ERROR: Failed to read private key from file " + privateKeyFile); + e.printStackTrace(); + } + return keyInfo; + } +} + +ConsumerConfiguration consConf = new ConsumerConfiguration(); +consConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")); +PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080"); +Consumer consumer = pulsarClient.subscribe("persistent://my-tenant/my-ns/my-topic", "my-subscriber-name", consConf); +Message msg = null; + +for (int i = 0; i < 10; i++) { + msg = consumer.receive(); + // do something + System.out.println("Received: " + new String(msg.getData())); +} + +// Acknowledge the consumption of all messages at once +consumer.acknowledgeCumulative(msg); +pulsarClient.close(); + +``` + +## Key rotation +Pulsar generates a new AES data key every 4 hours or after a certain number of messages are published. The asymmetric public key is automatically fetched by the producer every 4 hours by calling CryptoKeyReader::getPublicKey() to retrieve the latest version. 
+ +## Enabling encryption at the producer application: +If you produce messages that are consumed across application boundaries, you need to ensure that consumers in other applications have access to one of the private keys that can decrypt the messages. This can be done in two ways: +1. The consumer application provides you access to their public key, which you add to your producer keys. +1. You grant access to one of the private keys from the pairs used by the producer. + +In some cases, the producer may want to encrypt the messages with multiple keys. For this, add all such keys to the config. The consumer will be able to decrypt the message as long as it has access to at least one of the keys. + +For example, if messages need to be encrypted using two keys, myapp.messagekey1 and myapp.messagekey2: + +```java + +conf.addEncryptionKey("myapp.messagekey1"); +conf.addEncryptionKey("myapp.messagekey2"); + +``` + +## Decrypting encrypted messages at the consumer application: +Consumers require access to one of the private keys to decrypt messages produced by the producer. If you would like to receive encrypted messages, create a public/private key pair and give your public key to the producer application so that it can encrypt messages with it. + +## Handling Failures: +* Producer/Consumer loses access to the key + * The producer action will fail, indicating the cause of the failure. The application has the option to proceed with sending unencrypted messages in such cases. Call conf.setCryptoFailureAction(ProducerCryptoFailureAction) to control the producer behavior. The default behavior is to fail the request. + * If consumption fails due to a decryption failure or missing keys in the consumer, the application has the option to consume the encrypted message or discard it. Call conf.setCryptoFailureAction(ConsumerCryptoFailureAction) to control the consumer behavior. The default behavior is to fail the request. +The application will never be able to decrypt the messages if the private key is permanently lost. 
+* Batch messaging + * If decryption fails and the message contain batch messages, client will not be able to retrieve individual messages in the batch, hence message consumption fails even if conf.setCryptoFailureAction() is set to CONSUME. +* If decryption fails, the message consumption stops and application will notice backlog growth in addition to decryption failure messages in the client log. If application does not have access to the private key to decrypt the message, the only option is to skip/discard backlogged messages. + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-message-queue.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-message-queue.md new file mode 100644 index 0000000000000..f30da44a1bbad --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-message-queue.md @@ -0,0 +1,130 @@ +--- +id: cookbooks-message-queue +title: Using Pulsar as a message queue +sidebar_label: "Message queue" +original_id: cookbooks-message-queue +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Message queues are essential components of many large-scale data architectures. If every single work object that passes through your system absolutely *must* be processed in spite of the slowness or downright failure of this or that system component, there's a good chance that you'll need a message queue to step in and ensure that unprocessed data is retained---with correct ordering---until the required actions are taken. + +Pulsar is a great choice for a message queue because: + +* it was built with [persistent message storage](concepts-architecture-overview.md#persistent-storage) in mind +* it offers automatic load balancing across [consumers](reference-terminology.md#consumer) for messages on a topic (or custom load balancing if you wish) + +> You can use the same Pulsar installation to act as a real-time message bus and as a message queue if you wish (or just one or the other). 
You can set aside some topics for real-time purposes and other topics for message queue purposes (or use specific namespaces for either purpose if you wish). + + +# Client configuration changes + +To use a Pulsar [topic](reference-terminology.md#topic) as a message queue, you should distribute the receiver load on that topic across several consumers (the optimal number of consumers will depend on the load). Each consumer must: + +* Establish a [shared subscription](concepts-messaging.md#shared) and use the same subscription name as the other consumers (otherwise the subscription is not shared and the consumers can't act as a processing ensemble) +* If you'd like to have tight control over message dispatching across consumers, set the consumers' **receiver queue** size very low (potentially even to 0 if necessary). Each Pulsar [consumer](reference-terminology.md#consumer) has a receiver queue that determines how many messages the consumer will attempt to fetch at a time. A receiver queue of 1000 (the default), for example, means that the consumer will attempt to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to zero essentially means ensuring that each consumer is only doing one thing at a time. + + The downside to restricting the receiver queue size of consumers is that it limits the potential throughput of those consumers, and a receiver queue size of zero cannot be used with [partitioned topics](reference-terminology.md#partitioned-topic). Whether the performance/control trade-off is worthwhile will depend on your use case. 
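To see why the receiver queue size affects how work is spread across consumers, consider the deliberately simplified model below. It is not how the Pulsar broker dispatches messages; the `ReceiverQueueSketch` class and its `prefetch` helper are hypothetical, and the model only captures the statement above that a consumer attempts to fetch up to its receiver queue size from the backlog upon connection. It is meant for queue sizes of 1 or more.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model for illustration; the real broker dispatch logic is more involved.
class ReceiverQueueSketch {

    // Each consumer, in connection order, prefetches up to receiverQueueSize
    // messages from whatever is left of the backlog.
    static Map<String, Integer> prefetch(int backlog, int receiverQueueSize, List<String> consumers) {
        Map<String, Integer> prefetched = new LinkedHashMap<>();
        int remaining = backlog;
        for (String consumer : consumers) {
            int taken = Math.min(remaining, receiverQueueSize);
            prefetched.put(consumer, taken);
            remaining -= taken;
        }
        return prefetched;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-a", "consumer-b");
        // Large queue: the first consumer to connect takes the whole small backlog.
        System.out.println(prefetch(10, 1000, consumers));
        // Small queue: the backlog is split between the two consumers.
        System.out.println(prefetch(10, 5, consumers));
    }
}
```

In this model a backlog of 10 messages ends up entirely on the first consumer when the queue size is 1000, while a queue size of 5 leaves half of the backlog for the second consumer, which is the kind of control the low setting is meant to give.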
+ +## Java clients + +Here's an example Java consumer configuration that uses a shared subscription: + +```java + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; + +String SERVICE_URL = "pulsar://localhost:6650"; +String TOPIC = "persistent://public/default/mq-topic-1"; +String subscription = "sub-1"; + +PulsarClient client = PulsarClient.builder() + .serviceUrl(SERVICE_URL) + .build(); + +Consumer consumer = client.newConsumer() + .topic(TOPIC) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + // If you'd like to restrict the receiver queue size + .receiverQueueSize(10) + .subscribe(); + +``` + +## Python clients + +Here's an example Python consumer configuration that uses a shared subscription: + +```python + +from pulsar import Client, ConsumerType + +SERVICE_URL = "pulsar://localhost:6650" +TOPIC = "persistent://public/default/mq-topic-1" +SUBSCRIPTION = "sub-1" + +client = Client(SERVICE_URL) +consumer = client.subscribe( + TOPIC, + SUBSCRIPTION, + # If you'd like to restrict the receiver queue size + receiver_queue_size=10, + consumer_type=ConsumerType.Shared) + +``` + +## C++ clients + +Here's an example C++ consumer configuration that uses a shared subscription: + +```cpp + +#include <pulsar/Client.h> + +using namespace pulsar; + +std::string serviceUrl = "pulsar://localhost:6650"; +std::string topic = "persistent://public/default/mq-topic-1"; +std::string subscription = "sub-1"; + +Client client(serviceUrl); + +ConsumerConfiguration consumerConfig; +consumerConfig.setConsumerType(ConsumerShared); +// If you'd like to restrict the receiver queue size +consumerConfig.setReceiverQueueSize(10); + +Consumer consumer; + +Result result = client.subscribe(topic, subscription, consumerConfig, consumer); + +``` + +## Go clients + +Here is an example of a Go consumer configuration that uses the shared subscription. 
+ +```go + +import ( + "log" + + "github.com/apache/pulsar-client-go/pulsar" +) + +client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "pulsar://localhost:6650", +}) +if err != nil { + log.Fatal(err) +} +consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: "persistent://public/default/mq-topic-1", + SubscriptionName: "sub-1", + Type: pulsar.Shared, + ReceiverQueueSize: 10, // If you'd like to restrict the receiver queue size +}) +if err != nil { + log.Fatal(err) +} + +``` + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-non-persistent.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-non-persistent.md new file mode 100644 index 0000000000000..f10d8374041ac --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-non-persistent.md @@ -0,0 +1,67 @@ +--- +id: cookbooks-non-persistent +title: Non-persistent messaging +sidebar_label: "Non-persistent messaging" +original_id: cookbooks-non-persistent +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +**Non-persistent topics** are Pulsar topics in which message data is *never* [persistently stored](concepts-architecture-overview.md#persistent-storage) and is kept only in memory. This cookbook provides: + +* A basic [conceptual overview](#overview) of non-persistent topics +* Information about [configurable parameters](#configuration) related to non-persistent topics +* A guide to the [CLI interface](#cli) for managing non-persistent topics + +## Overview + +By default, Pulsar persistently stores *all* unacknowledged messages on multiple [BookKeeper](concepts-architecture-overview.md#persistent-storage) bookies (storage nodes). Data for messages on persistent topics can thus survive broker restarts and subscriber failover. + +Pulsar also, however, supports **non-persistent topics**, which are topics on which messages are *never* persisted to disk and live only in memory. 
When using non-persistent delivery, killing a Pulsar [broker](reference-terminology.md#broker) or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss. + +Non-persistent topics have names of this form (note the `non-persistent` in the name): + +```http + +non-persistent://tenant/namespace/topic + +``` + +> For more high-level information about non-persistent topics, see the [Concepts and Architecture](concepts-messaging.md#non-persistent-topics) documentation. + +## Using + +> In order to use non-persistent topics, they must be [enabled](#enabling) in your Pulsar broker configuration. + +In order to use non-persistent topics, you only need to differentiate them by name when interacting with them. This [`pulsar-client produce`](reference-cli-tools.md#pulsar-client-produce) command, for example, would produce one message on a non-persistent topic in a standalone cluster: + +```bash + +$ bin/pulsar-client produce non-persistent://public/default/example-np-topic \ + --num-produce 1 \ + --messages "This message will be stored only in memory" + +``` + +> For a more thorough guide to non-persistent topics from an administrative perspective, see the [Non-persistent topics](admin-api-topics) guide. + +## Enabling + +In order to enable non-persistent topics in a Pulsar broker, the [`enableNonPersistentTopics`](reference-configuration.md#broker-enableNonPersistentTopics) must be set to `true`. This is the default, and so you won't need to take any action to enable non-persistent messaging. + + +> #### Configuration for standalone mode +> If you're running Pulsar in standalone mode, the same configurable parameters are available but in the [`standalone.conf`](reference-configuration.md#standalone) configuration file. 
+ +If you'd like to enable *only* non-persistent topics in a broker, you can set the [`enablePersistentTopics`](reference-configuration.md#broker-enablePersistentTopics) parameter to `false` and the `enableNonPersistentTopics` parameter to `true`. + +## Managing with cli + +Non-persistent topics can be managed using the [`pulsar-admin non-persistent`](reference-pulsar-admin.md#non-persistent) command-line interface. With that interface you can perform actions like [create a partitioned non-persistent topic](reference-pulsar-admin.md#non-persistent-create-partitioned-topic), get [stats](reference-pulsar-admin.md#non-persistent-stats) for a non-persistent topic, [list](reference-pulsar-admin) non-persistent topics under a namespace, and more. + +## Using with Pulsar clients + +You shouldn't need to make any changes to your Pulsar clients to use non-persistent messaging beyond making sure that you use proper [topic names](#using) with `non-persistent` as the topic type. + diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-partitioned.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-partitioned.md new file mode 100644 index 0000000000000..9442a1ef2c32a --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-partitioned.md @@ -0,0 +1,11 @@ +--- +id: cookbooks-partitioned +title: Partitioned topics +sidebar_label: "Partitioned Topics" +original_id: cookbooks-partitioned +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +For details of the content, refer to [manage topics](admin-api-topics). 
diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-retention-expiry.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-retention-expiry.md new file mode 100644 index 0000000000000..60441a3f28f66 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-retention-expiry.md @@ -0,0 +1,421 @@ +--- +id: cookbooks-retention-expiry +title: Message retention and expiry +sidebar_label: "Message retention and expiry" +original_id: cookbooks-retention-expiry +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Pulsar brokers are responsible for handling messages that pass through Pulsar, including [persistent storage](concepts-architecture-overview.md#persistent-storage) of messages. By default, for each topic, brokers only retain messages that are in at least one backlog. A backlog is the set of unacknowledged messages for a particular subscription. As a topic can have multiple subscriptions, a topic can have multiple backlogs. + +As a consequence, no messages are retained (by default) on a topic that has not had any subscriptions created for it. + +(Note that messages that are no longer being stored are not necessarily immediately deleted, and may in fact still be accessible until the next ledger rollover. Because clients cannot predict when rollovers may happen, it is not wise to rely on a rollover not happening at an inconvenient point in time.) + +In Pulsar, you can modify this behavior, with namespace granularity, in two ways: + +* You can persistently store messages that are not within a backlog (because they've been acknowledged on every existing subscription, or because there are no subscriptions) by setting [retention policies](#retention-policies). +* Messages that are not acknowledged within a specified timeframe can be automatically acknowledged, by specifying the [time to live](#time-to-live-ttl) (TTL).
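The default behavior described above (a message is kept only while it sits in at least one subscription's backlog) can be sketched as a toy model. This is an illustration only, not Pulsar's implementation: the class `BacklogModel`, the method `retainedByDefault`, and the integer message IDs are all hypothetical names introduced for this sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Toy model (hypothetical, not Pulsar code): which messages does a topic
// keep by default, given what each subscription has acknowledged?
public class BacklogModel {

    // Returns the IDs of messages that are in at least one backlog,
    // i.e. unacknowledged on at least one subscription.
    public static SortedSet<Integer> retainedByDefault(
            List<Integer> messageIds,
            Map<String, Set<Integer>> ackedBySubscription) {
        SortedSet<Integer> retained = new TreeSet<>();
        for (Set<Integer> acked : ackedBySubscription.values()) {
            for (Integer id : messageIds) {
                if (!acked.contains(id)) {
                    retained.add(id); // still in this subscription's backlog
                }
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Integer> messages = List.of(1, 2, 3, 4);

        Map<String, Set<Integer>> acks = new HashMap<>();
        acks.put("sub-a", Set.of(1, 2, 3)); // backlog of sub-a: {4}
        acks.put("sub-b", Set.of(1));       // backlog of sub-b: {2, 3, 4}

        // Message 1 is acknowledged everywhere, so it is no longer retained.
        System.out.println(retainedByDefault(messages, acks));

        // A topic with no subscriptions has no backlogs, so nothing is retained.
        System.out.println(retainedByDefault(messages, new HashMap<>()));
    }
}
```

In this model, a retention policy corresponds to keeping messages that fall outside the returned set, and a TTL corresponds to adding old message IDs to every subscription's acknowledged set.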
+ +Pulsar's [admin interface](admin-api-overview) enables you to manage both retention policies and TTL with namespace granularity (and thus within a specific tenant and either on a specific cluster or in the [`global`](concepts-architecture-overview.md#global-cluster) cluster). + + +> #### Retention and TTL solve two different problems +> * Message retention: Keep the data for at least X hours (even if acknowledged) +> * Time-to-live: Discard data after some time (by automatically acknowledging) +> +> Most applications will want to use at most one of these. + + +## Retention policies + +By default, when a Pulsar message arrives at a broker, the message is stored until it has been acknowledged on all subscriptions, at which point it is marked for deletion. You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a *retention policy* for all topics in a given namespace. Retention is based on both a *size limit* and a *time limit*. + +Retention policies are useful when you use the Reader interface. The Reader interface does not use acknowledgements, and messages do not exist within backlogs. It is required to configure retention for Reader-only use cases. + +When you set a retention policy on topics in a namespace, you must set **both** a *size limit* and a *time limit*. You can refer to the following table to set retention policies in `pulsar-admin` and Java. + +|Time limit|Size limit| Message retention | +|----------|----------|------------------------| +| -1 | -1 | Infinite retention | +| -1 | >0 | Based on the size limit | +| >0 | -1 | Based on the time limit | +| 0 | 0 | Disable message retention (by default) | +| 0 | >0 | Invalid | +| >0 | 0 | Invalid | +| >0 | >0 | Acknowledged messages or messages with no active subscription will not be retained when either time or size reaches the limit. 
| + +The retention settings apply to all messages on topics that do not have any subscriptions, or to messages that have been acknowledged by all subscriptions. The retention policy settings do not affect unacknowledged messages on topics with subscriptions. The unacknowledged messages are controlled by the backlog quota. + +When a retention limit on a topic is exceeded, the oldest message is marked for deletion until the set of retained messages falls within the specified limits again. + +### Defaults + +You can set message retention at instance level with the following two parameters: `defaultRetentionTimeInMinutes` and `defaultRetentionSizeInMB`. Both parameters are set to `0` by default. + +For more information of the two parameters, refer to the [`broker.conf`](reference-configuration.md#broker) configuration file. + +### Set retention policy + +You can set a retention policy for a namespace by specifying the namespace, a size limit and a time limit in `pulsar-admin`, REST API and Java. + + + + +You can use the [`set-retention`](reference-pulsar-admin.md#namespaces-set-retention) subcommand and specify a namespace, a size limit using the `-s`/`--size` flag, and a time limit using the `-t`/`--time` flag. + +In the following example, the size limit is set to 10 GB and the time limit is set to 3 hours for each topic within the `my-tenant/my-ns` namespace. +- When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained. +- After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained. + +```shell + +$ pulsar-admin namespaces set-retention my-tenant/my-ns \ + --size 10G \ + --time 3h + +``` + +In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention. 
+ +```shell + +$ pulsar-admin namespaces set-retention my-tenant/my-ns \ + --size 1T \ + --time -1 + +``` + +In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention. + +```shell + +$ pulsar-admin namespaces set-retention my-tenant/my-ns \ + --size -1 \ + --time 3h + +``` + +To achieve infinite retention, set both values to `-1`. + +```shell + +$ pulsar-admin namespaces set-retention my-tenant/my-ns \ + --size -1 \ + --time -1 + +``` + +To disable the retention policy, set both values to `0`. + +```shell + +$ pulsar-admin namespaces set-retention my-tenant/my-ns \ + --size 0 \ + --time 0 + +``` + + + + +{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/retention|operation/setRetention?version=@pulsar:version_number@} + +:::note + +To disable the retention policy, you need to set both the size and time limit to `0`. Setting only one of the size or time limit to `0` is invalid. + +::: + + + + +```java + +int retentionTime = 10; // 10 minutes +int retentionSize = 500; // 500 megabytes +RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize); +admin.namespaces().setRetention(namespace, policies); + +``` + + + + + +### Get retention policy + +You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: `retentionTimeInMinutes` and `retentionSizeInMB`. + +#### pulsar-admin + +Use the [`get-retention`](reference-pulsar-admin.md#namespaces) subcommand and specify the namespace.
+ +##### Example + +```shell + +$ pulsar-admin namespaces get-retention my-tenant/my-ns +{ + "retentionTimeInMinutes": 10, + "retentionSizeInMB": 500 +} + +``` + +#### REST API + +{@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/retention|operation/getRetention?version=@pulsar:version_number@} + +#### Java + +```java + +admin.namespaces().getRetention(namespace); + +``` + +## Backlog quotas + +*Backlogs* are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged. + +You can control the allowable size of backlogs, at the namespace level, using *backlog quotas*. Setting a backlog quota involves setting: + +TODO: Expand on is this per backlog or per topic? + +* an allowable *size threshold* for each topic in the namespace +* a *retention policy* that determines which action the [broker](reference-terminology.md#broker) takes if the threshold is exceeded. + +The following retention policies are available: + +Policy | Action +:------|:------ +`producer_request_hold` | The broker will hold and not persist produce request payload +`producer_exception` | The broker will disconnect from the client by throwing an exception +`consumer_backlog_eviction` | The broker will begin discarding backlog messages + + +> #### Beware the distinction between retention policy types +> As you may have noticed, there are two definitions of the term "retention policy" in Pulsar, one that applies to persistent storage of messages not in backlogs, and one that applies to messages within backlogs. + + +Backlog quotas are handled at the namespace level. They can be managed via `pulsar-admin`, the REST API, and Java. + +### Set size thresholds and backlog retention policies + +You can set a size threshold and backlog retention policy for all of the topics in a [namespace](reference-terminology.md#namespace) by specifying the namespace, a size limit, and a policy by name.
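To make the interaction between the size threshold and the backlog retention policy concrete, here is a toy decision function. It is a sketch under assumptions, not broker code: the class `BacklogQuotaModel`, the method `onPublish`, and the returned strings are hypothetical, and treating "exceeded" as strictly greater than the limit is an assumption of this sketch. Only the three policy names come from the table above.

```java
// Toy model (hypothetical, not Pulsar code): what happens to a publish
// request once a topic's backlog exceeds the configured quota?
public class BacklogQuotaModel {

    // Policy names as listed in the backlog retention policy table.
    public enum Policy {
        producer_request_hold,
        producer_exception,
        consumer_backlog_eviction
    }

    // Describes the broker's reaction for a given backlog size and quota.
    // Assumption: the quota counts as exceeded only when backlog > limit.
    public static String onPublish(long backlogBytes, long limitBytes, Policy policy) {
        if (backlogBytes <= limitBytes) {
            return "accept"; // within quota, no policy applies
        }
        switch (policy) {
            case producer_request_hold:
                return "hold request, do not persist payload";
            case producer_exception:
                return "disconnect producer with an exception";
            case consumer_backlog_eviction:
                return "begin discarding backlog messages";
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        long limit = 2L * 1024 * 1024 * 1024; // 2 GiB quota
        System.out.println(onPublish(limit - 1, limit, Policy.producer_request_hold));
        System.out.println(onPublish(limit + 1, limit, Policy.producer_request_hold));
    }
}
```

The first two policies push back on producers, while `consumer_backlog_eviction` keeps producers running at the cost of dropping unacknowledged messages.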
+ +#### pulsar-admin + +Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand and specify a namespace, a size limit using the `-l`/`--limit` flag, and a retention policy using the `-p`/`--policy` flag. + +##### Example + +```shell + +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ + --limit 2G \ + --policy producer_request_hold + +``` + +#### REST API + +{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/setBacklogQuota?version=@pulsar:version_number@} + +#### Java + +```java + +long sizeLimit = 2147483648L; +BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold; +BacklogQuota quota = new BacklogQuota(sizeLimit, policy); +admin.namespaces().setBacklogQuota(namespace, quota); + +``` + +### Get backlog threshold and backlog retention policy + +You can see which size threshold and backlog retention policy has been applied to a namespace. + +#### pulsar-admin + +Use the [`get-backlog-quotas`](reference-pulsar-admin.md#pulsar-admin-namespaces-get-backlog-quotas) subcommand and specify a namespace. Here's an example: + +```shell + +$ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns +{ + "destination_storage": { + "limit" : 2147483648, + "policy" : "producer_request_hold" + } +} + +``` + +#### REST API + +{@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap|operation/getBacklogQuotaMap?version=@pulsar:version_number@} + +#### Java + +```java + +Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotas = + admin.namespaces().getBacklogQuotaMap(namespace); + +``` + +### Remove backlog quotas + +#### pulsar-admin + +Use the [`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota) subcommand and specify a namespace.
Here's an example: + +```shell + +$ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns + +``` + +#### REST API + +{@inject: endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/removeBacklogQuota?version=@pulsar:version_number@} + +#### Java + +```java + +admin.namespaces().removeBacklogQuota(namespace); + +``` + +### Clear backlog + +#### pulsar-admin + +Use the [`clear-backlog`](reference-pulsar-admin.md#pulsar-admin-namespaces-clear-backlog) subcommand. + +##### Example + +```shell + +$ pulsar-admin namespaces clear-backlog my-tenant/my-ns + +``` + +By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the `-f`/`--force` flag. + +## Time to live (TTL) + +By default, Pulsar stores all unacknowledged messages forever. This can lead to heavy disk space usage in cases where a lot of messages are going unacknowledged. If disk space is a concern, you can set a time to live (TTL) that determines how long unacknowledged messages will be retained. + +### Set the TTL for a namespace + +#### pulsar-admin + +Use the [`set-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-set-message-ttl) subcommand and specify a namespace and a TTL (in seconds) using the `-ttl`/`--messageTTL` flag. + +##### Example + +```shell + +$ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \ + --messageTTL 120 # TTL of 2 minutes + +``` + +#### REST API + +{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/setNamespaceMessageTTL?version=@pulsar:version_number@} + +#### Java + +```java + +admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds); + +``` + +### Get the TTL configuration for a namespace + +#### pulsar-admin + +Use the [`get-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-get-message-ttl) subcommand and specify a namespace. 
+ +##### Example + +```shell + +$ pulsar-admin namespaces get-message-ttl my-tenant/my-ns +60 + +``` + +#### REST API + +{@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/getNamespaceMessageTTL?version=@pulsar:version_number@} + +#### Java + +```java + +admin.namespaces().getNamespaceMessageTTL(namespace); + +``` + +### Remove the TTL configuration for a namespace + +#### pulsar-admin + +Use the [`remove-message-ttl`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-message-ttl) subcommand and specify a namespace. + +##### Example + +```shell + +$ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns + +``` + +#### REST API + +{@inject: endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/messageTTL|operation/removeNamespaceMessageTTL?version=@pulsar:version_number@} + +#### Java + +```java + +admin.namespaces().removeNamespaceMessageTTL(namespace); + +``` + +## Delete messages from namespaces + +If you have not set a retention period and you never have much of a backlog, the upper limit on how long acknowledged messages are retained equals the Pulsar segment rollover period + entry log rollover period + (garbage collection interval * garbage collection ratios). + +- **Segment rollover period**: basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker. + +- **Entry log rollover period**: multiple ledgers in BookKeeper are interleaved into an [entry log](https://bookkeeper.apache.org/docs/4.11.1/getting-started/concepts/#entry-logs). Before the data of a deleted ledger can be removed, the entry log that contains it must be rolled over. +The entry log rollover period is configurable, but is purely based on the entry log size.
For details, see [here](https://bookkeeper.apache.org/docs/4.11.1/reference/config/#entry-log-settings). Once the entry log is rolled over, the entry log can be garbage collected. + +- **Garbage collection interval**: because entry logs have interleaved ledgers, to free up space, the entry logs need to be rewritten. The garbage collection interval is how often BookKeeper performs garbage collection, which is related to minor compaction and major compaction of entry logs. For details, see [here](https://bookkeeper.apache.org/docs/4.11.1/reference/config/#entry-log-compaction-settings). diff --git a/site2/website-next/versioned_docs/version-2.7.2/cookbooks-tiered-storage.md b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-tiered-storage.md new file mode 100644 index 0000000000000..9f5aa6287b0e1 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/cookbooks-tiered-storage.md @@ -0,0 +1,334 @@ +--- +id: cookbooks-tiered-storage +title: Tiered Storage +sidebar_label: "Tiered Storage" +original_id: cookbooks-tiered-storage +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Pulsar's **Tiered Storage** feature allows older backlog data to be offloaded to long term storage, thereby freeing up space in BookKeeper and reducing storage costs. This cookbook walks you through using tiered storage in your Pulsar cluster. + +* Tiered storage uses [Apache jclouds](https://jclouds.apache.org) to support [Amazon S3](https://aws.amazon.com/s3/) and [Google Cloud Storage](https://cloud.google.com/storage/) (GCS for short) for long term storage. With jclouds, it is easy to add support for more [cloud storage providers](https://jclouds.apache.org/reference/providers/#blobstore-providers) in the future. + +* Tiered storage uses [Apache Hadoop](http://hadoop.apache.org/) to support filesystems for long term storage. With Hadoop, it is easy to add support for more filesystems in the future. + +## When should I use Tiered Storage?
+ +Tiered storage should be used when you have a topic for which you want to keep a very long backlog for a long time. For example, if you have a topic containing user actions which you use to train your recommendation systems, you may want to keep that data for a long time, so that if you change your recommendation algorithm you can rerun it against your full user history. + +## The offloading mechanism + +A topic in Pulsar is backed by a log, known as a managed ledger. This log is composed of an ordered list of segments. Pulsar only ever writes to the final segment of the log. All previous segments are sealed. The data within the segment is immutable. This is known as a segment oriented architecture. + +![Tiered storage](/assets/pulsar-tiered-storage.png "Tiered Storage") + +The Tiered Storage offloading mechanism takes advantage of this segment oriented architecture. When offloading is requested, the segments of the log are copied, one-by-one, to tiered storage. All segments of the log, apart from the segment currently being written to, can be offloaded. + +On the broker, the administrator must configure the bucket and credentials for the cloud storage service. +The configured bucket must exist before attempting to offload. If it does not exist, the offload operation will fail. + +Pulsar uses multi-part objects to upload the segment data. It is possible that a broker could crash while uploading the data. +We recommend that you add a lifecycle rule to your bucket to expire incomplete multi-part uploads after a day or two to avoid +getting charged for incomplete uploads. + +When ledgers are offloaded to long term storage, you can still query data in the offloaded ledgers with Pulsar SQL. + +## Configuring the offload driver + +Offloading is configured in ```broker.conf```. + +At a minimum, the administrator must configure the driver, the bucket and the authenticating credentials.
+There are also some other knobs to configure, such as the bucket region and the maximum block size in the backing storage. + +Currently, the following driver types are supported: + +- `aws-s3`: [Simple Cloud Storage Service](https://aws.amazon.com/s3/) +- `google-cloud-storage`: [Google Cloud Storage](https://cloud.google.com/storage/) +- `filesystem`: [Filesystem Storage](http://hadoop.apache.org/) + +> Driver names are case-insensitive. There is an additional driver type, `s3`, which is identical to `aws-s3`, +> though it requires that you specify an endpoint URL using `s3ManagedLedgerOffloadServiceEndpoint`. This is useful if +> you are using an S3-compatible data store other than AWS. + +```conf + +managedLedgerOffloadDriver=aws-s3 + +``` + +### "aws-s3" Driver configuration + +#### Bucket and Region + +Buckets are the basic containers that hold your data. +Everything that you store in Cloud Storage must be contained in a bucket. +You can use buckets to organize your data and control access to your data, +but unlike directories and folders, you cannot nest buckets. + +```conf + +s3ManagedLedgerOffloadBucket=pulsar-topic-offload + +``` + +The bucket region is the region where the bucket is located. Configuring the bucket region is not required +but is recommended. If it is not configured, the default region is used. + +With AWS S3, the default region is `US East (N. Virginia)`. The [AWS Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) page contains more information. + +```conf + +s3ManagedLedgerOffloadRegion=eu-west-3 + +``` + +#### Authentication with AWS + +To be able to access AWS S3, you need to authenticate with AWS S3. +Pulsar does not provide any direct means of configuring authentication for AWS S3, +but relies on the mechanisms supported by the [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html).
+ +Once you have created a set of credentials in the AWS IAM console, they can be configured in a number of ways. + +1. Using ec2 instance metadata credentials + +If you are on AWS instance with an instance profile that provides credentials, Pulsar will use these credentials +if no other mechanism is provided + +2. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```. + +```bash + +export AWS_ACCESS_KEY_ID=ABC123456789 +export AWS_SECRET_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c + +``` + +> \"export\" is important so that the variables are made available in the environment of spawned processes. + + +3. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`. + +```bash + +PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC} -Daws.accessKeyId=ABC123456789 -Daws.secretKey=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024" + +``` + +4. Set the access credentials in ```~/.aws/credentials```. + +```conf + +[default] +aws_access_key_id=ABC123456789 +aws_secret_access_key=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c + +``` + +5. Assuming an IAM role + +If you want to assume an IAM role, this can be done via specifying the following: + +```conf + +s3ManagedLedgerOffloadRole= +s3ManagedLedgerOffloadRoleSessionName=pulsar-s3-offload + +``` + +This will use the `DefaultAWSCredentialsProviderChain` for assuming this role. + +> The broker must be rebooted for credentials specified in pulsar_env to take effect. + +#### Configuring the size of block read/write + +Pulsar also provides some knobs to configure the size of requests sent to AWS S3. + +- ```s3ManagedLedgerOffloadMaxBlockSizeInBytes``` configures the maximum size of + a "part" sent during a multipart upload. This cannot be smaller than 5MB. Default is 64MB. 
+- ```s3ManagedLedgerOffloadReadBufferSizeInBytes``` configures the block size for + each individual read when reading back data from AWS S3. Default is 1MB. + +In both cases, these should not be touched unless you know what you are doing. + +### "google-cloud-storage" Driver configuration + +Buckets are the basic containers that hold your data. Everything that you store in +Cloud Storage must be contained in a bucket. You can use buckets to organize your data and +control access to your data, but unlike directories and folders, you cannot nest buckets. + +```conf + +gcsManagedLedgerOffloadBucket=pulsar-topic-offload + +``` + +The bucket region is the region where the bucket is located. Configuring the bucket region is not required but +is recommended. If it is not configured, the default region is used. + +With GCS, buckets are created by default in the `us multi-regional location`. +The [Bucket Locations](https://cloud.google.com/storage/docs/bucket-locations) page contains more information. + +```conf + +gcsManagedLedgerOffloadRegion=europe-west3 + +``` + +#### Authentication with GCS + +The administrator needs to configure `gcsManagedLedgerOffloadServiceAccountKeyFile` in `broker.conf` +for the broker to be able to access the GCS service. `gcsManagedLedgerOffloadServiceAccountKeyFile` is +a JSON file containing the GCS credentials of a service account. +The [Service Accounts section of this page](https://support.google.com/googleapi/answer/6158849) contains +more information about how to create this key file for authentication. More information about Google Cloud IAM +is available [here](https://cloud.google.com/storage/docs/access-control/iam). + +To generate service account credentials or view the public credentials that you've already generated, follow these steps: + +1. Open the [Service accounts page](https://console.developers.google.com/iam-admin/serviceaccounts). +2. Select a project or create a new one. +3. Click **Create service account**. +4.
In the **Create service account** window, type a name for the service account, and select **Furnish a new private key**. If you want to [grant G Suite domain-wide authority](https://developers.google.com/identity/protocols/OAuth2ServiceAccount#delegatingauthority) to the service account, also select **Enable G Suite Domain-wide Delegation**. +5. Click **Create**. + +> Note: Make sure that the service account you create has permission to operate GCS. To do so, assign the **Storage Admin** permission to your service account [here](https://cloud.google.com/storage/docs/access-control/iam). + +```conf + +gcsManagedLedgerOffloadServiceAccountKeyFile="/Users/hello/Downloads/project-804d5e6a6f33.json" + +``` + +#### Configuring the size of block read/write + +Pulsar also provides some knobs to configure the size of requests sent to GCS. + +- ```gcsManagedLedgerOffloadMaxBlockSizeInBytes``` configures the maximum size of a "part" sent + during a multipart upload. This cannot be smaller than 5MB. Default is 64MB. +- ```gcsManagedLedgerOffloadReadBufferSizeInBytes``` configures the block size for each individual + read when reading back data from GCS. Default is 1MB. + +In both cases, these should not be touched unless you know what you are doing. + +### "filesystem" Driver configuration + + +#### Configure connection address + +You can configure the connection address in the `broker.conf` file. + +```conf + +fileSystemURI="hdfs://127.0.0.1:9000" + +``` + +#### Configure Hadoop profile path + +The configuration file is stored in the Hadoop profile path. It contains various settings, such as base path, authentication, and so on. + +```conf + +fileSystemProfilePath="../conf/filesystem_offload_core_site.xml" + +``` + +The model for storing topic data uses `org.apache.hadoop.io.MapFile`. You can use all of the configurations in `org.apache.hadoop.io.MapFile` for Hadoop.
+ +**Example** + +```conf + + <property> + <name>fs.defaultFS</name> + <value></value> + </property> + + <property> + <name>hadoop.tmp.dir</name> + <value>pulsar</value> + </property> + + <property> + <name>io.file.buffer.size</name> + <value>4096</value> + </property> + + <property> + <name>io.seqfile.compress.blocksize</name> + <value>1000000</value> + </property> + + <property> + <name>io.seqfile.compression.type</name> + <value>BLOCK</value> + </property> + + <property> + <name>io.map.index.interval</name> + <value>128</value> + </property> + +``` + +For more information about the configurations in `org.apache.hadoop.io.MapFile`, see [Filesystem Storage](http://hadoop.apache.org/). +## Configuring offload to run automatically + +Namespace policies can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that the topic has stored on the Pulsar cluster. Once the topic reaches the threshold, an offload operation will be triggered. Setting the threshold to a negative value disables automatic offloading. Setting the threshold to 0 causes the broker to offload data as soon as it possibly can. + +```bash + +$ bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace + +``` + +> Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload will not run until the current segment is full. + + +## Triggering offload manually + +Offloading can be manually triggered through a REST endpoint on the Pulsar broker. We provide a CLI which will call this REST endpoint for you. + +When triggering offload, you must specify the maximum size, in bytes, of backlog which will be retained locally in BookKeeper. The offload mechanism will offload segments from the start of the topic backlog until this condition is met. + +```bash + +$ bin/pulsar-admin topics offload --size-threshold 10M my-tenant/my-namespace/topic1 +Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1 + +``` + +The command that triggers an offload does not wait until the offload operation has completed. To check the status of the offload, use `offload-status`.
+ +```bash + +$ bin/pulsar-admin topics offload-status my-tenant/my-namespace/topic1 +Offload is currently running + +``` + +To wait for offload to complete, add the -w flag. + +```bash + +$ bin/pulsar-admin topics offload-status -w my-tenant/my-namespace/topic1 +Offload was a success + +``` + +If there is an error offloading, the error will be propagated to the offload-status command. + +```bash + +$ bin/pulsar-admin topics offload-status persistent://public/default/topic1 +Error in offload +null + +Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: com.amazonaws.services.s3.model.AmazonS3Exception: Anonymous users cannot initiate multipart uploads. Please authenticate. (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 798758DE3F1776DF; S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g=), S3 Extended Request ID: dhBFz/lZm1oiG/oBEepeNlhrtsDlzoOhocuYMpKihQGXe6EG8puRGOkK6UwqzVrMXTWBxxHcS+g= + +``` + diff --git a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json index 4866df2528e0d..371ec59675ae7 100644 --- a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json +++ b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json @@ -396,6 +396,35 @@ }, { "type": "category", + "label": "Cookbooks", + "items": [ + { + "type": "doc", + "id": "version-2.7.2/cookbooks-compaction" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-deduplication" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-non-persistent" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-retention-expiry" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-encryption" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-message-queue" + }, + { + "type": "doc", + "id": "version-2.7.2/cookbooks-bookkeepermetadata" "label": 
"Adaptors", "items": [ {