diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore index 3c0a6723f63e1..0eee9ae1f1eff 100644 --- a/pulsar-client-cpp/.gitignore +++ b/pulsar-client-cpp/.gitignore @@ -39,8 +39,8 @@ lib*.so* /examples/SampleAsyncProducer /examples/SampleConsumerListener /tests/main -/perf/PerfProducer -/perf/PerfConsumer +/perf/perfProducer +/perf/perfConsumer /system-test/SystemTest # IDE generated files diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt index 85be551d031dd..da17ef82d8f8d 100644 --- a/pulsar-client-cpp/CMakeLists.txt +++ b/pulsar-client-cpp/CMakeLists.txt @@ -20,14 +20,17 @@ project (pulsar-cpp) set(Boost_NO_BOOST_CMAKE ON) set (CMAKE_CXX_FLAGS "-Wno-deprecated-declarations ${CMAKE_CXX_FLAGS}") +set(PROTOBUF_LIBRARIES $ENV{PROTOBUF_LIBRARIES}) find_package(Boost REQUIRED COMPONENTS program_options filesystem regex thread system) find_package(OpenSSL REQUIRED) find_package(ZLIB REQUIRED) -find_package(ProtoBuf QUIET) -if(NOT ProtoBuf_FOUND) - find_library(PROTOBUF_LIBRARIES protobuf) -endif() +if (NOT PROTOBUF_LIBRARIES) + find_package(ProtoBuf QUIET) + if (NOT ProtoBuf_FOUND) + find_library(PROTOBUF_LIBRARIES protobuf) + endif (NOT ProtoBuf_FOUND) +endif (NOT PROTOBUF_LIBRARIES) find_library(LOG4CXX_LIBRARY_PATH log4cxx) find_library(CURL_LIBRARY_PATH curl) find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h) diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h index d21b94b50fddc..a0ee550f6efd5 100644 --- a/pulsar-client-cpp/include/pulsar/BatchMessageId.h +++ b/pulsar-client-cpp/include/pulsar/BatchMessageId.h @@ -18,8 +18,9 @@ #define LIB_BATCHMESSAGEID_H_ #include - +#pragma GCC visibility push(default) namespace pulsar { +class PulsarWrapper; class BatchMessageId : public MessageId { public: BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1) @@ -40,6 +41,7 @@ class BatchMessageId : public MessageId { friend class PartitionedProducerImpl; friend class PartitionedConsumerImpl; friend class BatchAcknowledgementTracker; + friend class PulsarWrapper; friend class PulsarFriend; int64_t batchIndex_; }; @@ -53,4 +55,6 @@ bool BatchMessageId::operator<=(const BatchMessageId& mID) const { } } +#pragma GCC visibility pop + #endif /* LIB_BATCHMESSAGEID_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h index d49dfc2ecbde7..ebf8c22963b07 100644 --- a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -24,8 +24,10 @@ #include #include +#pragma GCC visibility push(default) namespace pulsar { class BrokerConsumerStatsImplBase; +class PulsarWrapper; /* @note: isValid() or getXXX() methods are not allowed on an invalid BrokerConsumerStats */ class BrokerConsumerStats { @@ -78,9 +80,13 @@ class BrokerConsumerStats { /** @deprecated */ boost::shared_ptr getImpl() const; + friend class PulsarWrapper; friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj); }; typedef boost::function BrokerConsumerStatsCallback; } + +#pragma GCC visibility pop + #endif //PULSAR_CPP_BROKERCONSUMERSTATS_H diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 6989e9e48ae99..ae947c2b1759c 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -27,15 +27,14 @@ #pragma GCC visibility push(default) -class PulsarFriend; - namespace pulsar { - typedef boost::function CreateProducerCallback; typedef boost::function SubscribeCallback; typedef boost::function CloseCallback; class ClientImpl; +class PulsarFriend; +class PulsarWrapper; class Client { public: @@ -106,8 +105,10 @@ class Client { private: Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections); + Client(const boost::shared_ptr); friend class PulsarFriend; + friend class PulsarWrapper; boost::shared_ptr impl_; }; diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h index 43f79bd134e6d..a70b4d44d2076 100644 --- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h @@ -18,8 +18,9 @@ #define PULSAR_CLIENTCONFIGURATION_H_ #include - +#pragma GCC visibility push(default) namespace pulsar { +class PulsarWrapper; class ClientConfigurationImpl; class ClientConfiguration { public: @@ -118,13 +119,15 @@ class ClientConfiguration { ClientConfiguration& setTlsAllowInsecureConnection(bool allowInsecure); bool isTlsAllowInsecureConnection() const; + friend class ClientImpl; + friend class PulsarWrapper; private: - const AuthenticationPtr& getAuthenticationPtr() const; + const AuthenticationPtr& getAuthPtr() const; boost::shared_ptr impl_; - friend class ClientImpl; }; } +#pragma GCC visibility pop #endif /* PULSAR_CLIENTCONFIGURATION_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/include/pulsar/CompressionType.h index 6250d91ab49b7..824a13b695291 100644 --- a/pulsar-client-cpp/include/pulsar/CompressionType.h +++ b/pulsar-client-cpp/include/pulsar/CompressionType.h @@ -16,6 +16,7 @@ #ifndef PULSAR_COMPRESSIONTYPE_H_ #define PULSAR_COMPRESSIONTYPE_H_ +#pragma GCC visibility push(default) namespace pulsar { enum CompressionType { CompressionNone = 0, @@ -24,4 +25,5 @@ enum CompressionType { }; } +#pragma GCC visibility pop #endif /* PULSAR_COMPRESSIONTYPE_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 44771966ece2c..fae35ddb375ce 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -23,12 +23,11 @@ #include #pragma GCC visibility push(default) -class PulsarFriend; namespace pulsar { - +class PulsarWrapper; class ConsumerImplBase; - +class PulsarFriend; /** * */ @@ -204,10 +203,11 @@ class Consumer { void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); private: typedef boost::shared_ptr ConsumerImplBasePtr; - friend class PulsarFriend; ConsumerImplBasePtr impl_; explicit Consumer(ConsumerImplBasePtr); + friend class PulsarFriend; + friend class PulsarWrapper; friend class PartitionedConsumerImpl; friend class ConsumerImpl; friend class ClientImpl; diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index ab403df9de8cb..8ad12adca8b9f 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -23,9 +23,11 @@ #include #include +#pragma GCC visibility push(default) namespace pulsar { class Consumer; +class PulsarWrapper; /// Callback definition for non-data operation typedef boost::function ResultCallback; @@ -117,12 +119,13 @@ class ConsumerConfiguration { * @return the configured timeout in milliseconds caching BrokerConsumerStats. */ long getBrokerConsumerStatsCacheTimeInMs() const; + friend class PulsarWrapper; private: boost::shared_ptr impl_; }; } - +#pragma GCC visibility pop #endif /* PULSAR_CONSUMERCONFIGURATION_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/ConsumerType.h b/pulsar-client-cpp/include/pulsar/ConsumerType.h index 9f0fdc97364f3..1ab4b28d800f8 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerType.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerType.h @@ -17,6 +17,7 @@ #ifndef PULSAR_CPP_CONSUMERTYPE_H #define PULSAR_CPP_CONSUMERTYPE_H +#pragma GCC visibility push(default) namespace pulsar { enum ConsumerType { /** @@ -36,5 +37,5 @@ namespace pulsar { ConsumerFailover }; } - +#pragma GCC visibility pop #endif //PULSAR_CPP_CONSUMERTYPE_H diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h index 0ed8bc3fce65a..5e92c7b7394d3 100644 --- a/pulsar-client-cpp/include/pulsar/Message.h +++ b/pulsar-client-cpp/include/pulsar/Message.h @@ -35,6 +35,7 @@ namespace pulsar { class SharedBuffer; class MessageBuilder; class MessageImpl; +class PulsarWrapper; class Message { public: @@ -127,6 +128,7 @@ class Message { friend class Commands; friend class BatchMessageContainer; friend class BatchAcknowledgementTracker; + friend class PulsarWrapper; friend std::ostream& operator<<(std::ostream& s, const StringMap& map); friend std::ostream& operator<<(std::ostream& s, const Message& msg); diff --git a/pulsar-client-cpp/include/pulsar/MessageBuilder.h b/pulsar-client-cpp/include/pulsar/MessageBuilder.h index e6872c3fd0a43..b465908d82b47 100644 --- a/pulsar-client-cpp/include/pulsar/MessageBuilder.h +++ b/pulsar-client-cpp/include/pulsar/MessageBuilder.h @@ -23,6 +23,7 @@ #pragma GCC visibility push(default) namespace pulsar { +class PulsarWrapper; class MessageBuilder { public: @@ -94,8 +95,10 @@ class MessageBuilder { private: MessageBuilder(const MessageBuilder&); void checkMetadata(); - + static boost::shared_ptr createMessageImpl(); Message::MessageImplPtr impl_; + + friend class PulsarWrapper; }; } diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h index d3ffd0dbe9b17..db8239edc1c72 100644 --- a/pulsar-client-cpp/include/pulsar/MessageId.h +++ b/pulsar-client-cpp/include/pulsar/MessageId.h @@ -26,6 +26,7 @@ namespace pulsar { class ConsumerImpl; class UnAckedMessageTrackerEnabled; +class PulsarWrapper; class MessageId { public: @@ -42,6 +43,7 @@ class MessageId { friend class PartitionedConsumerImpl; friend class UnAckedMessageTrackerEnabled; friend class BatchAcknowledgementTracker; + friend class PulsarWrapper; MessageId(int64_t, int64_t); friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId); int64_t ledgerId_; diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index a3f74325f4647..5795447cc9846 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -23,11 +23,10 @@ #pragma GCC visibility push(default) -class PulsarFriend; - namespace pulsar { class ProducerImplBase; - +class PulsarWrapper; +class PulsarFriend; class Producer { public: /** @@ -100,6 +99,7 @@ class Producer { friend class ClientImpl; friend class PulsarFriend; + friend class PulsarWrapper; ProducerImplBasePtr impl_; }; diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 3bb20de8b2124..c10dc9b6a4013 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -21,6 +21,7 @@ #include #include #include +#pragma GCC visibility push(default) namespace pulsar { @@ -28,6 +29,7 @@ typedef boost::function SendCallback; typedef boost::function CloseCallback; class ProducerConfigurationImpl; +class PulsarWrapper; /** * Class that holds the configuration for a producer @@ -76,10 +78,13 @@ class ProducerConfiguration { ProducerConfiguration& setBatchingMaxPublishDelayMs( const unsigned long& batchingMaxPublishDelayMs); const unsigned long& getBatchingMaxPublishDelayMs() const; + friend class PulsarWrapper; + private: struct Impl; boost::shared_ptr impl_; }; } +#pragma GCC visibility pop #endif /* PULSAR_PRODUCERCONFIGURATION_H_ */ diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc b/pulsar-client-cpp/lib/BrokerConsumerStats.cc index aba2a84b3fab0..9fb155bd66f21 100644 --- a/pulsar-client-cpp/lib/BrokerConsumerStats.cc +++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc @@ -33,6 +33,7 @@ bool BrokerConsumerStats::isValid() const { return impl_->isValid(); } +#pragma GCC visibility push(default) std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) { os << "\nBrokerConsumerStats [" << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() @@ -45,6 +46,7 @@ std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) { << ", msgBacklog_ = " << obj.getMsgBacklog() << "]"; return os; } +#pragma GCC visibility pop double BrokerConsumerStats::getMsgRateOut() const { return impl_->getMsgRateOut(); diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h index dbb35ca2aad74..21aabed1c02d1 100644 --- a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h @@ -24,7 +24,7 @@ #include #include #include - +#pragma GCC visibility push(default) namespace pulsar { class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { private: @@ -123,4 +123,5 @@ class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { static ConsumerType convertStringToConsumerType(const std::string& str); }; } +#pragma GCC visibility pop #endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index 19c89666e14a5..c8284aca9719d 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -30,6 +30,9 @@ DECLARE_LOG_OBJECT() namespace pulsar { +Client::Client(const boost::shared_ptr impl) : impl_(impl) { +} + Client::Client(const std::string& serviceUrl) : impl_(boost::make_shared(serviceUrl, ClientConfiguration(), true)) { } diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc index 05823f76a7780..c47294f14a94f 100644 --- a/pulsar-client-cpp/lib/ClientConfiguration.cc +++ b/pulsar-client-cpp/lib/ClientConfiguration.cc @@ -43,7 +43,7 @@ const Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; } -const AuthenticationPtr& ClientConfiguration::getAuthenticationPtr() const { +const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; } diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 85086509c54d9..186c168139af1 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -63,7 +63,7 @@ namespace pulsar { ioExecutorProvider_(boost::make_shared(clientConfiguration.getIOThreads())), listenerExecutorProvider_(boost::make_shared(clientConfiguration.getMessageListenerThreads())), partitionListenerExecutorProvider_(boost::make_shared(clientConfiguration.getMessageListenerThreads())), - pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthenticationPtr(), poolConnections), + pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthPtr(), poolConnections), producerIdGenerator_(0), consumerIdGenerator_(0), requestIdGenerator_(0) { @@ -73,7 +73,7 @@ namespace pulsar { lookupServicePtr_ = boost::make_shared(boost::cref(serviceUrl_), boost::cref(clientConfiguration_), boost::cref( - clientConfiguration.getAuthenticationPtr())); + clientConfiguration.getAuthPtr())); } else { LOG_DEBUG("Using Binary Lookup"); lookupServicePtr_ = boost::make_shared(boost::ref(pool_), boost::ref(serviceUrl)); diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h index b24c8b10e4c28..f469b54c890f3 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.h +++ b/pulsar-client-cpp/lib/ConnectionPool.h @@ -24,7 +24,7 @@ #include #include #include - +#pragma GCC visibility push(default) namespace pulsar { class ExecutorService; @@ -49,5 +49,5 @@ class ConnectionPool { }; } - +#pragma GCC visibility pop #endif //_PULSAR_CONNECTION_POOL_HEADER_ diff --git a/pulsar-client-cpp/lib/MessageBuilder.cc b/pulsar-client-cpp/lib/MessageBuilder.cc index 182bef49d1113..e79742285b021 100644 --- a/pulsar-client-cpp/lib/MessageBuilder.cc +++ b/pulsar-client-cpp/lib/MessageBuilder.cc @@ -33,12 +33,16 @@ namespace pulsar { ObjectPool messagePool; +boost::shared_ptr MessageBuilder::createMessageImpl() { + return messagePool.create(); +} + MessageBuilder::MessageBuilder() { - impl_ = messagePool.create(); + impl_ = createMessageImpl(); } MessageBuilder& MessageBuilder::create() { - impl_ = messagePool.create(); + impl_ = createMessageImpl(); return *this; } diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc index a3773651bfdd1..86ac22db358da 100644 --- a/pulsar-client-cpp/lib/MessageImpl.cc +++ b/pulsar-client-cpp/lib/MessageImpl.cc @@ -51,4 +51,24 @@ namespace pulsar { return 0ull; } } + + void MessageImpl::setReplicationClusters(const std::vector& clusters) { + google::protobuf::RepeatedPtrField r(clusters.begin(), clusters.end()); + r.Swap(metadata.mutable_replicate_to()); + } + + void MessageImpl::disableReplication(bool flag) { + google::protobuf::RepeatedPtrField r; + if (flag) { + r.AddAllocated(new std::string("__local__")); + } + r.Swap(metadata.mutable_replicate_to()); + } + + void MessageImpl::setProperty(const std::string& name, const std::string& value) { + proto::KeyValue *keyValue = proto::KeyValue().New(); + keyValue->set_key(name); + keyValue->set_value(value); + metadata.mutable_properties()->AddAllocated(keyValue); + } } diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h index 8b10e986b0f8c..e4d77c71f41f5 100644 --- a/pulsar-client-cpp/lib/MessageImpl.h +++ b/pulsar-client-cpp/lib/MessageImpl.h @@ -28,6 +28,7 @@ using namespace pulsar; namespace pulsar { +class PulsarWrapper; class ClientConnection; class BatchMessageContainer; @@ -46,7 +47,13 @@ class MessageImpl { bool hasPartitionKey() const; uint64_t getPublishTimestamp() const; - private: + + friend class PulsarWrapper; + friend class MessageBuilder; +private: + void setReplicationClusters(const std::vector& clusters); + void setProperty(const std::string& name, const std::string& value); + void disableReplication(bool flag); Message::StringMap properties_; }; diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h index e734977aeafc6..802714e7c1ec2 100644 --- a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h +++ b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h @@ -26,7 +26,7 @@ #include #include #include - +#pragma GCC visibility push(default) namespace pulsar { class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { private: @@ -87,4 +87,5 @@ class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { typedef boost::shared_ptr PartitionedBrokerConsumerStatsPtr; } +#pragma GCC visibility pop #endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H