Skip to content

Commit

Permalink
[C++] Fix undefined behavior caused by uninitialized variables (apach…
Browse files Browse the repository at this point in the history
…e#10892)

* Use brace initializer to initialize configurations

* Add config tests
  • Loading branch information
BewareMyPower authored Jun 10, 2021
1 parent 3de089c commit e77fa36
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 108 deletions.
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Initialize stats interval in seconds. Stats are printed and reset after every `statsIntervalInSeconds`.
*
* Default: 600
*
* Set to 0 means disabling stats collection.
*/
ClientConfiguration& setStatsIntervalInSeconds(const unsigned int&);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {
* 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
* If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
* redelivered.
*
* Default: 0, which means the the tracker for unacknowledged messages is disabled.
*
* @param timeout in milliseconds
*/
void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
Expand Down Expand Up @@ -269,6 +272,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {

/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
*
* Default: 30000, which means 30 seconds.
*
* @param cacheTimeInMs in milliseconds
*/
void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
Expand Down
17 changes: 15 additions & 2 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned
* incremental sequence IDs.
*
* Default: -1, which means the first message's sequence ID is 0.
*
* @param initialSequenceId the initial sequence ID for the producer.
* @return
*/
Expand Down Expand Up @@ -178,6 +180,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* would fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the
* blocking behavior.
*
* Default: 1000
*
* @param maxPendingMessages max number of pending messages.
* @return
*/
Expand All @@ -194,6 +198,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* This setting will be used to lower the max pending messages for each partition
* ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
*
* Default: 50000
*
* @param maxPendingMessagesAcrossPartitions
*/
ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
Expand All @@ -206,6 +212,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Set the message routing modes for partitioned topics.
*
* Default: UseSinglePartition
*
* @param PartitionsRoutingMode partition routing mode.
* @return
*/
Expand Down Expand Up @@ -233,6 +241,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* Set the hashing scheme, which is a standard hashing function available when choosing the partition
* used for a particular message.
*
* Default: HashingScheme::BoostHash
*
* <p>Standard hashing functions available are:
* <ul>
* <li>{@link HashingScheme::JavaStringHash}: Java {@code String.hashCode()} (Default).
Expand Down Expand Up @@ -266,8 +276,9 @@ class PULSAR_PUBLIC ProducerConfiguration {
// Zero queue size feature will not be supported on consumer end if batching is enabled

/**
* Control whether automatic batching of messages is enabled or not for the producer. <i>Default value:
* false (no automatic batching).</i>
* Control whether automatic batching of messages is enabled or not for the producer.
*
* Default: true
*
* When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch
* to be sent to the broker, leading to better throughput, especially when publishing small messages. If
Expand Down Expand Up @@ -343,6 +354,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
const unsigned long& getBatchingMaxPublishDelayMs() const;

/**
* Default: DefaultBatching
*
* @see BatchingType
*/
ProducerConfiguration& setBatchingType(BatchingType batchingType);
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/include/pulsar/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ typedef std::map<std::string, std::string> StringMap;
*/
class PULSAR_PUBLIC SchemaInfo {
public:
/**
* The default constructor with following configs:
* - schemaType: SchemaType::BYTES
* - name: "BYTES"
* - schema: ""
* - properties: {}
*
* @see SchemaInfo(SchemaType schemaType, const std::string& name, const std::string& schema, const
* StringMap& properties)
*/
SchemaInfo();

/**
Expand Down
38 changes: 11 additions & 27 deletions pulsar-client-cpp/lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,22 @@
namespace pulsar {

struct ClientConfigurationImpl {
AuthenticationPtr authenticationPtr;
uint64_t memoryLimit;
int ioThreads;
int operationTimeoutSeconds;
int messageListenerThreads;
int concurrentLookupRequest;
AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
uint64_t memoryLimit{0ull};
int ioThreads{1};
int operationTimeoutSeconds{30};
int messageListenerThreads{1};
int concurrentLookupRequest{50000};
std::string logConfFilePath;
bool useTls;
bool useTls{false};
std::string tlsTrustCertsFilePath;
bool tlsAllowInsecureConnection;
unsigned int statsIntervalInSeconds;
bool tlsAllowInsecureConnection{false};
unsigned int statsIntervalInSeconds{600}; // 10 minutes
std::unique_ptr<LoggerFactory> loggerFactory;
bool validateHostName;
unsigned int partitionsUpdateInterval;
bool validateHostName{false};
unsigned int partitionsUpdateInterval{60}; // 1 minute
std::string listenerName;

ClientConfigurationImpl()
: authenticationPtr(AuthFactory::Disabled()),
memoryLimit(0ull),
ioThreads(1),
operationTimeoutSeconds(30),
messageListenerThreads(1),
concurrentLookupRequest(50000),
logConfFilePath(),
useTls(false),
tlsAllowInsecureConnection(false),
statsIntervalInSeconds(600), // 10 minutes
loggerFactory(),
validateHostName(false),
partitionsUpdateInterval(60) // 1 minute
{}

std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
};
} // namespace pulsar
Expand Down
51 changes: 15 additions & 36 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,27 @@
namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
long tickDurationInMs;
long unAckedMessagesTimeoutMs{0};
long tickDurationInMs{1000};

long negativeAckRedeliveryDelayMs;
long ackGroupingTimeMs;
long ackGroupingMaxSize;
ConsumerType consumerType;
long negativeAckRedeliveryDelayMs{60000};
long ackGroupingTimeMs{100};
long ackGroupingMaxSize{1000};
ConsumerType consumerType{ConsumerExclusive};
MessageListener messageListener;
bool hasMessageListener;
int receiverQueueSize;
int maxTotalReceiverQueueSizeAcrossPartitions;
bool hasMessageListener{false};
int receiverQueueSize{1000};
int maxTotalReceiverQueueSizeAcrossPartitions{50000};
std::string consumerName;
long brokerConsumerStatsCacheTimeInMs;
long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds
CryptoKeyReaderPtr cryptoKeyReader;
ConsumerCryptoFailureAction cryptoFailureAction;
bool readCompacted;
InitialPosition subscriptionInitialPosition;
int patternAutoDiscoveryPeriod;
bool replicateSubscriptionStateEnabled;
ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
bool readCompacted{false};
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
int patternAutoDiscoveryPeriod{60};
bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
KeySharedPolicy keySharedPolicy;

ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
tickDurationInMs(1000),
negativeAckRedeliveryDelayMs(60000),
ackGroupingTimeMs(100),
ackGroupingMaxSize(1000),
consumerType(ConsumerExclusive),
messageListener(),
hasMessageListener(false),
brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds
receiverQueueSize(1000),
maxTotalReceiverQueueSizeAcrossPartitions(50000),
cryptoKeyReader(),
cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
readCompacted(false),
subscriptionInitialPosition(InitialPosition::InitialPositionLatest),
patternAutoDiscoveryPeriod(60),
properties(),
keySharedPolicy() {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
43 changes: 13 additions & 30 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,23 @@ struct ProducerConfigurationImpl {
SchemaInfo schemaInfo;
Optional<std::string> producerName;
Optional<int64_t> initialSequenceId;
int sendTimeoutMs;
CompressionType compressionType;
int maxPendingMessages;
int maxPendingMessagesAcrossPartitions;
ProducerConfiguration::PartitionsRoutingMode routingMode;
int sendTimeoutMs{30000};
CompressionType compressionType{CompressionNone};
int maxPendingMessages{1000};
int maxPendingMessagesAcrossPartitions{50000};
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme;
bool blockIfQueueFull;
bool batchingEnabled;
unsigned int batchingMaxMessages;
unsigned long batchingMaxAllowedSizeInBytes;
unsigned long batchingMaxPublishDelayMs;
ProducerConfiguration::BatchingType batchingType;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
unsigned long batchingMaxAllowedSizeInBytes{128 * 1024}; // 128 KB
unsigned long batchingMaxPublishDelayMs{10}; // 10 milli seconds
ProducerConfiguration::BatchingType batchingType{ProducerConfiguration::DefaultBatching};
CryptoKeyReaderPtr cryptoKeyReader;
std::set<std::string> encryptionKeys;
ProducerCryptoFailureAction cryptoFailureAction;
ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
std::map<std::string, std::string> properties;
ProducerConfigurationImpl()
: schemaInfo(),
sendTimeoutMs(30000),
compressionType(CompressionNone),
maxPendingMessages(1000),
maxPendingMessagesAcrossPartitions(50000),
routingMode(ProducerConfiguration::UseSinglePartition),
hashingScheme(ProducerConfiguration::BoostHash),
blockIfQueueFull(false),
batchingEnabled(true),
batchingMaxMessages(1000),
batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB
batchingMaxPublishDelayMs(10), // 10 milli seconds
batchingType(ProducerConfiguration::DefaultBatching),
cryptoKeyReader(),
encryptionKeys(),
cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {}
};
} // namespace pulsar

Expand Down
99 changes: 99 additions & 0 deletions pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <pulsar/Client.h>
#include <gtest/gtest.h>
#include <lib/LogUtils.h>
#include "NoOpsCryptoKeyReader.h"

DECLARE_LOG_OBJECT()

Expand All @@ -27,6 +28,104 @@ DECLARE_LOG_OBJECT()

using namespace pulsar;

TEST(ConsumerConfigurationTest, testDefaultConfig) {
ConsumerConfiguration conf;
ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
ASSERT_EQ(conf.hasMessageListener(), false);
ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
ASSERT_EQ(conf.getConsumerName(), "");
ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(conf.getTickDurationInMs(), 1000);
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
ASSERT_EQ(conf.isReadCompacted(), false);
ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 60);
ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionLatest);
ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
ASSERT_EQ(conf.isEncryptionEnabled(), false);
ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
ASSERT_EQ(conf.getProperties().empty(), true);
}

TEST(ConsumerConfigurationTest, testCustomConfig) {
ConsumerConfiguration conf;

const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});

conf.setSchema(schema);
ASSERT_EQ(conf.getSchema().getName(), schema.getName());
ASSERT_EQ(conf.getSchema().getSchemaType(), schema.getSchemaType());
ASSERT_EQ(conf.getSchema().getSchema(), schema.getSchema());
ASSERT_EQ(conf.getSchema().getProperties(), schema.getProperties());

conf.setConsumerType(ConsumerKeyShared);
ASSERT_EQ(conf.getConsumerType(), ConsumerKeyShared);

conf.setMessageListener([](Consumer consumer, const Message& msg) {});
ASSERT_EQ(conf.hasMessageListener(), true);

conf.setReceiverQueueSize(2000);
ASSERT_EQ(conf.getReceiverQueueSize(), 2000);

conf.setMaxTotalReceiverQueueSizeAcrossPartitions(100000);
ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 100000);

conf.setConsumerName("consumer");
ASSERT_EQ(conf.getConsumerName(), "consumer");

conf.setUnAckedMessagesTimeoutMs(20000);
ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 20000);

conf.setTickDurationInMs(2000);
ASSERT_EQ(conf.getTickDurationInMs(), 2000);

conf.setNegativeAckRedeliveryDelayMs(10000);
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);

conf.setAckGroupingTimeMs(200);
ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);

conf.setAckGroupingMaxSize(2000);
ASSERT_EQ(conf.getAckGroupingMaxSize(), 2000);

conf.setBrokerConsumerStatsCacheTimeInMs(60000);
ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 60000);

conf.setReadCompacted(true);
ASSERT_EQ(conf.isReadCompacted(), true);

conf.setPatternAutoDiscoveryPeriod(120);
ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 120);

conf.setSubscriptionInitialPosition(InitialPositionEarliest);
ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionEarliest);

const auto cryptoKeyReader = std::make_shared<NoOpsCryptoKeyReader>();
conf.setCryptoKeyReader(cryptoKeyReader);
ASSERT_EQ(conf.getCryptoKeyReader(), cryptoKeyReader);
// NOTE: once CryptoKeyReader was set, the isEncryptionEnabled() would return true, it's different from
// ProducerConfiguration
ASSERT_EQ(conf.isEncryptionEnabled(), true);

conf.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::CONSUME);

conf.setReplicateSubscriptionStateEnabled(true);
ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), true);

conf.setProperty("k1", "v1");
ASSERT_EQ(conf.getProperties()["k1"], "v1");
ASSERT_EQ(conf.hasProperty("k1"), true);
}

TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
std::string lookupUrl = "pulsar://localhost:6650";
std::string topicName = "persist-topic";
Expand Down
Loading

0 comments on commit e77fa36

Please sign in to comment.