diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 71717c271dd1e..11bfc43f734fd 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -217,6 +217,8 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Initialize stats interval in seconds. Stats are printed and reset after every `statsIntervalInSeconds`.
*
+ * Default: 600
+ *
* Set to 0 means disabling stats collection.
*/
ClientConfiguration& setStatsIntervalInSeconds(const unsigned int&);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index d0bc5ee78121e..bf7fdcd190646 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -187,6 +187,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {
* 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
* If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
* redelivered.
+ *
+ * Default: 0, which means the the tracker for unacknowledged messages is disabled.
+ *
* @param timeout in milliseconds
*/
void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
@@ -269,6 +272,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {
/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
+ *
+ * Default: 30000, which means 30 seconds.
+ *
* @param cacheTimeInMs in milliseconds
*/
void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 085d342d76316..3306b271d8c21 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -140,6 +140,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned
* incremental sequence IDs.
*
+ * Default: -1, which means the first message's sequence ID is 0.
+ *
* @param initialSequenceId the initial sequence ID for the producer.
* @return
*/
@@ -178,6 +180,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* would fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the
* blocking behavior.
*
+ * Default: 1000
+ *
* @param maxPendingMessages max number of pending messages.
* @return
*/
@@ -194,6 +198,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* This setting will be used to lower the max pending messages for each partition
* ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
*
+ * Default: 50000
+ *
* @param maxPendingMessagesAcrossPartitions
*/
ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
@@ -206,6 +212,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Set the message routing modes for partitioned topics.
*
+ * Default: UseSinglePartition
+ *
* @param PartitionsRoutingMode partition routing mode.
* @return
*/
@@ -233,6 +241,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* Set the hashing scheme, which is a standard hashing function available when choosing the partition
* used for a particular message.
*
+ * Default: HashingScheme::BoostHash
+ *
*
Standard hashing functions available are:
*
* - {@link HashingScheme::JavaStringHash}: Java {@code String.hashCode()} (Default).
@@ -266,8 +276,9 @@ class PULSAR_PUBLIC ProducerConfiguration {
// Zero queue size feature will not be supported on consumer end if batching is enabled
/**
- * Control whether automatic batching of messages is enabled or not for the producer. Default value:
- * false (no automatic batching).
+ * Control whether automatic batching of messages is enabled or not for the producer.
+ *
+ * Default: true
*
* When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch
* to be sent to the broker, leading to better throughput, especially when publishing small messages. If
@@ -343,6 +354,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
const unsigned long& getBatchingMaxPublishDelayMs() const;
/**
+ * Default: DefaultBatching
+ *
* @see BatchingType
*/
ProducerConfiguration& setBatchingType(BatchingType batchingType);
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
index b14851e0bbd42..d8b47f5701652 100644
--- a/pulsar-client-cpp/include/pulsar/Schema.h
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -117,6 +117,16 @@ typedef std::map StringMap;
*/
class PULSAR_PUBLIC SchemaInfo {
public:
+ /**
+ * The default constructor with following configs:
+ * - schemaType: SchemaType::BYTES
+ * - name: "BYTES"
+ * - schema: ""
+ * - properties: {}
+ *
+ * @see SchemaInfo(SchemaType schemaType, const std::string& name, const std::string& schema, const
+ * StringMap& properties)
+ */
SchemaInfo();
/**
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 9d36672e4aec8..631e8ae59d5cf 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -24,38 +24,22 @@
namespace pulsar {
struct ClientConfigurationImpl {
- AuthenticationPtr authenticationPtr;
- uint64_t memoryLimit;
- int ioThreads;
- int operationTimeoutSeconds;
- int messageListenerThreads;
- int concurrentLookupRequest;
+ AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
+ uint64_t memoryLimit{0ull};
+ int ioThreads{1};
+ int operationTimeoutSeconds{30};
+ int messageListenerThreads{1};
+ int concurrentLookupRequest{50000};
std::string logConfFilePath;
- bool useTls;
+ bool useTls{false};
std::string tlsTrustCertsFilePath;
- bool tlsAllowInsecureConnection;
- unsigned int statsIntervalInSeconds;
+ bool tlsAllowInsecureConnection{false};
+ unsigned int statsIntervalInSeconds{600}; // 10 minutes
std::unique_ptr loggerFactory;
- bool validateHostName;
- unsigned int partitionsUpdateInterval;
+ bool validateHostName{false};
+ unsigned int partitionsUpdateInterval{60}; // 1 minute
std::string listenerName;
- ClientConfigurationImpl()
- : authenticationPtr(AuthFactory::Disabled()),
- memoryLimit(0ull),
- ioThreads(1),
- operationTimeoutSeconds(30),
- messageListenerThreads(1),
- concurrentLookupRequest(50000),
- logConfFilePath(),
- useTls(false),
- tlsAllowInsecureConnection(false),
- statsIntervalInSeconds(600), // 10 minutes
- loggerFactory(),
- validateHostName(false),
- partitionsUpdateInterval(60) // 1 minute
- {}
-
std::unique_ptr takeLogger() { return std::move(loggerFactory); }
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 24d6e6591d9f5..502c201d065a1 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -26,48 +26,27 @@
namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
- long unAckedMessagesTimeoutMs;
- long tickDurationInMs;
+ long unAckedMessagesTimeoutMs{0};
+ long tickDurationInMs{1000};
- long negativeAckRedeliveryDelayMs;
- long ackGroupingTimeMs;
- long ackGroupingMaxSize;
- ConsumerType consumerType;
+ long negativeAckRedeliveryDelayMs{60000};
+ long ackGroupingTimeMs{100};
+ long ackGroupingMaxSize{1000};
+ ConsumerType consumerType{ConsumerExclusive};
MessageListener messageListener;
- bool hasMessageListener;
- int receiverQueueSize;
- int maxTotalReceiverQueueSizeAcrossPartitions;
+ bool hasMessageListener{false};
+ int receiverQueueSize{1000};
+ int maxTotalReceiverQueueSizeAcrossPartitions{50000};
std::string consumerName;
- long brokerConsumerStatsCacheTimeInMs;
+ long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds
CryptoKeyReaderPtr cryptoKeyReader;
- ConsumerCryptoFailureAction cryptoFailureAction;
- bool readCompacted;
- InitialPosition subscriptionInitialPosition;
- int patternAutoDiscoveryPeriod;
- bool replicateSubscriptionStateEnabled;
+ ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
+ bool readCompacted{false};
+ InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
+ int patternAutoDiscoveryPeriod{60};
+ bool replicateSubscriptionStateEnabled{false};
std::map properties;
KeySharedPolicy keySharedPolicy;
-
- ConsumerConfigurationImpl()
- : schemaInfo(),
- unAckedMessagesTimeoutMs(0),
- tickDurationInMs(1000),
- negativeAckRedeliveryDelayMs(60000),
- ackGroupingTimeMs(100),
- ackGroupingMaxSize(1000),
- consumerType(ConsumerExclusive),
- messageListener(),
- hasMessageListener(false),
- brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds
- receiverQueueSize(1000),
- maxTotalReceiverQueueSizeAcrossPartitions(50000),
- cryptoKeyReader(),
- cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
- readCompacted(false),
- subscriptionInitialPosition(InitialPosition::InitialPositionLatest),
- patternAutoDiscoveryPeriod(60),
- properties(),
- keySharedPolicy() {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 7c3079344044d..fa6b755822c63 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -30,40 +30,23 @@ struct ProducerConfigurationImpl {
SchemaInfo schemaInfo;
Optional producerName;
Optional initialSequenceId;
- int sendTimeoutMs;
- CompressionType compressionType;
- int maxPendingMessages;
- int maxPendingMessagesAcrossPartitions;
- ProducerConfiguration::PartitionsRoutingMode routingMode;
+ int sendTimeoutMs{30000};
+ CompressionType compressionType{CompressionNone};
+ int maxPendingMessages{1000};
+ int maxPendingMessagesAcrossPartitions{50000};
+ ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
- ProducerConfiguration::HashingScheme hashingScheme;
- bool blockIfQueueFull;
- bool batchingEnabled;
- unsigned int batchingMaxMessages;
- unsigned long batchingMaxAllowedSizeInBytes;
- unsigned long batchingMaxPublishDelayMs;
- ProducerConfiguration::BatchingType batchingType;
+ ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
+ bool blockIfQueueFull{false};
+ bool batchingEnabled{true};
+ unsigned int batchingMaxMessages{1000};
+ unsigned long batchingMaxAllowedSizeInBytes{128 * 1024}; // 128 KB
+ unsigned long batchingMaxPublishDelayMs{10}; // 10 milli seconds
+ ProducerConfiguration::BatchingType batchingType{ProducerConfiguration::DefaultBatching};
CryptoKeyReaderPtr cryptoKeyReader;
std::set encryptionKeys;
- ProducerCryptoFailureAction cryptoFailureAction;
+ ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
std::map properties;
- ProducerConfigurationImpl()
- : schemaInfo(),
- sendTimeoutMs(30000),
- compressionType(CompressionNone),
- maxPendingMessages(1000),
- maxPendingMessagesAcrossPartitions(50000),
- routingMode(ProducerConfiguration::UseSinglePartition),
- hashingScheme(ProducerConfiguration::BoostHash),
- blockIfQueueFull(false),
- batchingEnabled(true),
- batchingMaxMessages(1000),
- batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB
- batchingMaxPublishDelayMs(10), // 10 milli seconds
- batchingType(ProducerConfiguration::DefaultBatching),
- cryptoKeyReader(),
- encryptionKeys(),
- cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {}
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 379bcdca75c23..3ab23f72b41bb 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -19,6 +19,7 @@
#include
#include
#include
+#include "NoOpsCryptoKeyReader.h"
DECLARE_LOG_OBJECT()
@@ -27,6 +28,104 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
+TEST(ConsumerConfigurationTest, testDefaultConfig) {
+ ConsumerConfiguration conf;
+ ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
+ ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
+ ASSERT_EQ(conf.hasMessageListener(), false);
+ ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
+ ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
+ ASSERT_EQ(conf.getConsumerName(), "");
+ ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
+ ASSERT_EQ(conf.getTickDurationInMs(), 1000);
+ ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
+ ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
+ ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
+ ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
+ ASSERT_EQ(conf.isReadCompacted(), false);
+ ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 60);
+ ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionLatest);
+ ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
+ ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
+ ASSERT_EQ(conf.isEncryptionEnabled(), false);
+ ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
+ ASSERT_EQ(conf.getProperties().empty(), true);
+}
+
+TEST(ConsumerConfigurationTest, testCustomConfig) {
+ ConsumerConfiguration conf;
+
+ const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+ "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+ conf.setSchema(schema);
+ ASSERT_EQ(conf.getSchema().getName(), schema.getName());
+ ASSERT_EQ(conf.getSchema().getSchemaType(), schema.getSchemaType());
+ ASSERT_EQ(conf.getSchema().getSchema(), schema.getSchema());
+ ASSERT_EQ(conf.getSchema().getProperties(), schema.getProperties());
+
+ conf.setConsumerType(ConsumerKeyShared);
+ ASSERT_EQ(conf.getConsumerType(), ConsumerKeyShared);
+
+ conf.setMessageListener([](Consumer consumer, const Message& msg) {});
+ ASSERT_EQ(conf.hasMessageListener(), true);
+
+ conf.setReceiverQueueSize(2000);
+ ASSERT_EQ(conf.getReceiverQueueSize(), 2000);
+
+ conf.setMaxTotalReceiverQueueSizeAcrossPartitions(100000);
+ ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 100000);
+
+ conf.setConsumerName("consumer");
+ ASSERT_EQ(conf.getConsumerName(), "consumer");
+
+ conf.setUnAckedMessagesTimeoutMs(20000);
+ ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 20000);
+
+ conf.setTickDurationInMs(2000);
+ ASSERT_EQ(conf.getTickDurationInMs(), 2000);
+
+ conf.setNegativeAckRedeliveryDelayMs(10000);
+ ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);
+
+ conf.setAckGroupingTimeMs(200);
+ ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);
+
+ conf.setAckGroupingMaxSize(2000);
+ ASSERT_EQ(conf.getAckGroupingMaxSize(), 2000);
+
+ conf.setBrokerConsumerStatsCacheTimeInMs(60000);
+ ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 60000);
+
+ conf.setReadCompacted(true);
+ ASSERT_EQ(conf.isReadCompacted(), true);
+
+ conf.setPatternAutoDiscoveryPeriod(120);
+ ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 120);
+
+ conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+ ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionEarliest);
+
+ const auto cryptoKeyReader = std::make_shared();
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ ASSERT_EQ(conf.getCryptoKeyReader(), cryptoKeyReader);
+ // NOTE: once CryptoKeyReader was set, the isEncryptionEnabled() would return true, it's different from
+ // ProducerConfiguration
+ ASSERT_EQ(conf.isEncryptionEnabled(), true);
+
+ conf.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+ ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::CONSUME);
+
+ conf.setReplicateSubscriptionStateEnabled(true);
+ ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), true);
+
+ conf.setProperty("k1", "v1");
+ ASSERT_EQ(conf.getProperties()["k1"], "v1");
+ ASSERT_EQ(conf.hasProperty("k1"), true);
+}
+
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
std::string lookupUrl = "pulsar://localhost:6650";
std::string topicName = "persist-topic";
diff --git a/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h b/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h
new file mode 100644
index 0000000000000..e152690e58bfa
--- /dev/null
+++ b/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include
+
+namespace pulsar {
+
+class NoOpsCryptoKeyReader : public CryptoKeyReader {
+ public:
+ Result getPublicKey(const std::string& keyName, std::map& metadata,
+ EncryptionKeyInfo& encKeyInfo) const override {
+ return ResultOk;
+ }
+
+ Result getPrivateKey(const std::string& keyName, std::map& metadata,
+ EncryptionKeyInfo& encKeyInfo) const override {
+ return ResultOk;
+ }
+};
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
new file mode 100644
index 0000000000000..b88f6e41890d8
--- /dev/null
+++ b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include
+#include
+#include "NoOpsCryptoKeyReader.h"
+
+using namespace pulsar;
+
+TEST(ProducerConfigurationTest, testDefaultConfig) {
+ ProducerConfiguration conf;
+ ASSERT_EQ(conf.getProducerName(), "");
+ ASSERT_EQ(conf.getSchema().getName(), "BYTES");
+ ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
+ ASSERT_EQ(conf.getSendTimeout(), 30000);
+ ASSERT_EQ(conf.getInitialSequenceId(), -1ll);
+ ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone);
+ ASSERT_EQ(conf.getMaxPendingMessages(), 1000);
+ ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000);
+ ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
+ ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{});
+ ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash);
+ ASSERT_EQ(conf.getBlockIfQueueFull(), false);
+ ASSERT_EQ(conf.getBatchingEnabled(), true);
+ ASSERT_EQ(conf.getBatchingMaxMessages(), 1000);
+ ASSERT_EQ(conf.getBatchingMaxAllowedSizeInBytes(), 128 * 1024);
+ ASSERT_EQ(conf.getBatchingMaxPublishDelayMs(), 10);
+ ASSERT_EQ(conf.getBatchingType(), ProducerConfiguration::DefaultBatching);
+ ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
+ ASSERT_EQ(conf.getCryptoFailureAction(), ProducerCryptoFailureAction::FAIL);
+ ASSERT_EQ(conf.isEncryptionEnabled(), false);
+ ASSERT_EQ(conf.getEncryptionKeys(), std::set{});
+ ASSERT_EQ(conf.getProperties().empty(), true);
+}
+
+class MockMessageRoutingPolicy : public MessageRoutingPolicy {
+ public:
+ int getPartition(const Message& msg) override { return 0; }
+ int getPartition(const Message& msg, const TopicMetadata& topicMetadata) override { return 0; }
+};
+
+TEST(ProducerConfigurationTest, testCustomConfig) {
+ ProducerConfiguration conf;
+
+ conf.setProducerName("producer");
+ ASSERT_EQ(conf.getProducerName(), "producer");
+
+ const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+ "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+ conf.setSchema(schema);
+ ASSERT_EQ(conf.getSchema().getName(), schema.getName());
+ ASSERT_EQ(conf.getSchema().getSchemaType(), schema.getSchemaType());
+ ASSERT_EQ(conf.getSchema().getSchema(), schema.getSchema());
+ ASSERT_EQ(conf.getSchema().getProperties(), schema.getProperties());
+
+ conf.setSendTimeout(0);
+ ASSERT_EQ(conf.getSendTimeout(), 0);
+
+ conf.setInitialSequenceId(100ll);
+ ASSERT_EQ(conf.getInitialSequenceId(), 100ll);
+
+ conf.setCompressionType(CompressionType::CompressionLZ4);
+ ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionLZ4);
+
+ conf.setMaxPendingMessages(2000);
+ ASSERT_EQ(conf.getMaxPendingMessages(), 2000);
+
+ conf.setMaxPendingMessagesAcrossPartitions(100000);
+ ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000);
+
+ conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
+
+ const auto router = std::make_shared();
+ conf.setMessageRouter(router);
+ ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::CustomPartition);
+ ASSERT_EQ(conf.getMessageRouterPtr(), router);
+
+ conf.setHashingScheme(ProducerConfiguration::JavaStringHash);
+ ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::JavaStringHash);
+
+ conf.setBlockIfQueueFull(true);
+ ASSERT_EQ(conf.getBlockIfQueueFull(), true);
+
+ conf.setBatchingEnabled(false);
+ ASSERT_EQ(conf.getBatchingEnabled(), false);
+
+ conf.setBatchingMaxMessages(2000);
+ ASSERT_EQ(conf.getBatchingMaxMessages(), 2000);
+
+ conf.setBatchingMaxAllowedSizeInBytes(1024);
+ ASSERT_EQ(conf.getBatchingMaxAllowedSizeInBytes(), 1024);
+
+ conf.setBatchingMaxPublishDelayMs(1);
+ ASSERT_EQ(conf.getBatchingMaxPublishDelayMs(), 1);
+
+ conf.setBatchingType(ProducerConfiguration::KeyBasedBatching);
+ ASSERT_EQ(conf.getBatchingType(), ProducerConfiguration::KeyBasedBatching);
+
+ const auto cryptoKeyReader = std::make_shared();
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ ASSERT_EQ(conf.getCryptoKeyReader(), cryptoKeyReader);
+
+ conf.setCryptoFailureAction(pulsar::ProducerCryptoFailureAction::SEND);
+ ASSERT_EQ(conf.getCryptoFailureAction(), ProducerCryptoFailureAction::SEND);
+
+ conf.addEncryptionKey("key");
+ ASSERT_EQ(conf.getEncryptionKeys(), std::set{"key"});
+ ASSERT_EQ(conf.isEncryptionEnabled(), true);
+
+ conf.setProperty("k1", "v1");
+ ASSERT_EQ(conf.getProperties()["k1"], "v1");
+ ASSERT_EQ(conf.hasProperty("k1"), true);
+}
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
index 6af4b4a2c1f0a..ccbfa2d930028 100644
--- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -24,6 +24,7 @@
#include
#include
#include
+#include "NoOpsCryptoKeyReader.h"
DECLARE_LOG_OBJECT()
@@ -31,19 +32,6 @@ using namespace pulsar;
static const std::string lookupUrl = "pulsar://localhost:6650";
-class NoOpsCryptoKeyReader : public CryptoKeyReader {
- public:
- Result getPublicKey(const std::string& keyName, std::map& metadata,
- EncryptionKeyInfo& encKeyInfo) const override {
- return ResultOk;
- }
-
- Result getPrivateKey(const std::string& keyName, std::map& metadata,
- EncryptionKeyInfo& encKeyInfo) const override {
- return ResultOk;
- }
-};
-
TEST(ReaderConfigurationTest, testDefaultConfig) {
const std::string topic = "ReaderConfigurationTest-default-config";
Client client(lookupUrl);