Skip to content

Commit

Permalink
CPP Client - Added Monitoring Stats (apache#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored Jun 28, 2017
1 parent 8ccbfae commit b3db513
Show file tree
Hide file tree
Showing 25 changed files with 2,176 additions and 83 deletions.
12 changes: 12 additions & 0 deletions pulsar-client-cpp/include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ class ClientConfiguration {

ClientConfiguration& setTlsAllowInsecureConnection(bool allowInsecure);
bool isTlsAllowInsecureConnection() const;

/*
* Initialize stats interval in seconds. Stats are printed and reset after every 'statsIntervalInSeconds'.
* Set to 0 in order to disable stats collection.
*/
ClientConfiguration& setStatsIntervalInSeconds(const unsigned int&);

/*
* Get the stats interval set in the client.
*/
const unsigned int& getStatsIntervalInSeconds() const;

friend class ClientImpl;
friend class PulsarWrapper;

Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ProducerConfiguration {
ProducerConfiguration& setBatchingMaxPublishDelayMs(
const unsigned long& batchingMaxPublishDelayMs);
const unsigned long& getBatchingMaxPublishDelayMs() const;

friend class PulsarWrapper;

private:
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc)
file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc)

execute_process(COMMAND cat ../pom.xml COMMAND xmllint --format - COMMAND sed "s/xmlns=\".*\"//g" COMMAND xmllint --stream --pattern /project/version --debug - COMMAND grep -A 2 "matches pattern" COMMAND grep text COMMAND sed "s/.* [0-9] //g" OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE PV)
set (CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -msse4.2 -mpclmul -D_PULSAR_VERSION_=\\\"${PV}\\\"")
Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,13 @@ const std::string& ClientConfiguration::getLogConfFilePath() const {
return impl_->logConfFilePath;
}

ClientConfiguration& ClientConfiguration::setStatsIntervalInSeconds(const unsigned int& statsIntervalInSeconds) {
impl_->statsIntervalInSeconds = statsIntervalInSeconds;
return *this;
}

const unsigned int& ClientConfiguration::getStatsIntervalInSeconds() const {
return impl_->statsIntervalInSeconds;
}

}
5 changes: 4 additions & 1 deletion pulsar-client-cpp/lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ struct ClientConfigurationImpl {
bool useTls;
std::string tlsTrustCertsFilePath;
bool tlsAllowInsecureConnection;
unsigned int statsIntervalInSeconds;
ClientConfigurationImpl() : authenticationPtr(AuthFactory::Disabled()),
ioThreads(1),
operationTimeoutSeconds(30),
messageListenerThreads(1),
concurrentLookupRequest(5000),
logConfFilePath(),
useTls(false),
tlsAllowInsecureConnection(true) {}
tlsAllowInsecureConnection(true),
statsIntervalInSeconds(600) { // 10 minutes
}
};
}

Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ namespace pulsar {
}

ClientImpl::~ClientImpl() {
LOG_DEBUG("~ClientImpl");
shutdown();
}

Expand Down Expand Up @@ -367,4 +368,8 @@ namespace pulsar {
return requestIdGenerator_++;
}

const ClientConfiguration& ClientImpl::getClientConfig() const {
return clientConfiguration_;
}

} /* namespace pulsar */
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
uint64_t newConsumerId();
uint64_t newRequestId();

const ClientConfiguration& getClientConfig() const;

const ClientConfiguration& conf() const;
ExecutorServiceProviderPtr getIOExecutorProvider();
ExecutorServiceProviderPtr getListenerExecutorProvider();
Expand Down
43 changes: 37 additions & 6 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
consumerCreatedPromise_(),
messageListenerRunning_(true),
batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
brokerConsumerStats_() {
brokerConsumerStats_(),
consumerStatsBasePtr_() {
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
Expand All @@ -67,6 +68,15 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
} else {
listenerExecutor_ = client->getListenerExecutorProvider()->get();
}

unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
consumerStatsBasePtr_ = boost::make_shared<ConsumerStatsImpl>(
consumerStr_, client->getIOExecutorProvider()->get()->createDeadlineTimer(),
statsIntervalInSeconds);
} else {
consumerStatsBasePtr_ = boost::make_shared<ConsumerStatsDisabled>();
}
}

ConsumerImpl::~ConsumerImpl() {
Expand Down Expand Up @@ -357,6 +367,7 @@ void ConsumerImpl::internalListener() {
return;
}
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
messageListener_(Consumer(shared_from_this()), msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
Expand Down Expand Up @@ -407,6 +418,12 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
}

Result ConsumerImpl::receive(Message& msg) {
Result res = receiveHelper(msg);
consumerStatsBasePtr_->receivedMessage(msg, res);
return res;
}

Result ConsumerImpl::receiveHelper(Message& msg) {
{
Lock lock(mutex_);
if (state_ != Ready) {
Expand All @@ -429,6 +446,12 @@ Result ConsumerImpl::receive(Message& msg) {
}

Result ConsumerImpl::receive(Message& msg, int timeout) {
Result res = receiveHelper(msg, timeout);
consumerStatsBasePtr_->receivedMessage(msg, res);
return res;
}

Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
if (config_.getReceiverQueueSize() == 0) {
LOG_WARN(getName() << "Can't use this function if the queue size is 0");
return ResultInvalidConfiguration;
Expand Down Expand Up @@ -496,27 +519,35 @@ inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() {
}
}


void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::CommandAck_AckType ackType) {
consumerStatsBasePtr_->messageAcknowledged(res, ackType);
callback(res);
}

void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
ResultCallback cb = boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Individual);
const BatchMessageId& batchMsgId = (const BatchMessageId&)msgId;
if(batchMsgId.batchIndex_ != -1 && !batchAcknowledgementTracker_.isBatchReady(batchMsgId, proto::CommandAck_AckType_Individual)) {
callback(ResultOk);
cb(ResultOk);
return;
}
doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, callback);
doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, cb);
}

void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallback callback) {
ResultCallback cb = boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Cumulative);
const BatchMessageId& msgId = (const BatchMessageId&) mId;
if(msgId.batchIndex_ != -1 && !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Cumulative)) {
BatchMessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
if(messageId == BatchMessageId()) {
// nothing to ack
callback(ResultOk);
cb(ResultOk);
} else {
doAcknowledge(messageId, proto::CommandAck_AckType_Cumulative, callback);
doAcknowledge(messageId, proto::CommandAck_AckType_Cumulative, cb);
}
} else {
doAcknowledge(msgId, proto::CommandAck_AckType_Cumulative, callback);
doAcknowledge(msgId, proto::CommandAck_AckType_Cumulative, cb);
}
}

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "BatchAcknowledgementTracker.h"
#include <limits>
#include <lib/BrokerConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsDisabled.h>

using namespace pulsar;

Expand Down Expand Up @@ -107,6 +109,7 @@ enum ConsumerTopicType {
}
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const ;
ConsumerStatsBasePtr consumerStatsBasePtr_;
private:
bool waitingForZeroQueueSizeMessage;
bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
Expand All @@ -119,6 +122,11 @@ enum ConsumerTopicType {
unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);

// TODO - Convert these functions to lambda when we move to C++11
Result receiveHelper(Message& msg);
Result receiveHelper(Message& msg, int timeout);
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);

boost::mutex mutexForReceiveWithZeroQueueSize;
const ConsumerConfiguration config_;
const std::string subscription_;
Expand All @@ -139,6 +147,8 @@ enum ConsumerTopicType {
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;

friend class PulsarFriend;
};

} /* namespace pulsar */
Expand Down
45 changes: 25 additions & 20 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
LOG_DEBUG(
"ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_);
// boost::ref is used to drop the constantness constraint of make_shared
if(conf_.getBatchingEnabled()) {
if (conf_.getBatchingEnabled()) {
batchMessageContainer = boost::make_shared<BatchMessageContainer>(boost::ref(*this));
}
numOfMsgPublished = 0;
numOfSendAsyncCalls = 0;
numOfMsgAckSuccessfully = 0;

unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
producerStatsBasePtr_ = boost::make_shared<ProducerStatsImpl>(producerStr_, executor_->createDeadlineTimer(),
statsIntervalInSeconds);
} else {
producerStatsBasePtr_ = boost::make_shared<ProducerStatsDisabled>();
}
}

ProducerImpl::~ProducerImpl() {
Expand Down Expand Up @@ -234,7 +239,14 @@ void ProducerImpl::setMessageMetadata(const Message &msg, const uint64_t& sequen
}
}

void ProducerImpl::statsCallBackHandler(Result res, const Message& msg, SendCallback callback, boost::posix_time::ptime publishTime) {
producerStatsBasePtr_->messageReceived(res, publishTime);
callback(res, msg);
}

void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);
SendCallback cb = boost::bind(&ProducerImpl::statsCallBackHandler, this, _1, _2, callback, boost::posix_time::microsec_clock::universal_time());
if (msg.getLength() > Commands::MaxMessageSize) {
callback(ResultMessageTooBig, msg);
return;
Expand All @@ -258,13 +270,12 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
}

Lock lock(mutex_);
numOfSendAsyncCalls++;
if (state_ != Ready) {
lock.unlock();
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
callback(ResultAlreadyClosed, msg);
cb(ResultAlreadyClosed, msg);
return;
}

Expand All @@ -274,7 +285,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
callback(ResultInvalidMessage, msg);
cb(ResultInvalidMessage, msg);
return;
}

Expand All @@ -289,17 +300,17 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
batchMessageContainer->sendMessage();
}
lock.unlock();
callback(ResultProducerQueueIsFull, msg);
cb(ResultProducerQueueIsFull, msg);
return;
}

// If we reach this point then you have a reserved spot on the queue

if (batchMessageContainer) { // Batching is enabled
batchMessageContainer->add(msg, callback);
batchMessageContainer->add(msg, cb);
return;
}
sendMessage(msg, callback);
sendMessage(msg, cb);
}

// Precondition -
Expand All @@ -314,7 +325,6 @@ void ProducerImpl::sendMessage(const Message& msg, SendCallback callback) {

LOG_DEBUG("Inserting data to pendingMessagesQueue_");
pendingMessagesQueue_.push(op, true);
numOfMsgPublished++;
LOG_DEBUG("Completed Inserting data to pendingMessagesQueue_");

ClientConnectionPtr cnx = getCnx().lock();
Expand All @@ -340,14 +350,11 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e

void ProducerImpl::printStats() {
if (batchMessageContainer) {
LOG_INFO("Producer - " << producerStr_ << ", [numOfMsgPublished = " << numOfMsgPublished
<< "] [numOfMsgAckSuccessfully = " << numOfMsgAckSuccessfully
<< "] [numOfSendAsyncCalls =" << numOfSendAsyncCalls << "] [batchMessageContainer = "
<< *batchMessageContainer << "]");
LOG_INFO("Producer - " << producerStr_ <<
", [batchMessageContainer = " << *batchMessageContainer << "]");
} else {
LOG_INFO("Producer - " << producerStr_ << ", [numOfMsgPublished = " << numOfMsgPublished
<< "] [numOfMsgAckSuccessfully = " << numOfMsgAckSuccessfully
<< "] [numOfSendAsyncCalls =" << numOfSendAsyncCalls << "] [batching = off]");
LOG_INFO("Producer - " << producerStr_ <<
", [batching = off]");
}
}

Expand Down Expand Up @@ -476,7 +483,6 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
} else {
LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
pendingMessagesQueue_.pop();
numOfMsgAckSuccessfully++;
if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
// batch message - need to release more spots
// -1 since the pushing batch message into the queue already released a spot
Expand Down Expand Up @@ -520,7 +526,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
pendingMessagesQueue_.pop();
numOfMsgAckSuccessfully++;
if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
// batch message - need to release more spots
// -1 since the pushing batch message into the queue already released a spot
Expand Down
9 changes: 5 additions & 4 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "HandlerBase.h"
#include "SharedBuffer.h"
#include "CompressionCodec.h"
#include "stats/ProducerStatsDisabled.h"
#include "stats/ProducerStatsImpl.h"

using namespace pulsar;

Expand Down Expand Up @@ -82,6 +84,7 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
bool isClosed();

protected:
ProducerStatsBasePtr producerStatsBasePtr_;

typedef BlockingQueue<OpSendMsg> MessageQueue;

Expand Down Expand Up @@ -110,6 +113,8 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName);

void statsCallBackHandler(Result , const Message& , SendCallback , boost::posix_time::ptime );

void handleClose(Result result, ResultCallback callback);

void resendMessages(ClientConnectionPtr cnx);
Expand All @@ -136,10 +141,6 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;

void failPendingMessages(Result result);

unsigned long numOfMsgPublished;
unsigned long numOfSendAsyncCalls;
unsigned long numOfMsgAckSuccessfully;
};

struct ProducerImplCmp {
Expand Down
Loading

0 comments on commit b3db513

Please sign in to comment.