Skip to content

Commit

Permalink
Modifications to create a wrapper for CPP client (apache#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored Apr 25, 2017
1 parent 9765de1 commit 7f63ee9
Show file tree
Hide file tree
Showing 25 changed files with 106 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ lib*.so*
/examples/SampleAsyncProducer
/examples/SampleConsumerListener
/tests/main
/perf/PerfProducer
/perf/PerfConsumer
/perf/perfProducer
/perf/perfConsumer
/system-test/SystemTest

# IDE generated files
Expand Down
11 changes: 7 additions & 4 deletions pulsar-client-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ project (pulsar-cpp)
set(Boost_NO_BOOST_CMAKE ON)

set (CMAKE_CXX_FLAGS "-Wno-deprecated-declarations ${CMAKE_CXX_FLAGS}")
set(PROTOBUF_LIBRARIES $ENV{PROTOBUF_LIBRARIES})

find_package(Boost REQUIRED COMPONENTS program_options filesystem regex thread system)
find_package(OpenSSL REQUIRED)
find_package(ZLIB REQUIRED)
find_package(ProtoBuf QUIET)
if(NOT ProtoBuf_FOUND)
find_library(PROTOBUF_LIBRARIES protobuf)
endif()
if (NOT PROTOBUF_LIBRARIES)
find_package(ProtoBuf QUIET)
if (NOT ProtoBuf_FOUND)
find_library(PROTOBUF_LIBRARIES protobuf)
endif (NOT ProtoBuf_FOUND)
endif (NOT PROTOBUF_LIBRARIES)
find_library(LOG4CXX_LIBRARY_PATH log4cxx)
find_library(CURL_LIBRARY_PATH curl)
find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/include/pulsar/BatchMessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#define LIB_BATCHMESSAGEID_H_

#include <pulsar/MessageId.h>

#pragma GCC visibility push(default)
namespace pulsar {
class PulsarWrapper;
class BatchMessageId : public MessageId {
public:
BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
Expand All @@ -40,6 +41,7 @@ class BatchMessageId : public MessageId {
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
friend class PulsarFriend;
int64_t batchIndex_;
};
Expand All @@ -53,4 +55,6 @@ bool BatchMessageId::operator<=(const BatchMessageId& mID) const {
}

}
#pragma GCC visibility pop

#endif /* LIB_BATCHMESSAGEID_H_ */
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
#include <boost/function.hpp>
#include <pulsar/ConsumerType.h>

#pragma GCC visibility push(default)
namespace pulsar {
class BrokerConsumerStatsImplBase;
class PulsarWrapper;

/* @note: isValid() or getXXX() methods are not allowed on an invalid BrokerConsumerStats */
class BrokerConsumerStats {
Expand Down Expand Up @@ -78,9 +80,13 @@ class BrokerConsumerStats {
/** @deprecated */
boost::shared_ptr<BrokerConsumerStatsImplBase> getImpl() const;

friend class PulsarWrapper;
friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj);
};
typedef boost::function<void(Result result, BrokerConsumerStats brokerConsumerStats)> BrokerConsumerStatsCallback;

}

#pragma GCC visibility pop

#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H
7 changes: 4 additions & 3 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

#pragma GCC visibility push(default)

class PulsarFriend;

namespace pulsar {

typedef boost::function<void(Result, Producer)> CreateProducerCallback;
typedef boost::function<void(Result, Consumer)> SubscribeCallback;
typedef boost::function<void(Result)> CloseCallback;

class ClientImpl;
class PulsarFriend;
class PulsarWrapper;

class Client {
public:
Expand Down Expand Up @@ -106,8 +105,10 @@ class Client {

private:
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections);
Client(const boost::shared_ptr<ClientImpl>);

friend class PulsarFriend;
friend class PulsarWrapper;
boost::shared_ptr<ClientImpl> impl_;
};

Expand Down
9 changes: 6 additions & 3 deletions pulsar-client-cpp/include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#define PULSAR_CLIENTCONFIGURATION_H_

#include <pulsar/Authentication.h>

#pragma GCC visibility push(default)
namespace pulsar {
class PulsarWrapper;
class ClientConfigurationImpl;
class ClientConfiguration {
public:
Expand Down Expand Up @@ -118,13 +119,15 @@ class ClientConfiguration {

ClientConfiguration& setTlsAllowInsecureConnection(bool allowInsecure);
bool isTlsAllowInsecureConnection() const;
friend class ClientImpl;
friend class PulsarWrapper;

private:
const AuthenticationPtr& getAuthenticationPtr() const;
const AuthenticationPtr& getAuthPtr() const;
boost::shared_ptr<ClientConfigurationImpl> impl_;
friend class ClientImpl;
};
}

#pragma GCC visibility pop
#endif /* PULSAR_CLIENTCONFIGURATION_H_ */

2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/CompressionType.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#ifndef PULSAR_COMPRESSIONTYPE_H_
#define PULSAR_COMPRESSIONTYPE_H_
#pragma GCC visibility push(default)
namespace pulsar {
enum CompressionType {
CompressionNone = 0,
Expand All @@ -24,4 +25,5 @@ enum CompressionType {
};
}

#pragma GCC visibility pop
#endif /* PULSAR_COMPRESSIONTYPE_H_ */
8 changes: 4 additions & 4 deletions pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
#include <pulsar/ConsumerConfiguration.h>
#pragma GCC visibility push(default)

class PulsarFriend;

namespace pulsar {

class PulsarWrapper;
class ConsumerImplBase;

class PulsarFriend;
/**
*
*/
Expand Down Expand Up @@ -204,10 +203,11 @@ class Consumer {
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
private:
typedef boost::shared_ptr<ConsumerImplBase> ConsumerImplBasePtr;
friend class PulsarFriend;
ConsumerImplBasePtr impl_;
explicit Consumer(ConsumerImplBasePtr);

friend class PulsarFriend;
friend class PulsarWrapper;
friend class PartitionedConsumerImpl;
friend class ConsumerImpl;
friend class ClientImpl;
Expand Down
5 changes: 4 additions & 1 deletion pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include <pulsar/ConsumerType.h>
#include <pulsar/Message.h>

#pragma GCC visibility push(default)
namespace pulsar {

class Consumer;
class PulsarWrapper;

/// Callback definition for non-data operation
typedef boost::function<void(Result result)> ResultCallback;
Expand Down Expand Up @@ -117,12 +119,13 @@ class ConsumerConfiguration {
* @return the configured timeout in milliseconds caching BrokerConsumerStats.
*/
long getBrokerConsumerStatsCacheTimeInMs() const;
friend class PulsarWrapper;

private:
boost::shared_ptr<ConsumerConfigurationImpl> impl_;
};

}

#pragma GCC visibility pop
#endif /* PULSAR_CONSUMERCONFIGURATION_H_ */

3 changes: 2 additions & 1 deletion pulsar-client-cpp/include/pulsar/ConsumerType.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef PULSAR_CPP_CONSUMERTYPE_H
#define PULSAR_CPP_CONSUMERTYPE_H

#pragma GCC visibility push(default)
namespace pulsar {
enum ConsumerType {
/**
Expand All @@ -36,5 +37,5 @@ namespace pulsar {
ConsumerFailover
};
}

#pragma GCC visibility pop
#endif //PULSAR_CPP_CONSUMERTYPE_H
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace pulsar {
class SharedBuffer;
class MessageBuilder;
class MessageImpl;
class PulsarWrapper;

class Message {
public:
Expand Down Expand Up @@ -127,6 +128,7 @@ class Message {
friend class Commands;
friend class BatchMessageContainer;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;

friend std::ostream& operator<<(std::ostream& s, const StringMap& map);
friend std::ostream& operator<<(std::ostream& s, const Message& msg);
Expand Down
5 changes: 4 additions & 1 deletion pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#pragma GCC visibility push(default)

namespace pulsar {
class PulsarWrapper;

class MessageBuilder {
public:
Expand Down Expand Up @@ -94,8 +95,10 @@ class MessageBuilder {
private:
MessageBuilder(const MessageBuilder&);
void checkMetadata();

static boost::shared_ptr<MessageImpl> createMessageImpl();
Message::MessageImplPtr impl_;

friend class PulsarWrapper;
};

}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace pulsar {

class ConsumerImpl;
class UnAckedMessageTrackerEnabled;
class PulsarWrapper;

class MessageId {
public:
Expand All @@ -42,6 +43,7 @@ class MessageId {
friend class PartitionedConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
MessageId(int64_t, int64_t);
friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
int64_t ledgerId_;
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-cpp/include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@

#pragma GCC visibility push(default)

class PulsarFriend;

namespace pulsar {
class ProducerImplBase;

class PulsarWrapper;
class PulsarFriend;
class Producer {
public:
/**
Expand Down Expand Up @@ -100,6 +99,7 @@ class Producer {

friend class ClientImpl;
friend class PulsarFriend;
friend class PulsarWrapper;

ProducerImplBasePtr impl_;
};
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <boost/function.hpp>
#pragma GCC visibility push(default)

namespace pulsar {

typedef boost::function<void(Result, const Message& msg)> SendCallback;
typedef boost::function<void(Result)> CloseCallback;

class ProducerConfigurationImpl;
class PulsarWrapper;

/**
* Class that holds the configuration for a producer
Expand Down Expand Up @@ -76,10 +78,13 @@ class ProducerConfiguration {
ProducerConfiguration& setBatchingMaxPublishDelayMs(
const unsigned long& batchingMaxPublishDelayMs);
const unsigned long& getBatchingMaxPublishDelayMs() const;
friend class PulsarWrapper;

private:
struct Impl;
boost::shared_ptr<ProducerConfigurationImpl> impl_;
};
}
#pragma GCC visibility pop
#endif /* PULSAR_PRODUCERCONFIGURATION_H_ */

2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/BrokerConsumerStats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bool BrokerConsumerStats::isValid() const {
return impl_->isValid();
}

#pragma GCC visibility push(default)
std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) {
os << "\nBrokerConsumerStats [" << "validTill_ = " << obj.isValid() << ", msgRateOut_ = "
<< obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
Expand All @@ -45,6 +46,7 @@ std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) {
<< ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
return os;
}
#pragma GCC visibility pop

double BrokerConsumerStats::getMsgRateOut() const {
return impl_->getMsgRateOut();
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <boost/date_time/microsec_time_clock.hpp>
#include <pulsar/BrokerConsumerStats.h>
#include <lib/BrokerConsumerStatsImplBase.h>

#pragma GCC visibility push(default)
namespace pulsar {
class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
private:
Expand Down Expand Up @@ -123,4 +123,5 @@ class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
static ConsumerType convertStringToConsumerType(const std::string& str);
};
}
#pragma GCC visibility pop
#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ DECLARE_LOG_OBJECT()

namespace pulsar {

Client::Client(const boost::shared_ptr<ClientImpl> impl) : impl_(impl) {
}

Client::Client(const std::string& serviceUrl)
: impl_(boost::make_shared<ClientImpl>(serviceUrl, ClientConfiguration(), true)) {
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const Authentication& ClientConfiguration::getAuth() const {
return *impl_->authenticationPtr;
}

const AuthenticationPtr& ClientConfiguration::getAuthenticationPtr() const {
const AuthenticationPtr& ClientConfiguration::getAuthPtr() const {
return impl_->authenticationPtr;
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace pulsar {
ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getIOThreads())),
listenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
partitionListenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthenticationPtr(), poolConnections),
pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthPtr(), poolConnections),
producerIdGenerator_(0),
consumerIdGenerator_(0),
requestIdGenerator_(0) {
Expand All @@ -73,7 +73,7 @@ namespace pulsar {
lookupServicePtr_ = boost::make_shared<HTTPLookupService>(boost::cref(serviceUrl_),
boost::cref(clientConfiguration_),
boost::cref(
clientConfiguration.getAuthenticationPtr()));
clientConfiguration.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ = boost::make_shared<BinaryProtoLookupService>(boost::ref(pool_), boost::ref(serviceUrl));
Expand Down
Loading

0 comments on commit 7f63ee9

Please sign in to comment.