From 879e93d07c36567f6ba7792cc9274142ff4c0d74 Mon Sep 17 00:00:00 2001 From: fengtao1998 <37149842+fengtao1998@users.noreply.github.com> Date: Wed, 29 Sep 2021 15:12:15 +0800 Subject: [PATCH] [Docs]Update docs for Client libraries C++ (#12220) * Update client-libraries-cpp.md * Update client-libraries-cpp.md * Update client-libraries-cpp.md * Update client-libraries-cpp.md * Update client-libraries-cpp.md --- site2/docs/client-libraries-cpp.md | 17 +- .../version-2.8.1/client-libraries-cpp.md | 391 +++++++++++++++--- 2 files changed, 339 insertions(+), 69 deletions(-) diff --git a/site2/docs/client-libraries-cpp.md b/site2/docs/client-libraries-cpp.md index de6fbb52cf5b1..e9f81fa0f73cf 100644 --- a/site2/docs/client-libraries-cpp.md +++ b/site2/docs/client-libraries-cpp.md @@ -10,7 +10,7 @@ All the methods in producer, consumer, and reader of a C++ client are thread-saf ## Supported platforms -Pulsar C++ client is supported on **Linux** and **MacOS** platforms. +Pulsar C++ client is supported on **Linux** ,**MacOS** and **Windows** platforms. [Doxygen](http://www.doxygen.nl/)-generated API docs for the C++ client are available [here](/api/cpp). @@ -258,6 +258,11 @@ cmake -B ./build -A x64 -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF -DVCPKG_TRI cmake --build ./build --config Release ``` +> **NOTE** +> +> 1. For Windows 32-bit, you need to use `-A Win32` and `-DVCPKG_TRIPLET=x86-windows`. +> 2. For MSVC Debug mode, you need to replace `Release` with `Debug` for both `CMAKE_BUILD_TYPE` variable and `--config` option. + 4. Client libraries are available in the following places. ``` @@ -288,8 +293,8 @@ pulsar+ssl://pulsar.us-west.example.com:6651 ## Create a consumer To use Pulsar as a consumer, you need to create a consumer on the C++ client. There are two main ways of using the consumer: -- Blocking style: synchronously calling `receive(msg)`. -- Non-blocking (event based) style: using a message listener. +- [Blocking style](#blocking-example): synchronously calling `receive(msg)`. +- [Non-blocking](#consumer-with-a-message-listener) (event based) style: using a message listener. ### Blocking example @@ -335,7 +340,7 @@ int main() { ### Consumer with a message listener -We can avoid the need to run a loop with blocking calls with an event based style by using a message listener which is invoked for each message that is received. +You can avoid running a loop with blocking calls with an event based style by using a message listener which is invoked for each message that is received. This example starts a subscription at the earliest offset and consumes 100 messages. @@ -386,8 +391,8 @@ int main() { ## Create a producer To use Pulsar as a producer, you need to create a producer on the C++ client. There are two main ways of using a producer: -- Blocking style where each call to `send` waits for an ack from the broker. -- Non-blocking asynchronous style where `sendAsync` is called instead of `send` and a callback is supplied for when the ack is received from the broker. +- [Blocking style](#simple-blocking-example) : each call to `send` waits for an ack from the broker. +- [Non-blocking asynchronous style](#non-blocking-example) : `sendAsync` is called instead of `send` and a callback is supplied for when the ack is received from the broker. ### Simple blocking example diff --git a/site2/website/versioned_docs/version-2.8.1/client-libraries-cpp.md b/site2/website/versioned_docs/version-2.8.1/client-libraries-cpp.md index 2c679d1d7c574..d1b946bf42ca2 100644 --- a/site2/website/versioned_docs/version-2.8.1/client-libraries-cpp.md +++ b/site2/website/versioned_docs/version-2.8.1/client-libraries-cpp.md @@ -11,7 +11,7 @@ All the methods in producer, consumer, and reader of a C++ client are thread-saf ## Supported platforms -Pulsar C++ client is supported on **Linux** and **MacOS** platforms. +Pulsar C++ client is supported on **Linux** ,**MacOS** and **Windows** platforms. [Doxygen](http://www.doxygen.nl/)-generated API docs for the C++ client are available [here](/api/cpp). @@ -21,8 +21,8 @@ You need to install the following components before using the C++ client: * [CMake](https://cmake.org/) * [Boost](http://www.boost.org/) -* [Protocol Buffers](https://developers.google.com/protocol-buffers/) 2.6 -* [libcurl](https://curl.haxx.se/libcurl/) +* [Protocol Buffers](https://developers.google.com/protocol-buffers/) >= 3 +* [libcurl](https://curl.se/libcurl/) * [Google Test](https://github.com/google/googletest) ## Linux @@ -128,6 +128,9 @@ $ rpm -ivh apache-pulsar-client*.rpm After you install RPM successfully, Pulsar libraries are in the `/usr/lib` directory. +> **Note** +> If you get the error that `libpulsar.so: cannot open shared object file: No such file or directory` when starting Pulsar client, you may need to run `ldconfig` first. + ### Install Debian 1. Download a Debian package from the links in the table. @@ -206,10 +209,8 @@ $ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/ $ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/ # Protocol Buffers installation -$ brew tap homebrew/versions -$ brew install protobuf260 -$ brew install boost -$ brew install log4cxx +$ brew install protobuf boost boost-python log4cxx +# If you are using python3, you need to install boost-python3 # Google Test installation $ git clone https://github.com/google/googletest.git @@ -234,6 +235,42 @@ Pulsar releases are available in the [Homebrew](https://brew.sh/) core repositor brew install libpulsar ``` +## Windows (64-bit) + +### Compilation + +1. Clone the Pulsar repository. + +```shell +$ git clone https://github.com/apache/pulsar +``` + +2. Install all necessary dependencies. + +```shell +cd ${PULSAR_HOME}/pulsar-client-cpp +vcpkg install --feature-flags=manifests --triplet x64-windows +``` + +3. Build C++ libraries. + +```shell +cmake -B ./build -A x64 -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF -DVCPKG_TRIPLET=x64-windows -DCMAKE_BUILD_TYPE=Release -S . +cmake --build ./build --config Release +``` + +> **NOTE** +> +> 1. For Windows 32-bit, you need to use `-A Win32` and `-DVCPKG_TRIPLET=x86-windows`. +> 2. For MSVC Debug mode, you need to replace `Release` with `Debug` for both `CMAKE_BUILD_TYPE` variable and `--config` option. + +4. Client libraries are available in the following places. + +``` +${PULSAR_HOME}/pulsar-client-cpp/build/lib/Release/pulsar.lib +${PULSAR_HOME}/pulsar-client-cpp/build/lib/Release/pulsar.dll +``` + ## Connection URLs To connect Pulsar using client libraries, you need to specify a Pulsar protocol URL. @@ -256,52 +293,227 @@ pulsar+ssl://pulsar.us-west.example.com:6651 ## Create a consumer -To use Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example. +To use Pulsar as a consumer, you need to create a consumer on the C++ client. There are two main ways of using the consumer: +- [Blocking style](#blocking-example): synchronously calling `receive(msg)`. +- [Non-blocking](#consumer-with-a-message-listener) (event based) style: using a message listener. -```c++ -Client client("pulsar://localhost:6650"); +### Blocking example + +The benefit of this approach is that it is the simplest code. Simply keeps calling `receive(msg)` which blocks until a message is received. + +This example starts a subscription at the earliest offset and consumes 100 messages. -Consumer consumer; -Result result = client.subscribe("my-topic", "my-subscription-name", consumer); -if (result != ResultOk) { - LOG_ERROR("Failed to subscribe: " << result); - return -1; +```c++ +#include + +using namespace pulsar; + +int main() { + Client client("pulsar://localhost:6650"); + + Consumer consumer; + ConsumerConfiguration config; + config.setSubscriptionInitialPosition(InitialPositionEarliest); + Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer); + if (result != ResultOk) { + std::cout << "Failed to subscribe: " << result << std::endl; + return -1; + } + + Message msg; + int ctr = 0; + // consume 100 messages + while (ctr < 100) { + consumer.receive(msg); + std::cout << "Received: " << msg + << " with payload '" << msg.getDataAsString() << "'" << std::endl; + + consumer.acknowledge(msg); + ctr++; + } + + std::cout << "Finished consuming synchronously!" << std::endl; + + client.close(); + return 0; } +``` + +### Consumer with a message listener -Message msg; +You can avoid running a loop with blocking calls with an event based style by using a message listener which is invoked for each message that is received. -while (true) { - consumer.receive(msg); - LOG_INFO("Received: " << msg - << " with payload '" << msg.getDataAsString() << "'"); +This example starts a subscription at the earliest offset and consumes 100 messages. + +```c++ +#include +#include +#include + +using namespace pulsar; + +std::atomic messagesReceived; + +void handleAckComplete(Result res) { + std::cout << "Ack res: " << res << std::endl; +} - consumer.acknowledge(msg); +void listener(Consumer consumer, const Message& msg) { + std::cout << "Got message " << msg << " with content '" << msg.getDataAsString() << "'" << std::endl; + messagesReceived++; + consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete); } -client.close(); +int main() { + Client client("pulsar://localhost:6650"); + + Consumer consumer; + ConsumerConfiguration config; + config.setMessageListener(listener); + config.setSubscriptionInitialPosition(InitialPositionEarliest); + Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer); + if (result != ResultOk) { + std::cout << "Failed to subscribe: " << result << std::endl; + return -1; + } + + // wait for 100 messages to be consumed + while (messagesReceived < 100) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::cout << "Finished consuming asynchronously!" << std::endl; + + client.close(); + return 0; +} ``` ## Create a producer -To use Pulsar as a producer, you need to create a producer on the C++ client. The following is an example. +To use Pulsar as a producer, you need to create a producer on the C++ client. There are two main ways of using a producer: +- [Blocking style](#simple-blocking-example) : each call to `send` waits for an ack from the broker. +- [Non-blocking asynchronous style](#non-blocking-example) : `sendAsync` is called instead of `send` and a callback is supplied for when the ack is received from the broker. + +### Simple blocking example + +This example sends 100 messages using the blocking style. While simple, it does not produce high throughput as it waits for each ack to come back before sending the next message. + +```c++ +#include +#include + +using namespace pulsar; + +int main() { + Client client("pulsar://localhost:6650"); + + Result result = client.createProducer("persistent://public/default/my-topic", producer); + if (result != ResultOk) { + std::cout << "Error creating producer: " << result << std::endl; + return -1; + } + + // Send 100 messages synchronously + int ctr = 0; + while (ctr < 100) { + std::string content = "msg" + std::to_string(ctr); + Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build(); + Result result = producer.send(msg); + if (result != ResultOk) { + std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl; + } else { + std::cout << "The message " << content << " sent successfully" << std::endl; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ctr++; + } + + std::cout << "Finished producing synchronously!" << std::endl; + + client.close(); + return 0; +} +``` + +### Non-blocking example + +This example sends 100 messages using the non-blocking style calling `sendAsync` instead of `send`. This allows the producer to have multiple messages inflight at a time which increases throughput. + +The producer configuration `blockIfQueueFull` is useful here to avoid `ResultProducerQueueIsFull` errors when the internal queue for outgoing send requests becomes full. Once the internal queue is full, `sendAsync` becomes blocking which can make your code simpler. + +Without this configuration, the result code `ResultProducerQueueIsFull` is passed to the callback. You must decide how to deal with that (retry, discard etc). ```c++ -Client client("pulsar://localhost:6650"); +#include +#include + +using namespace pulsar; -Producer producer; -Result result = client.createProducer("my-topic", producer); -if (result != ResultOk) { - LOG_ERROR("Error creating producer: " << result); - return -1; +std::atomic acksReceived; + +void callback(Result code, const MessageId& msgId, std::string msgContent) { + // message processing logic here + std::cout << "Received ack for msg: " << msgContent << " with code: " + << code << " -- MsgID: " << msgId << std::endl; + acksReceived++; } -// Publish 10 messages to the topic -for (int i = 0; i < 10; i++){ - Message msg = MessageBuilder().setContent("my-message").build(); - Result res = producer.send(msg); - LOG_INFO("Message sent: " << res); +int main() { + Client client("pulsar://localhost:6650"); + + ProducerConfiguration producerConf; + producerConf.setBlockIfQueueFull(true); + Producer producer; + Result result = client.createProducer("persistent://public/default/my-topic", + producerConf, producer); + if (result != ResultOk) { + std::cout << "Error creating producer: " << result << std::endl; + return -1; + } + + // Send 100 messages asynchronously + int ctr = 0; + while (ctr < 100) { + std::string content = "msg" + std::to_string(ctr); + Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build(); + producer.sendAsync(msg, std::bind(callback, + std::placeholders::_1, std::placeholders::_2, content)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ctr++; + } + + // wait for 100 messages to be acked + while (acksReceived < 100) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::cout << "Finished producing asynchronously!" << std::endl; + + client.close(); + return 0; } -client.close(); +``` + +### Partitioned topics and lazy producers + +When scaling out a Pulsar topic, you may configure a topic to have hundreds of partitions. Likewise, you may have also scaled out your producers so there are hundreds or even thousands of producers. This can put some strain on the Pulsar brokers as when you create a producer on a partitioned topic, internally it creates one internal producer per partition which involves communications to the brokers for each one. So for a topic with 1000 partitions and 1000 producers, it ends up creating 1,000,000 internal producers across the producer applications, each of which has to communicate with a broker to find out which broker it should connect to and then perform the connection handshake. + +You can reduce the load caused by this combination of a large number of partitions and many producers by doing the following: +- use SinglePartition partition routing mode (this ensures that all messages are only sent to a single, randomly selected partition) +- use non-keyed messages (when messages are keyed, routing is based on the hash of the key and so messages will end up being sent to multiple partitions) +- use lazy producers (this ensures that an internal producer is only created on demand when a message needs to be routed to a partition) + +With our example above, that reduces the number of internal producers spread out over the 1000 producer apps from 1,000,000 to just 1000. + +Note that there can be extra latency for the first message sent. If you set a low send timeout, this timeout could be reached if the initial connection handshake is slow to complete. + +```c++ +ProducerConfiguration producerConf; +producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); +producerConf.setLazyStartPartitionedProducers(true); ``` ## Enable authentication in connection URLs @@ -322,32 +534,85 @@ For complete examples, refer to [C++ client examples](https://github.com/apache/ ## Schema -This section describes some examples about schema. For more information about schema, see [Pulsar schema](schema-get-started.md). - -### Create producer with Avro schema +This section describes some examples about schema. For more information about +schema, see [Pulsar schema](schema-get-started.md). + +### Avro schema + +- The following example shows how to create a producer with an Avro schema. + + ```cpp + static const std::string exampleSchema = + "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; + Producer producer; + ProducerConfiguration producerConf; + producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); + client.createProducer("topic-avro", producerConf, producer); + ``` + +- The following example shows how to create a consumer with an Avro schema. + + ```cpp + static const std::string exampleSchema = + "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; + ConsumerConfiguration consumerConf; + Consumer consumer; + consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); + client.subscribe("topic-avro", "sub-2", consumerConf, consumer) + ``` + +### ProtobufNative schema + +The following example shows how to create a producer and a consumer with a ProtobufNative schema. +​ +1. Generate the `User` class using Protobuf3. + + > **Note** + > You need to use Protobuf3 or later versions. +​ + ```protobuf + syntax = "proto3"; + + message User { + string name = 1; + int32 age = 2; + } + ``` +​ +2. Include the `ProtobufNativeSchema.h` in your source code. Ensure the Protobuf dependency has been added to your project. +​ + ```c++ + #include + ``` +​ +3. Create a producer to send a `User` instance. +​ + ```c++ + ProducerConfiguration producerConf; + producerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); + Producer producer; + client.createProducer("topic-protobuf", producerConf, producer); + User user; + user.set_name("my-name"); + user.set_age(10); + std::string content; + user.SerializeToString(&content); + producer.send(MessageBuilder().setContent(content).build()); + ``` +​ +4. Create a consumer to receive a `User` instance. +​ + ```c++ + ConsumerConfiguration consumerConf; + consumerConf.setSchema(createProtobufNativeSchema(User::GetDescriptor())); + consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); + Consumer consumer; + client.subscribe("topic-protobuf", "my-sub", consumerConf, consumer); + Message msg; + consumer.receive(msg); + User user2; + user2.ParseFromArray(msg.getData(), msg.getLength()); + ``` -The following example shows how to create a producer with an Avro schema. - -```cpp -static const std::string exampleSchema = - "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," - "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; -Producer producer; -ProducerConfiguration producerConf; -producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); -client.createProducer("topic-avro", producerConf, producer); -``` - -### Create consumer with Avro schema - -The following example shows how to create a consumer with an Avro schema. - -```cpp -static const std::string exampleSchema = - "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," - "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}"; -ConsumerConfiguration consumerConf; -Consumer consumer; -consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); -client.subscribe("topic-avro", "sub-2", consumerConf, consumer) -```