diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md b/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md new file mode 100644 index 0000000000000..b233f10530af6 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md @@ -0,0 +1,581 @@ +--- +id: develop-binary-protocol +title: Pulsar binary protocol specification +sidebar_label: "Binary protocol" +original_id: develop-binary-protocol +--- + +Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency. + +Clients and brokers exchange *commands* with each other. Commands are formatted as binary [protocol buffer](https://developers.google.com/protocol-buffers/) (aka *protobuf*) messages. The format of protobuf commands is specified in the [`PulsarApi.proto`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto) file and also documented in the [Protobuf interface](#protobuf-interface) section below. + +> ### Connection sharing +> Commands for different producers and consumers can be interleaved and sent through the same connection without restriction. + +All commands associated with Pulsar's protocol are contained in a [`BaseCommand`](#pulsar.proto.BaseCommand) protobuf message that includes a [`Type`](#pulsar.proto.Type) [enum](https://developers.google.com/protocol-buffers/docs/proto#enum) with all possible subcommands as optional fields. `BaseCommand` messages can specify only one subcommand. + +## Framing + +Since protobuf doesn't provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB. 
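As a rough illustration of the length-prefixed framing described above, the following Python sketch wraps an already-serialized command in a frame and splits complete frames back out of a received byte buffer. It assumes the 4-byte big-endian size fields and the simple-command layout (`totalSize`, `commandSize`, command bytes) given in this document. The function names are hypothetical, the command bytes are an opaque stand-in for a serialized `BaseCommand`, and applying the 5 MB limit to `totalSize` is an assumption.

```python
import struct

MAX_FRAME_SIZE = 5 * 1024 * 1024  # 5 MB limit from the text (assumed to bound totalSize)


def encode_simple_frame(command_bytes: bytes) -> bytes:
    """Wrap serialized command bytes as [totalSize][commandSize][command]."""
    command_size = len(command_bytes)
    total_size = 4 + command_size  # counts everything after the totalSize field
    if total_size > MAX_FRAME_SIZE:
        raise ValueError("frame exceeds the maximum frame size")
    return struct.pack(">II", total_size, command_size) + command_bytes


def split_frames(buffer: bytes):
    """Return (complete frame bodies without totalSize, leftover bytes)."""
    frames = []
    offset = 0
    while len(buffer) - offset >= 4:
        (total_size,) = struct.unpack_from(">I", buffer, offset)
        if total_size > MAX_FRAME_SIZE:
            raise ValueError("peer announced an oversized frame")
        if len(buffer) - offset - 4 < total_size:
            break  # the rest of this frame has not arrived yet
        frames.append(buffer[offset + 4:offset + 4 + total_size])
        offset += 4 + total_size
    return frames, buffer[offset:]
```

A reader loop would append newly received bytes to the leftover buffer and call `split_frames` again, which is what allows commands for different producers and consumers to be interleaved on one connection.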
+ +The Pulsar protocol allows for two types of commands: + +1. **Simple commands** that do not carry a message payload. +2. **Payload commands** that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf [metadata](#message-metadata) and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big endian integers. + +> Message payloads are passed in raw format rather than protobuf format for efficiency reasons. + +### Simple commands + +Simple (payload-free) commands have this basic structure: + +| Component | Description | Size (in bytes) | +|:------------|:----------------------------------------------------------------------------------------|:----------------| +| totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4 | +| commandSize | The size of the protobuf-serialized command | 4 | +| message | The protobuf message serialized in a raw binary format (rather than in protobuf format) | | + +### Payload commands + +Payload commands have this basic structure: + +| Component | Description | Size (in bytes) | +|:-------------|:--------------------------------------------------------------------------------------------|:----------------| +| totalSize | The size of the frame, counting everything that comes after it (in bytes) | 4 | +| commandSize | The size of the protobuf-serialized command | 4 | +| message | The protobuf message serialized in a raw binary format (rather than in protobuf format) | | +| magicNumber | A 2-byte byte array (`0x0e01`) identifying the current format | 2 | +| checksum | A [CRC32-C checksum](http://www.evanjones.ca/crc32c.html) of everything that comes after it | 4 | +| metadataSize | The size of the message [metadata](#message-metadata) | 4 | +| metadata | The message [metadata](#message-metadata) stored as a binary protobuf message | | +| payload | Anything left in the 
frame is considered the payload and can include any sequence of bytes | | + +## Message metadata + +Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer. + +| Field | Description | +|:-------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `producer_name` | The name of the producer that published the message | +| `sequence_id` | The sequence ID of the message, assigned by producer | +| `publish_time` | The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC) | +| `properties` | A sequence of key/value pairs (using the [`KeyValue`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L32) message). These are application-defined keys and values with no special meaning to Pulsar. 
| +| `replicated_from` *(optional)* | Indicates that the message has been replicated and specifies the name of the [cluster](reference-terminology.md#cluster) where the message was originally published | +| `partition_key` *(optional)* | While publishing on a partitioned topic, if the key is present, the hash of the key is used to determine which partition to choose | +| `compression` *(optional)* | Signals that the payload has been compressed and with which compression library | +| `uncompressed_size` *(optional)* | If compression is used, the producer must fill the uncompressed size field with the original payload size | +| `num_messages_in_batch` *(optional)* | If this message is really a [batch](#batch-messages) of multiple entries, this field must be set to the number of messages in the batch | + +### Batch messages + +When using batch messages, the payload contains a list of entries, +each of them with its individual metadata, defined by the `SingleMessageMetadata` +object. + + +For a single batch, the payload format looks like this: + + +| Field | Description | +|:--------------|:------------------------------------------------------------| +| metadataSizeN | The size of the serialized Protobuf single message metadata | +| metadataN | Single message metadata | +| payloadN | Message payload passed by application | + +Each metadata field looks like this: + +| Field | Description | +|:---------------------------|:--------------------------------------------------------| +| properties | Application-defined properties | +| partition key *(optional)* | Key to indicate the hashing to a particular partition | +| payload_size | Size of the payload for the single message in the batch | + +When compression is enabled, the whole batch will be compressed at once. + +## Interactions + +### Connection establishment + +After opening a TCP connection to a broker, typically on port 6650, the client +is responsible for initiating the session. 
+ +![Connect interaction](/assets/binary-protocol-connect.png) + +After receiving a `Connected` response from the broker, the client can +consider the connection ready to use. Alternatively, if the broker cannot +validate the client authentication, it will reply with an `Error` command and +close the TCP connection. + +Example: + +```protobuf + +message CommandConnect { + "client_version" : "Pulsar-Client-Java-v1.15.2", + "auth_method_name" : "my-authentication-plugin", + "auth_data" : "my-auth-data", + "protocol_version" : 6 +} + +``` + +Fields: + * `client_version` → String based identifier. Format is not enforced + * `auth_method_name` → *(optional)* Name of the authentication plugin if auth + enabled + * `auth_data` → *(optional)* Plugin specific authentication data + * `protocol_version` → Indicates the protocol version supported by the + client. Broker will not send commands introduced in newer revisions of the + protocol. Broker might be enforcing a minimum version + +```protobuf + +message CommandConnected { + "server_version" : "Pulsar-Broker-v1.15.2", + "protocol_version" : 6 +} + +``` + +Fields: + * `server_version` → String identifier of broker version + * `protocol_version` → Protocol version supported by the broker. Client + must not attempt to send commands introduced in newer revisions of the + protocol + +### Keep Alive + +To identify prolonged network partitions between clients and brokers or cases +in which a machine crashes without interrupting the TCP connection on the remote +end (e.g., power outage, kernel panic, hard reboot), we have introduced a +mechanism to probe for the availability status of the remote peer. + +Both clients and brokers send `Ping` commands periodically and +close the socket if a `Pong` response is not received within a timeout (the default +used by the broker is 60s). 
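The probing logic can be sketched as a small state tracker that decides when to send a `Ping` and when to give up on the peer. This is a minimal sketch, not the broker's implementation: the class and method names are hypothetical, the 30-second ping interval is an assumed value, and only the 60-second timeout comes from the text above.

```python
import time


class KeepAlive:
    """Decides when to send Ping and when the peer should be considered dead."""

    def __init__(self, interval_s=30.0, timeout_s=60.0, clock=time.monotonic):
        self._clock = clock
        self.interval_s = interval_s  # assumed value, not taken from the spec
        self.timeout_s = timeout_s    # 60s is the broker default quoted above
        self._ping_sent_at = None     # time of the Ping still awaiting a Pong
        self._next_ping_at = clock() + interval_s

    def on_pong(self):
        """Call when a Pong arrives; clears the outstanding probe."""
        self._ping_sent_at = None

    def tick(self):
        """Return "ping" to send a probe, "close" to drop the socket, else None."""
        now = self._clock()
        if self._ping_sent_at is not None:
            if now - self._ping_sent_at >= self.timeout_s:
                return "close"
            return None
        if now >= self._next_ping_at:
            self._ping_sent_at = now
            self._next_ping_at = now + self.interval_s
            return "ping"
        return None
```

The clock is injected so that the timing rules can be exercised without waiting; a connection loop would call `tick()` periodically and `on_pong()` from its command handler.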
+ +A valid implementation of a Pulsar client is not required to send the `Ping` +probe, though it is required to promptly reply after receiving one from the +broker in order to prevent the remote side from forcibly closing the TCP connection. + + +### Producer + +In order to send messages, a client needs to establish a producer. When creating +a producer, the broker will first verify that this particular client is +authorized to publish on the topic. + +Once the client gets confirmation of the producer creation, it can publish +messages to the broker, referring to the producer id negotiated before. + +![Producer interaction](/assets/binary-protocol-producer.png) + +##### Command Producer + +```protobuf + +message CommandProducer { + "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", + "producer_id" : 1, + "request_id" : 1 +} + +``` + +Parameters: + * `topic` → Complete topic name to where you want to create the producer on + * `producer_id` → Client generated producer identifier. Needs to be unique + within the same connection + * `request_id` → Identifier for this request. Used to match the response with + the originating request. Needs to be unique within the same connection + * `producer_name` → *(optional)* If a producer name is specified, the name will + be used, otherwise the broker will generate a unique name. Generated + producer name is guaranteed to be globally unique. Implementations are + expected to let the broker generate a new producer name when the producer + is initially created, then reuse it when recreating the producer after + reconnections. + +The broker will reply with either `ProducerSuccess` or `Error` commands. 
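Because commands for many producers and consumers share one connection, a client has to match each reply to its request through `request_id`. The sketch below shows one possible bookkeeping structure under that assumption; the class name and the stored "context" values are hypothetical and not part of the protocol.

```python
import itertools


class PendingRequests:
    """Tracks in-flight requests on one connection, keyed by request_id."""

    def __init__(self):
        self._ids = itertools.count(1)  # ids only need to be unique per connection
        self._pending = {}

    def register(self, context):
        """Allocate a request_id and remember what the request was for."""
        request_id = next(self._ids)
        self._pending[request_id] = context
        return request_id

    def complete(self, request_id):
        """Pop the context for a reply; returns None for an unknown id."""
        return self._pending.pop(request_id, None)

    def outstanding(self):
        """Return the request ids that have not been answered yet."""
        return sorted(self._pending)
```

When a `ProducerSuccess` or `Error` command arrives, the client would call `complete()` with the `request_id` carried in the reply to find the producer creation it belongs to.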
+ +##### Command ProducerSuccess + +```protobuf + +message CommandProducerSuccess { + "request_id" : 1, + "producer_name" : "generated-unique-producer-name" +} + +``` + +Parameters: + * `request_id` → Original id of the `Producer` request + * `producer_name` → Generated globally unique producer name or the name + specified by the client, if any. + +##### Command Send + +Command `Send` is used to publish a new message within the context of an +already existing producer. This command is used in a frame that includes the command +as well as the message payload, for which the complete format is specified in the [payload commands](#payload-commands) section. + +```protobuf + +message CommandSend { + "producer_id" : 1, + "sequence_id" : 0, + "num_messages" : 1 +} + +``` + +Parameters: + * `producer_id` → id of an existing producer + * `sequence_id` → each message has an associated sequence id which is expected + to be implemented with a counter starting at 0. The `SendReceipt` that + acknowledges the effective publishing of a message will refer to it by + its sequence id. + * `num_messages` → *(optional)* Used when publishing a batch of messages at + once. + +##### Command SendReceipt + +After a message has been persisted on the configured number of replicas, the +broker will send the acknowledgment receipt to the producer. + +```protobuf + +message CommandSendReceipt { + "producer_id" : 1, + "sequence_id" : 0, + "message_id" : { + "ledgerId" : 123, + "entryId" : 456 + } +} + +``` + +Parameters: + * `producer_id` → id of producer originating the send request + * `sequence_id` → sequence id of the published message + * `message_id` → message id assigned by the system to the published message. + Unique within a single cluster. Message id is composed of 2 longs, `ledgerId` + and `entryId`, reflecting that this unique id is assigned when appending + to a BookKeeper ledger + + +##### Command CloseProducer + +**Note**: *This command can be sent by either producer or broker*. 
+ +When receiving a `CloseProducer` command, the broker will stop accepting any +more messages for the producer, wait until all pending messages are persisted +and then reply `Success` to the client. + +The broker can send a `CloseProducer` command to the client when it's performing +a graceful failover (e.g., the broker is being restarted, or the topic is being unloaded +by the load balancer to be transferred to a different broker). + +When receiving the `CloseProducer`, the client is expected to go through the +service discovery lookup again and recreate the producer. The TCP +connection is not affected. + +### Consumer + +A consumer is used to attach to a subscription and consume messages from it. +After every reconnection, a client needs to subscribe to the topic. If a +subscription is not already there, a new one will be created. + +![Consumer](/assets/binary-protocol-consumer.png) + +#### Flow control + +After the consumer is ready, the client needs to *give permission* to the +broker to push messages. This is done with the `Flow` command. + +A `Flow` command gives additional *permits* to send messages to the consumer. +A typical consumer implementation will use a queue to accumulate these messages +before the application is ready to consume them. + +After the application has dequeued half of the messages in the queue, the consumer +sends permits to the broker to ask for more messages (equal to half of the queue size). + +For example, if the queue size is 1000 and the application consumes 500 messages from the queue, +the consumer sends permits to the broker to ask for 500 more messages. 
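The permit accounting described above can be sketched as follows. This is an illustrative model rather than the client library's code: `ConsumerFlowControl` and the `send_flow` callback (standing in for sending a `Flow` command with `messagePermits`) are hypothetical names, and granting an initial number of permits equal to the queue size is an assumption.

```python
from collections import deque


class ConsumerFlowControl:
    """Receiver queue that refills broker permits once half of it is consumed."""

    def __init__(self, queue_size, send_flow):
        self.queue_size = queue_size
        self._send_flow = send_flow      # hypothetical callable(permits)
        self._queue = deque()
        self._dequeued_since_flow = 0
        send_flow(queue_size)            # assumed initial grant: one full queue

    def on_message(self, message):
        """Called when the broker pushes a message to this consumer."""
        self._queue.append(message)

    def receive(self):
        """Hand one message to the application, granting permits if needed."""
        message = self._queue.popleft()
        self._dequeued_since_flow += 1
        if self._dequeued_since_flow >= self.queue_size // 2:
            self._send_flow(self._dequeued_since_flow)
            self._dequeued_since_flow = 0
        return message
```

With a queue size of 1000, the 500th call to `receive()` triggers a `Flow` of 500 permits, which matches the example in the text.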
+ +##### Command Subscribe + +```protobuf + +message CommandSubscribe { + "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", + "subscription" : "my-subscription-name", + "subType" : "Exclusive", + "consumer_id" : 1, + "request_id" : 1 +} + +``` + +Parameters: + * `topic` → Complete topic name to where you want to create the consumer on + * `subscription` → Subscription name + * `subType` → Subscription type: Exclusive, Shared, Failover, Key_Shared + * `consumer_id` → Client generated consumer identifier. Needs to be unique + within the same connection + * `request_id` → Identifier for this request. Used to match the response with + the originating request. Needs to be unique within the same connection + * `consumer_name` → *(optional)* Clients can specify a consumer name. This + name can be used to track a particular consumer in the stats. Also, in + Failover subscription type, the name is used to decide which consumer is + elected as *master* (the one receiving messages): consumers are sorted by + their consumer name and the first one is elected master. + +##### Command Flow + +```protobuf + +message CommandFlow { + "consumer_id" : 1, + "messagePermits" : 1000 +} + +``` + +Parameters: +* `consumer_id` → Id of an already established consumer +* `messagePermits` → Number of additional permits to grant to the broker for + pushing more messages + +##### Command Message + +Command `Message` is used by the broker to push messages to an existing consumer, +within the limits of the given permits. + + +This command is used in a frame that includes the message payload as well, for +which the complete format is specified in the [payload commands](#payload-commands) +section. 
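To make the payload-command layout concrete, here is a hedged Python sketch that builds and then unpacks such a frame, following the field order in the [payload commands](#payload-commands) table. The function names are hypothetical, the command and metadata are treated as opaque pre-serialized bytes rather than real protobuf messages, and storing the checksum in the same big-endian order as the size fields is an assumption. The bitwise CRC32-C routine is written out only because Python's standard library does not provide one; it is slow and meant for illustration.

```python
import struct

MAGIC_NUMBER = b"\x0e\x01"


def crc32c(data: bytes) -> int:
    """Bitwise CRC32-C (Castagnoli), reflected polynomial 0x82F63B78."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
    return crc ^ 0xFFFFFFFF


def build_payload_frame(command: bytes, metadata: bytes, payload: bytes) -> bytes:
    """Assemble totalSize, commandSize, command, magic, checksum, metadata, payload."""
    after_checksum = struct.pack(">I", len(metadata)) + metadata + payload
    body = (
        struct.pack(">I", len(command)) + command
        + MAGIC_NUMBER
        + struct.pack(">I", crc32c(after_checksum))
        + after_checksum
    )
    return struct.pack(">I", len(body)) + body


def parse_payload_frame(frame: bytes):
    """Return (command, metadata, payload) or raise ValueError on a bad frame."""
    total_size, command_size = struct.unpack_from(">II", frame, 0)
    if total_size != len(frame) - 4:
        raise ValueError("totalSize does not match the frame length")
    pos = 8
    command = frame[pos:pos + command_size]
    pos += command_size
    if frame[pos:pos + 2] != MAGIC_NUMBER:
        raise ValueError("unexpected magic number")
    pos += 2
    (checksum,) = struct.unpack_from(">I", frame, pos)
    pos += 4
    if crc32c(frame[pos:]) != checksum:  # covers everything after the checksum
        raise ValueError("checksum mismatch")
    (metadata_size,) = struct.unpack_from(">I", frame, pos)
    pos += 4
    metadata = frame[pos:pos + metadata_size]
    pos += metadata_size
    return command, metadata, frame[pos:]  # the remainder is the payload
```

A consumer that hits the checksum error path would typically report it through the `validation_error` field of `Ack` (`ChecksumMismatch`).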
+ +```protobuf + +message CommandMessage { + "consumer_id" : 1, + "message_id" : { + "ledgerId" : 123, + "entryId" : 456 + } +} + +``` + +##### Command Ack + +An `Ack` is used to signal to the broker that a given message has been +successfully processed by the application and can be discarded by the broker. + +In addition, the broker maintains the consumer position based on the +acknowledged messages. + +```protobuf + +message CommandAck { + "consumer_id" : 1, + "ack_type" : "Individual", + "message_id" : { + "ledgerId" : 123, + "entryId" : 456 + } +} + +``` + +Parameters: + * `consumer_id` → Id of an already established consumer + * `ack_type` → Type of acknowledgment: `Individual` or `Cumulative` + * `message_id` → Id of the message to acknowledge + * `validation_error` → *(optional)* Indicates that the consumer has discarded + the messages due to: `UncompressedSizeCorruption`, + `DecompressionError`, `ChecksumMismatch`, `BatchDeSerializeError` + +##### Command CloseConsumer + +**Note**: *This command can be sent by either consumer or broker*. + +This command behaves the same as [`CloseProducer`](#command-closeproducer). + +##### Command RedeliverUnacknowledgedMessages + +A consumer can ask the broker to redeliver some or all of the pending messages +that were pushed to that particular consumer and not yet acknowledged. + +The protobuf object accepts a list of message ids that the consumer wants to +be redelivered. If the list is empty, the broker will redeliver all the +pending messages. + +On redelivery, messages can be sent to the same consumer or, in the case of a +shared subscription, spread across all available consumers. + + +##### Command ReachedEndOfTopic + +This is sent by a broker to a particular consumer, whenever the topic +has been "terminated" and all the messages on the subscription were +acknowledged. + +The client should use this command to notify the application that no more +messages are coming from the consumer. 
+ +##### Command ConsumerStats + +This command is sent by the client to retrieve Subscriber and Consumer level +stats from the broker. + +Parameters: + * `request_id` → Id of the request, used to correlate the request + and the response. + * `consumer_id` → Id of an already established consumer. + +##### Command ConsumerStatsResponse + +This is the broker's response to the client's `ConsumerStats` request. +It contains the Subscriber and Consumer level stats of the `consumer_id` sent in the request. +If the `error_code` or the `error_message` field is set, the request has failed. + +##### Command Unsubscribe + +This command is sent by the client to unsubscribe the `consumer_id` from the associated topic. + +Parameters: + * `request_id` → Id of the request. + * `consumer_id` → Id of an already established consumer which needs to unsubscribe. + + +## Service discovery + +### Topic lookup + +Topic lookup needs to be performed each time a client needs to create or +reconnect a producer or a consumer. Lookup is used to discover which particular +broker is serving the topic we are about to use. + +Lookup can be done with a REST call as described in the [admin API](admin-api-topics.md#lookup-of-topic) +docs. + +Since Pulsar-1.16 it is also possible to perform the lookup within the binary +protocol. + +For the sake of example, let's assume we have a service discovery component +running at `pulsar://broker.example.com:6650`. + +Individual brokers will be running at `pulsar://broker-1.example.com:6650`, +`pulsar://broker-2.example.com:6650`, ... + +A client can use a connection to the discovery service host to issue a +`LookupTopic` command. The response can either be a broker hostname to +connect to, or a broker hostname against which to retry the lookup. + +The `LookupTopic` command has to be used in a connection that has already +gone through the `Connect` / `Connected` initial handshake. 
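The connect-or-retry behaviour can be sketched as a loop that follows redirects until a broker answers with `Connect`. This is only an outline under stated assumptions: `resolve_broker`, `send_lookup`, and `max_redirects` are hypothetical names, `send_lookup(url, topic, authoritative)` stands in for issuing `LookupTopic` on an already handshaken connection to `url`, and the response is modelled as a plain dictionary using the field names from the `LookupTopicResponse` examples in this section.

```python
def resolve_broker(topic, send_lookup, service_url, max_redirects=10):
    """Follow LookupTopic redirects and return the broker URL to connect to."""
    url = service_url
    authoritative = False  # the initial lookup request uses false
    for _ in range(max_redirects + 1):
        response = send_lookup(url, topic, authoritative)
        kind = response["response"]
        if kind == "Connect":
            return response["brokerServiceUrl"]
        if kind != "Redirect":
            raise RuntimeError("lookup failed: %r" % (kind,))
        # Retry against the indicated broker, echoing its authoritative flag.
        url = response["brokerServiceUrl"]
        authoritative = response["authoritative"]
    raise RuntimeError("too many lookup redirects")
```

The redirect cap is a defensive choice for the sketch and is not something the protocol text specifies.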
+ +![Topic lookup](/assets/binary-protocol-topic-lookup.png) + +```protobuf + +message CommandLookupTopic { + "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", + "request_id" : 1, + "authoritative" : false +} + +``` + +Fields: + * `topic` → Topic name to lookup + * `request_id` → Id of the request that will be passed with its response + * `authoritative` → Initial lookup request should use false. When following a + redirect response, client should pass the same value contained in the + response + +##### LookupTopicResponse + +Example of response with successful lookup: + +```protobuf + +message CommandLookupTopicResponse { + "request_id" : 1, + "response" : "Connect", + "brokerServiceUrl" : "pulsar://broker-1.example.com:6650", + "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651", + "authoritative" : true +} + +``` + +Example of lookup response with redirection: + +```protobuf + +message CommandLookupTopicResponse { + "request_id" : 1, + "response" : "Redirect", + "brokerServiceUrl" : "pulsar://broker-2.example.com:6650", + "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651", + "authoritative" : true +} + +``` + +In this second case, we need to reissue the `LookupTopic` command request +to `broker-2.example.com` and this broker will be able to give a definitive +answer to the lookup request. + +### Partitioned topics discovery + +Partitioned topics metadata discovery is used to find out if a topic is a +"partitioned topic" and how many partitions were set up. + +If the topic is marked as "partitioned", the client is expected to create +multiple producers or consumers, one for each partition, using the `partition-X` +suffix. + +This information only needs to be retrieved the first time a producer or +consumer is created. There is no need to do this after reconnections. + +The discovery of partitioned topics metadata works very similar to the topic +lookup. 
The client sends a request to the service discovery address and the +response contains the actual metadata. + +##### Command PartitionedTopicMetadata + +```protobuf + +message CommandPartitionedTopicMetadata { + "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic", + "request_id" : 1 +} + +``` + +Fields: + * `topic` → the topic for which to check the partitions metadata + * `request_id` → Id of the request that will be passed with its response + + +##### Command PartitionedTopicMetadataResponse + +Example of response with metadata: + +```protobuf + +message CommandPartitionedTopicMetadataResponse { + "request_id" : 1, + "response" : "Success", + "partitions" : 32 +} + +``` + +## Protobuf interface + +All of Pulsar's Protobuf definitions can be found {@inject: github:here:/pulsar-common/src/main/proto/PulsarApi.proto}. diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md b/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md new file mode 100644 index 0000000000000..9da7a3a4131a4 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md @@ -0,0 +1,114 @@ +--- +id: develop-cpp +title: Building Pulsar C++ client +sidebar_label: "Building Pulsar C++ client" +original_id: develop-cpp +--- + +## Supported platforms + +The Pulsar C++ client has been successfully tested on **MacOS** and **Linux**. + +## System requirements + +You need to have the following installed to use the C++ client: + +* [CMake](https://cmake.org/) +* [Boost](http://www.boost.org/) +* [Protocol Buffers](https://developers.google.com/protocol-buffers/) 2.6 +* [Log4CXX](https://logging.apache.org/log4cxx) +* [libcurl](https://curl.haxx.se/libcurl/) +* [Google Test](https://github.com/google/googletest) +* [JsonCpp](https://github.com/open-source-parsers/jsoncpp) + +## Compilation + +There are separate compilation instructions for [MacOS](#macos) and [Linux](#linux). 
For both systems, start by cloning the Pulsar repository: + +```shell + +$ git clone https://github.com/apache/pulsar + +``` + +### Linux + +First, install all of the necessary dependencies: + +```shell + +$ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \ + libprotobuf-dev protobuf-compiler libboost-all-dev google-mock libgtest-dev libjsoncpp-dev + +``` + +Then compile and install [Google Test](https://github.com/google/googletest): + +```shell + +# libgtest-dev version is 1.18.0 or above +$ cd /usr/src/googletest +$ sudo cmake . +$ sudo make +$ sudo cp ./googlemock/libgmock.a ./googlemock/gtest/libgtest.a /usr/lib/ + +# less than 1.18.0 +$ cd /usr/src/gtest +$ sudo cmake . +$ sudo make +$ sudo cp libgtest.a /usr/lib + +$ cd /usr/src/gmock +$ sudo cmake . +$ sudo make +$ sudo cp libgmock.a /usr/lib + +``` + +Finally, compile the Pulsar client library for C++ inside the Pulsar repo: + +```shell + +$ cd pulsar-client-cpp +$ cmake . +$ make + +``` + +The resulting files, `libpulsar.so` and `libpulsar.a`, will be placed in the `lib` folder of the repo while two tools, `perfProducer` and `perfConsumer`, will be placed in the `perf` directory. + +### MacOS + +First, install all of the necessary dependencies: + +```shell + +# OpenSSL installation +$ brew install openssl +$ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/ +$ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/ + +# Protocol Buffers installation +$ brew tap homebrew/versions +$ brew install protobuf260 +$ brew install boost +$ brew install log4cxx + +# Google Test installation +$ git clone https://github.com/google/googletest.git +$ cd googletest +$ cmake . +$ make install + +``` + +Then compile the Pulsar client library in the repo that you cloned: + +```shell + +$ cd pulsar-client-cpp +$ cmake . 
+$ make + +``` + diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md b/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md new file mode 100644 index 0000000000000..509209b6a852d --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md @@ -0,0 +1,227 @@ +--- +id: develop-load-manager +title: Modular load manager +sidebar_label: "Modular load manager" +original_id: develop-load-manager +--- + +The *modular load manager*, implemented in [`ModularLoadManagerImpl`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java), is a flexible alternative to the previously implemented load manager, [`SimpleLoadManagerImpl`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java), which attempts to simplify how load is managed while also providing abstractions so that complex load management strategies may be implemented. + +## Usage + +There are two ways that you can enable the modular load manager: + +1. Change the value of the `loadManagerClassName` parameter in `conf/broker.conf` from `org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl` to `org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl`. +2. Using the `pulsar-admin` tool. Here's an example: + + ```shell + + $ pulsar-admin update-dynamic-config \ + --config loadManagerClassName \ + --value org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl + + ``` + + You can use the same method to change back to the original value. In either case, any mistake in specifying the load manager will cause Pulsar to default to `SimpleLoadManagerImpl`. + +## Verification + +There are a few different ways to determine which load manager is being used: + +1. 
Use `pulsar-admin` to examine the `loadManagerClassName` element: + + ```shell + + $ bin/pulsar-admin brokers get-all-dynamic-config + { + "loadManagerClassName" : "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl" + } + + ``` + + If there is no `loadManagerClassName` element, then the default load manager is used. + +2. Consult a ZooKeeper load report. With the module load manager, the load report in `/loadbalance/brokers/...` will have many differences. for example the `systemResourceUsage` sub-elements (`bandwidthIn`, `bandwidthOut`, etc.) are now all at the top level. Here is an example load report from the module load manager: + + ```json + + { + "bandwidthIn": { + "limit": 10240000.0, + "usage": 4.256510416666667 + }, + "bandwidthOut": { + "limit": 10240000.0, + "usage": 5.287239583333333 + }, + "bundles": [], + "cpu": { + "limit": 2400.0, + "usage": 5.7353247655435915 + }, + "directMemory": { + "limit": 16384.0, + "usage": 1.0 + } + } + + ``` + + With the simple load manager, the load report in `/loadbalance/brokers/...` will look like this: + + ```json + + { + "systemResourceUsage": { + "bandwidthIn": { + "limit": 10240000.0, + "usage": 0.0 + }, + "bandwidthOut": { + "limit": 10240000.0, + "usage": 0.0 + }, + "cpu": { + "limit": 2400.0, + "usage": 0.0 + }, + "directMemory": { + "limit": 16384.0, + "usage": 1.0 + }, + "memory": { + "limit": 8192.0, + "usage": 3903.0 + } + } + } + + ``` + +3. The command-line [broker monitor](reference-cli-tools.md#monitor-brokers) will have a different output format depending on which load manager implementation is being used. 
+ + Here is an example from the modular load manager: + + ``` + + =================================================================================================================== + ||SYSTEM |CPU % |MEMORY % |DIRECT % |BW IN % |BW OUT % |MAX % || + || |0.00 |48.33 |0.01 |0.00 |0.00 |48.33 || + ||COUNT |TOPIC |BUNDLE |PRODUCER |CONSUMER |BUNDLE + |BUNDLE - || + || |4 |4 |0 |2 |4 |0 || + ||LATEST |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL || + || |0.00 |0.00 |0.00 |0.00 |0.00 |0.00 || + ||SHORT |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL || + || |0.00 |0.00 |0.00 |0.00 |0.00 |0.00 || + ||LONG |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL || + || |0.00 |0.00 |0.00 |0.00 |0.00 |0.00 || + =================================================================================================================== + + ``` + + Here is an example from the simple load manager: + + ``` + + =================================================================================================================== + ||COUNT |TOPIC |BUNDLE |PRODUCER |CONSUMER |BUNDLE + |BUNDLE - || + || |4 |4 |0 |2 |0 |0 || + ||RAW SYSTEM |CPU % |MEMORY % |DIRECT % |BW IN % |BW OUT % |MAX % || + || |0.25 |47.94 |0.01 |0.00 |0.00 |47.94 || + ||ALLOC SYSTEM |CPU % |MEMORY % |DIRECT % |BW IN % |BW OUT % |MAX % || + || |0.20 |1.89 | |1.27 |3.21 |3.21 || + ||RAW MSG |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL || + || |0.00 |0.00 |0.00 |0.01 |0.01 |0.01 || + ||ALLOC MSG |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL || + || |54.84 |134.48 |189.31 |126.54 |320.96 |447.50 || + =================================================================================================================== + + ``` + +It is important to note that the module load manager is _centralized_, meaning that all requests to assign a bundle---whether it's been seen before or whether this is the first time---only get handled by the _lead_ broker (which can change over time). 
To determine the current lead broker, examine the `/loadbalance/leader` node in ZooKeeper. + +## Implementation + +### Data + +The data monitored by the modular load manager is contained in the [`LoadData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java) class. +Here, the available data is subdivided into the bundle data and the broker data. + +#### Broker + +The broker data is contained in the [`BrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/BrokerData.java) class. It is further subdivided into two parts, +one being the local data which every broker individually writes to ZooKeeper, and the other being the historical broker +data which is written to ZooKeeper by the leader broker. + +##### Local Broker Data +The local broker data is contained in the class [`LocalBrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java) and provides information about the following resources: + +* CPU usage +* JVM heap memory usage +* Direct memory usage +* Bandwidth in/out usage +* Most recent total message rate in/out across all bundles +* Total number of topics, bundles, producers, and consumers +* Names of all bundles assigned to this broker +* Most recent changes in bundle assignments for this broker + +The local broker data is updated periodically according to the service configuration +"loadBalancerReportUpdateMaxIntervalMinutes". 
After any broker updates its local broker data, the leader broker will +receive the update immediately via a ZooKeeper watch, where the local data is read from the ZooKeeper node +`/loadbalance/brokers/` + +##### Historical Broker Data + +The historical broker data is contained in the [`TimeAverageBrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageBrokerData.java) class. + +In order to reconcile the need to make good decisions in a steady-state scenario and make reactive decisions in a critical scenario, the historical data is split into two parts: the short-term data for reactive decisions, and the long-term data for steady-state decisions. Both time frames maintain the following information: + +* Message rate in/out for the entire broker +* Message throughput in/out for the entire broker + +Unlike the bundle data, the broker data does not maintain samples for the global broker message rates and throughputs, which are not expected to remain steady as new bundles are removed or added. Instead, this data is aggregated over the short-term and long-term data for the bundles. See the section on bundle data to understand how that data is collected and maintained. + +The historical broker data is updated for each broker in memory by the leader broker whenever any broker writes its local data to ZooKeeper. Then, the historical data is written to ZooKeeper by the leader broker periodically according to the configuration `loadBalancerResourceQuotaUpdateIntervalMinutes`. + +##### Bundle Data + +The bundle data is contained in the [`BundleData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/BundleData.java) class. Like the historical broker data, the bundle data is split into a short-term and a long-term time frame. 
The following information is maintained in each time frame: + +* Message rate in/out for this bundle +* Message throughput in/out for this bundle +* Current number of samples for this bundle + +The time frames are implemented by maintaining the average of these values over a set, limited number of samples, where +the samples are obtained through the message rate and throughput values in the local data. Thus, if the update interval +for the local data is 2 minutes, the number of short samples is 10 and the number of long samples is 1000, the +short-term data is maintained over a period of `10 samples * 2 minutes / sample = 20 minutes`, while the long-term +data is similarly maintained over a period of 2000 minutes. Whenever there are not enough samples to satisfy a given time frame, +the average is taken only over the existing samples. When no samples are available, default values are assumed until +they are overwritten by the first sample. Currently, the default values are: + +* Message rate in/out: 50 messages per second both ways +* Message throughput in/out: 50KB per second both ways + +The bundle data is updated in memory on the leader broker whenever any broker writes its local data to ZooKeeper. +Then, the bundle data is written to ZooKeeper by the leader broker periodically at the same time as the historical +broker data, according to the configuration `loadBalancerResourceQuotaUpdateIntervalMinutes`. + +### Traffic Distribution + +The modular load manager uses the abstraction provided by [`ModularLoadManagerStrategy`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java) to make decisions about bundle assignment. The strategy makes a decision by considering the service configuration, the entire load data, and the bundle data for the bundle to be assigned. 
Currently, the only supported strategy is [`LeastLongTermMessageRate`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java), though soon users will have the ability to inject their own strategies if desired. + +#### Least Long Term Message Rate Strategy + +As its name suggests, the least long term message rate strategy attempts to distribute bundles across brokers so that +the message rate in the long-term time window for each broker is roughly the same. However, simply balancing load based +on message rate does not handle the issue of asymmetric resource burden per message on each broker. Thus, the system +resource usages, which are CPU, memory, direct memory, bandwidth in, and bandwidth out, are also considered in the +assignment process. This is done by weighting the final message rate according to +`1 / (overload_threshold - max_usage)`, where `overload_threshold` corresponds to the configuration +`loadBalancerBrokerOverloadedThresholdPercentage` and `max_usage` is the maximum proportion among the system resources +that is being utilized by the candidate broker. This multiplier ensures that machines that are being more heavily taxed +by the same message rates will receive less load. In particular, it tries to ensure that if one machine is overloaded, +then all machines are approximately overloaded. If a broker's max usage exceeds the overload +threshold, that broker is not considered for bundle assignment. If all brokers are overloaded, the bundle is randomly +assigned. 
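
The weighting described above can be sketched in a few lines. This is only an illustration of the formula `1 / (overload_threshold - max_usage)`, not the code in `LeastLongTermMessageRate`: the `BrokerLoad` type, the function names, and the `0.85` threshold are assumptions made for the example, and both the threshold and `max_usage` are expressed as fractions between 0 and 1. A broker whose usage equals the threshold is also treated as overloaded here, to avoid dividing by zero.

```python
import random
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class BrokerLoad:
    """Hypothetical per-broker summary used only for this sketch."""
    long_term_msg_rate: float  # long-term message rate (in + out), messages/s
    max_usage: float           # highest resource usage, as a fraction in [0, 1]


def weighted_rate(load: BrokerLoad, overload_threshold: float) -> Optional[float]:
    """Return rate * 1 / (overload_threshold - max_usage), or None if overloaded."""
    headroom = overload_threshold - load.max_usage
    if headroom <= 0:
        # Overloaded brokers are not considered for bundle assignment.
        return None
    return load.long_term_msg_rate / headroom


def select_broker(brokers: Dict[str, BrokerLoad],
                  overload_threshold: float = 0.85,  # illustrative value
                  rng=random) -> str:
    """Pick the broker with the lowest weighted long-term message rate."""
    scores = {}
    for name, load in brokers.items():
        score = weighted_rate(load, overload_threshold)
        if score is not None:
            scores[name] = score
    if not scores:
        # Every broker is overloaded: fall back to a random assignment.
        return rng.choice(sorted(brokers))
    return min(scores, key=scores.get)
```

With these assumptions, a broker at 80% usage handling 100 messages per second scores far worse than a broker at 20% usage handling 120 messages per second, so the second broker receives the bundle even though its raw message rate is higher.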
+ diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md b/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md new file mode 100644 index 0000000000000..b5457790b8081 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md @@ -0,0 +1,111 @@ +--- +id: develop-tools +title: Simulation tools +sidebar_label: "Simulation tools" +original_id: develop-tools +--- + +It is sometimes necessary to create a test environment and incur artificial load to observe how well load managers +handle the load. The load simulation controller, the load simulation client, and the broker monitor were created +to make it easier to create this load and observe its effects on the managers. + +## Simulation Client +The simulation client is a machine which will create and subscribe to topics with configurable message rates and sizes. +Because simulating a large load sometimes requires multiple client machines, the user does not interact +with the simulation client directly, but instead delegates their requests to the simulation controller, which will then +send signals to clients to start incurring load. The client implementation is in the class +`org.apache.pulsar.testclient.LoadSimulationClient`. + +### Usage +To start a simulation client, use the `pulsar-perf` script with the command `simulation-client` as follows: + +``` + +pulsar-perf simulation-client --port --service-url + +``` + +The client will then be ready to receive controller commands. +## Simulation Controller +The simulation controller sends signals to the simulation clients, requesting them to create new topics, stop old +topics, change the load incurred by topics, as well as several other tasks. It is implemented in the class +`org.apache.pulsar.testclient.LoadSimulationController` and presents a shell to the user as an interface for sending +commands. 
+ +### Usage +To start a simulation controller, use the `pulsar-perf` script with the command `simulation-controller` as follows: + +``` + +pulsar-perf simulation-controller --cluster --client-port +--clients + +``` + +The clients should already be started before the controller is started. You will then be presented with a simple prompt, +where you can issue commands to simulation clients. Arguments often refer to tenant names, namespace names, and topic +names. In all cases, the base names of the tenants, namespaces, and topics are used. For example, for the topic +`persistent://my_tenant/my_cluster/my_namespace/my_topic`, the tenant name is `my_tenant`, the namespace name is +`my_namespace`, and the topic name is `my_topic`. The controller can perform the following actions: + +* Create a topic with a producer and a consumer + * `trade [--rate ] + [--rand-rate ,] + [--size ]` +* Create a group of topics with a producer and a consumer + * `trade_group [--rate ] + [--rand-rate ,] + [--separation ] [--size ] + [--topics-per-namespace ]` +* Change the configuration of an existing topic + * `change [--rate ] + [--rand-rate ,] + [--size ]` +* Change the configuration of a group of topics + * `change_group [--rate ] [--rand-rate ,] + [--size ] [--topics-per-namespace ]` +* Shut down a previously created topic + * `stop ` +* Shut down a previously created group of topics + * `stop_group ` +* Copy the historical data from one ZooKeeper to another and simulate based on the message rates and sizes in that history + * `copy [--rate-multiplier value]` +* Simulate the load of the historical data on the current ZooKeeper (should be the same ZooKeeper being simulated on) + * `simulate [--rate-multiplier value]` +* Stream the latest data from the given active ZooKeeper to simulate the real-time load of that ZooKeeper. + * `stream [--rate-multiplier value]` + +The "group" arguments in these commands allow the user to create or affect multiple topics at once. 
Groups are created +when calling the `trade_group` command, and all topics from these groups may be subsequently modified or stopped +with the `change_group` and `stop_group` commands respectively. All ZooKeeper arguments are of the form +`zookeeper_host:port`. + +### Difference Between Copy, Simulate, and Stream +The commands `copy`, `simulate`, and `stream` are very similar but have significant differences. `copy` is used when +you want to simulate the load of a static, external ZooKeeper on the ZooKeeper you are simulating on. Thus, +`source zookeeper` should be the ZooKeeper you want to copy and `target zookeeper` should be the ZooKeeper you are +simulating on, and then it will get the full benefit of the historical data of the source in both load manager +implementations. `simulate` on the other hand takes in only one ZooKeeper, the one you are simulating on. It assumes +that you are simulating on a ZooKeeper that has historical data for `SimpleLoadManagerImpl` and creates equivalent +historical data for `ModularLoadManagerImpl`. Then, the load according to the historical data is simulated by the +clients. Finally, `stream` takes in an active ZooKeeper different than the ZooKeeper being simulated on and streams +load data from it and simulates the real-time load. In all cases, the optional `rate-multiplier` argument allows the +user to simulate some proportion of the load. For instance, using `--rate-multiplier 0.05` will cause messages to +be sent at only `5%` of the rate of the load that is being simulated. + +## Broker Monitor +To observe the behavior of the load manager in these simulations, one may utilize the broker monitor, which is +implemented in `org.apache.pulsar.testclient.BrokerMonitor`. The broker monitor will print tabular load data to the +console as it is updated using watchers. 
+ +### Usage +To start a broker monitor, use the `monitor-brokers` command in the `pulsar-perf` script: + +``` + +pulsar-perf monitor-brokers --connect-string + +``` + +The console will then continuously print load data until it is interrupted. + diff --git a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json index 7110f11c8c62e..8ae4e71def8c1 100644 --- a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json +++ b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json @@ -86,6 +86,48 @@ } ] }, + { + "type": "category", + "label": "Pulsar Functions", + "items": [ + { + "type": "doc", + "id": "version-2.7.2/functions-overview" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-worker" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-runtime" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-develop" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-package" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-debug" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-deploy" + }, + { + "type": "doc", + "id": "version-2.7.2/functions-cli" + }, + { + "type": "doc", + "id": "version-2.7.2/window-functions-context" + } + ] + }, { "type": "category", "label": "Pulsar IO", @@ -172,6 +214,24 @@ } ] }, + { + "type": "category", + "label": "Transactions", + "items": [ + { + "type": "doc", + "id": "version-2.7.2/transactions" + }, + { + "type": "doc", + "id": "version-2.7.2/transactions-guarantee" + }, + { + "type": "doc", + "id": "version-2.7.2/transactions-api" + } + ] + }, { "type": "category", "label": "Kubernetes (Helm)", @@ -204,19 +264,35 @@ }, { "type": "category", - "label": "Transactions", + "label": "Deployment", "items": [ { "type": "doc", - "id": "version-2.7.2/transactions" + "id": "version-2.7.2/deploy-aws" }, { "type": "doc", - "id": "version-2.7.2/transactions-guarantee" + "id": 
"version-2.7.2/deploy-kubernetes" }, { "type": "doc", - "id": "version-2.7.2/transactions-api" + "id": "version-2.7.2/deploy-bare-metal" + }, + { + "type": "doc", + "id": "version-2.7.2/deploy-bare-metal-multi-cluster" + }, + { + "type": "doc", + "id": "version-2.7.2/deploy-dcos" + }, + { + "type": "doc", + "id": "version-2.7.2/deploy-docker" + }, + { + "type": "doc", + "id": "version-2.7.2/deploy-monitoring" } ] }, @@ -260,73 +336,55 @@ }, { "type": "category", - "label": "Deployment", + "label": "Security", "items": [ { "type": "doc", - "id": "version-2.7.2/deploy-aws" - }, - { - "type": "doc", - "id": "version-2.7.2/deploy-kubernetes" - }, - { - "type": "doc", - "id": "version-2.7.2/deploy-bare-metal" + "id": "version-2.7.2/security-overview" }, { "type": "doc", - "id": "version-2.7.2/deploy-bare-metal-multi-cluster" + "id": "version-2.7.2/security-tls-transport" }, { "type": "doc", - "id": "version-2.7.2/deploy-dcos" + "id": "version-2.7.2/security-tls-authentication" }, { "type": "doc", - "id": "version-2.7.2/deploy-docker" + "id": "version-2.7.2/security-tls-keystore" }, { "type": "doc", - "id": "version-2.7.2/deploy-monitoring" - } - ] - }, - { - "type": "category", - "label": "Client Libraries", - "items": [ - { - "type": "doc", - "id": "version-2.7.2/client-libraries" + "id": "version-2.7.2/security-jwt" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-java" + "id": "version-2.7.2/security-athenz" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-go" + "id": "version-2.7.2/security-kerberos" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-python" + "id": "version-2.7.2/security-oauth2" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-cpp" + "id": "version-2.7.2/security-authorization" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-node" + "id": "version-2.7.2/security-encryption" }, { "type": "doc", - "id": "version-2.7.2/client-libraries-websocket" + "id": "version-2.7.2/security-extending" }, { 
"type": "doc", - "id": "version-2.7.2/client-libraries-dotnet" + "id": "version-2.7.2/security-bouncy-castle" } ] }, @@ -342,103 +400,95 @@ }, { "type": "category", - "label": "Security", + "label": "Client Libraries", "items": [ { "type": "doc", - "id": "version-2.7.2/security-overview" + "id": "version-2.7.2/client-libraries" }, { "type": "doc", - "id": "version-2.7.2/security-tls-transport" + "id": "version-2.7.2/client-libraries-java" }, { "type": "doc", - "id": "version-2.7.2/security-tls-authentication" + "id": "version-2.7.2/client-libraries-go" }, { "type": "doc", - "id": "version-2.7.2/security-tls-keystore" + "id": "version-2.7.2/client-libraries-python" }, { "type": "doc", - "id": "version-2.7.2/security-jwt" + "id": "version-2.7.2/client-libraries-cpp" }, { "type": "doc", - "id": "version-2.7.2/security-athenz" + "id": "version-2.7.2/client-libraries-node" }, { "type": "doc", - "id": "version-2.7.2/security-kerberos" + "id": "version-2.7.2/client-libraries-websocket" }, { "type": "doc", - "id": "version-2.7.2/security-oauth2" - }, + "id": "version-2.7.2/client-libraries-dotnet" + } + ] + }, + { + "type": "category", + "label": "Admin API", + "items": [ { "type": "doc", - "id": "version-2.7.2/security-authorization" + "id": "version-2.7.2/admin-api-overview" }, { "type": "doc", - "id": "version-2.7.2/security-encryption" + "id": "version-2.7.2/admin-api-clusters" }, { "type": "doc", - "id": "version-2.7.2/security-extending" + "id": "version-2.7.2/admin-api-tenants" }, { "type": "doc", - "id": "version-2.7.2/security-bouncy-castle" - } - ] - }, - { - "type": "category", - "label": "Reference", - "items": [ + "id": "version-2.7.2/admin-api-brokers" + }, { "type": "doc", - "id": "version-2.7.2/reference-terminology" + "id": "version-2.7.2/admin-api-namespaces" }, { "type": "doc", - "id": "version-2.7.2/reference-cli-tools" + "id": "version-2.7.2/admin-api-permissions" }, { "type": "doc", - "id": "version-2.7.2/reference-configuration" + "id": 
"version-2.7.2/admin-api-topics" }, { "type": "doc", - "id": "version-2.7.2/reference-metrics" + "id": "version-2.7.2/admin-api-functions" } ] }, { "type": "category", - "label": "Development", + "label": "Adaptors", "items": [ { "type": "doc", - "id": "version-2.7.2/develop-tools" - }, - { - "type": "doc", - "id": "version-2.7.2/develop-binary-protocol" - }, - { - "type": "doc", - "id": "version-2.7.2/develop-schema" + "id": "version-2.7.2/adaptors-kafka" }, { "type": "doc", - "id": "version-2.7.2/develop-load-manager" + "id": "version-2.7.2/adaptors-spark" }, { "type": "doc", - "id": "version-2.7.2/develop-cpp" + "id": "version-2.7.2/adaptors-storm" } ] }, @@ -478,57 +528,49 @@ }, { "type": "category", - "label": "Adaptors", + "label": "Development", "items": [ { "type": "doc", - "id": "version-2.7.2/adaptors-kafka" - }, - { - "type": "doc", - "id": "version-2.7.2/adaptors-spark" + "id": "version-2.7.2/develop-tools" }, { "type": "doc", - "id": "version-2.7.2/adaptors-storm" - } - ] - }, - { - "type": "category", - "label": "Admin API", - "items": [ - { - "type": "doc", - "id": "version-2.7.2/admin-api-overview" + "id": "version-2.7.2/develop-binary-protocol" }, { "type": "doc", - "id": "version-2.7.2/admin-api-clusters" + "id": "version-2.7.2/develop-schema" }, { "type": "doc", - "id": "version-2.7.2/admin-api-tenants" + "id": "version-2.7.2/develop-load-manager" }, { "type": "doc", - "id": "version-2.7.2/admin-api-brokers" - }, + "id": "version-2.7.2/develop-cpp" + } + ] + }, + { + "type": "category", + "label": "Reference", + "items": [ { "type": "doc", - "id": "version-2.7.2/admin-api-namespaces" + "id": "version-2.7.2/reference-terminology" }, { "type": "doc", - "id": "version-2.7.2/admin-api-permissions" + "id": "version-2.7.2/reference-cli-tools" }, { "type": "doc", - "id": "version-2.7.2/admin-api-topics" + "id": "version-2.7.2/reference-configuration" }, { "type": "doc", - "id": "version-2.7.2/admin-api-functions" + "id": 
"version-2.7.2/reference-metrics" } ] }