Skip to content

Commit

Permalink
Get ConsumerStats using Binary Protocol (apache#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored Feb 27, 2017
1 parent 9fde15b commit ae381b4
Show file tree
Hide file tree
Showing 28 changed files with 5,756 additions and 359 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ pulsar-broker/src/test/resources/log4j.properties
# Maven
log/
target/

# tmp files
*.pid
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c*</exclude>
<exclude>pulsar-client-cpp/lib/lz4/lz4.*</exclude>
<exclude>pulsar-client-cpp/.idea/*</exclude>
<exclude>pulsar-client-cpp/lib/PulsarApi.pb.*</exclude>
<exclude>pulsar-client-cpp/CMakeFiles/**</exclude>
<exclude>pulsar-client-cpp/travis-build.sh</exclude>
Expand Down
49 changes: 37 additions & 12 deletions protobuf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,56 @@

Pulsar uses protocol buffer messages for the client/broker wire protocol.

The protocol definition is located at `pulsar-common/src/main/proto/PulsarApi.proto` and the pre-generated Java code is at `pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java`.
The protocol definition is located at `pulsar-common/src/main/proto/PulsarApi.proto`. When making a change to the `PulsarApi.proto` definition, we have to regenerate the `PulsarApi.*` files and include them in the same commit.

When making a change to the `PulsarApi.proto` definition, we have regenerate the `PulsarApi.java` and include that in the same commit.
### For Broker and Java Client:

The pre-generated Java code is at `pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java`.

We are currently using a modified version of the Google Protocol Buffer code generator, to generate code that can serialize/deserialize messages with no memory allocations (caching already instantiated objects) and also to be able to directly use Netty pooled ByteBuf with direct memory.

To re-generate the `PulsarApi.java` code you need to apply a patch to the protobuf generator. Patch is found at `protobuf.patch`.
To re-generate the `PulsarApi.java` code you need to apply a patch to the protobuf generator. Patch is found in `protobuf.patch`.

### For C++ Client:

The pre-generated C++ code is at `pulsar-client-cpp/lib/PulsarApi.pb.cc` and `pulsar-client-cpp/lib/PulsarApi.pb.h`.

### Commands for creating the pre-generated files

```shell
export PULSAR_HOME=<Path where you cloned the pulsar repo>

cd $HOME
git clone https://github.com/google/protobuf.git
cd protobuf
git checkout v2.4.1

# Apply patch
patch -p1 < ../pulsar-path/protobuf/protobuf.patch
### For C++ ###
cd ${HOME}/protobuf
git checkout v2.6.0

# Compile protobuf
### Compile protobuf
autoreconf --install
./configure
make

# This would leave the binary in src/protoc
### Re-generate PulsarApi
cd ${PULSAR_HOME}/pulsar-client-cpp/
export PROTOC=${HOME}/protobuf/src/protoc
./generate_protobuf.sh

### For Java ###
cd ${HOME}/protobuf
git checkout v2.4.1

### Apply patch
patch -p1 < ${PULSAR_HOME}/protobuf/protobuf.patch

# Re-generate PulsarApi
cd pulsar-path/pulsar-common
PROTOC=~/protobuf/src/protoc ./generate_protobuf.sh
### Compile protobuf
autoreconf --install
./configure
make

### Re-generate PulsarApi
cd ${PULSAR_HOME}/pulsar-common/
export PROTOC=${HOME}/protobuf/src/protoc
./generate_protobuf.sh
```
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@

import com.yahoo.pulsar.broker.authentication.AuthenticationDataCommand;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandFlow;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
Expand Down Expand Up @@ -192,6 +197,88 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
});
}

@Override
protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
if (log.isDebugEnabled()) {
log.debug("Received CommandConsumerStats call from {}", remoteAddress);
}

final long requestId = commandConsumerStats.getRequestId();
final String topicName = commandConsumerStats.getTopicName();
final String subscriptionName = commandConsumerStats.getSubscriptionName();
final long consumerId = commandConsumerStats.getConsumerId();

if (log.isDebugEnabled()) {
log.debug("CommandConsumerStats[requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]", requestId, topicName, subscriptionName, consumerId);
}

ByteBuf msg = null;
try {
PersistentTopic topic = (PersistentTopic) getBrokerService().getTopicReference(topicName);
if (topic != null) {
if (topic.getSubscriptions().containsKey(subscriptionName)) {
PersistentSubscription subscription = topic.getSubscriptions().get(subscriptionName);
boolean consumerFound = false;
for (Consumer consumer : subscription.getConsumers()) {
if (consumer.consumerId() == consumerId) {
consumerFound = true;
msg = Commands.newConsumerStatsResponse(createConsumerStatsResponse(consumer, subscription, requestId));
break;
}
}
if (!consumerFound) {
log.error(
"Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]",
remoteAddress, requestId, topicName, subscriptionName, consumerId);
msg = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound,
"Consumer " + consumerId + " not found", requestId);
}
} else {
log.error(
"Failed to get consumer-stats response - Subscription not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]",
remoteAddress, requestId, topicName, subscriptionName, consumerId);
msg = Commands.newConsumerStatsResponse(ServerError.SubscriptionNotFound,
"Subscription " + subscriptionName + " not found", requestId);
}
} else {
log.error(
"Failed to get consumer-stats response - Topic not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]",
remoteAddress, requestId, topicName, subscriptionName, consumerId);
msg = Commands.newConsumerStatsResponse(ServerError.TopicNotFound, "Topic " + topicName + " not found",
requestId);
}
} catch (Exception e) {
log.error("Failed to get consumer-stats response - Exception: {} for CommandConsumerStats[remoteAddress = {}, requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]",
e, remoteAddress, requestId, topicName, subscriptionName, consumerId);
msg = Commands.newConsumerStatsResponse(ServerError.UnknownError, "Exception: " + e, requestId);
} finally {
if (msg != null) {
ctx.writeAndFlush(msg);
}
}
}

CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, PersistentSubscription subscription, long requestId) {
CommandConsumerStatsResponse.Builder commandConsumerStatsResponseBuilder = CommandConsumerStatsResponse
.newBuilder();
ConsumerStats consumerStats = consumer.getStats();
commandConsumerStatsResponseBuilder.setRequestId(requestId);
commandConsumerStatsResponseBuilder.setMsgRateOut(consumerStats.msgRateOut);
commandConsumerStatsResponseBuilder.setMsgThroughputOut(consumerStats.msgThroughputOut);
commandConsumerStatsResponseBuilder.setMsgRateRedeliver(consumerStats.msgRateRedeliver);
commandConsumerStatsResponseBuilder.setConsumerName(consumerStats.consumerName);
commandConsumerStatsResponseBuilder.setAvailablePermits(consumerStats.availablePermits);
commandConsumerStatsResponseBuilder.setUnackedMessages(consumerStats.unackedMessages);
commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs);
commandConsumerStatsResponseBuilder.setAddress(consumerStats.address);
commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince);

commandConsumerStatsResponseBuilder.setType(subscription.getTypeString());
commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog());
return commandConsumerStatsResponseBuilder;
}

@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Start);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
.settings/
.pydevproject
.idea/
*.cbp

# doxygen files
apidocs/
Expand Down
73 changes: 71 additions & 2 deletions pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include <boost/shared_ptr.hpp>
#include <pulsar/Message.h>
#include <pulsar/Result.h>

#include <boost/date_time/posix_time/ptime.hpp>
#include <iostream>
#pragma GCC visibility push(default)

class PulsarFriend;
Expand Down Expand Up @@ -55,6 +56,61 @@ enum ConsumerType {
ConsumerFailover
};

class BrokerConsumerStats {
private:
/*
* validTillInMs_ - Stats will be valid till this time.
*/
boost::posix_time::ptime validTill_;
public:
BrokerConsumerStats();
BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName, int availablePermits,
int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address,
std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog);

/** Returns true if the Message is Expired **/
bool isValid() const;

/** Total rate of messages delivered to the consumer. msg/s */
double msgRateOut_;

/** Total throughput delivered to the consumer. bytes/s */
double msgThroughputOut_;

/** Total rate of messages redelivered by this consumer. msg/s */
double msgRateRedeliver_;

/** Name of the consumer */
std::string consumerName_;

/** Number of available message permits for the consumer */
int availablePermits_;

/** Number of unacknowledged messages for the consumer */
int unackedMessages_;

/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
bool blockedConsumerOnUnackedMsgs_;

/** Address of this consumer */
std::string address_;

/** Timestamp of connection */
std::string connectedSince_;

/// Whether this subscription is Exclusive or Shared or Failover
std::string type_;

/// Total rate of messages expired on this subscription. msg/s
double msgRateExpired_;

/// Number of messages in the subscription backlog
long msgBacklog_;

friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj);
};

/**
* Class specifying the configuration of a consumer.
*/
Expand Down Expand Up @@ -284,7 +340,20 @@ class Consumer {
*/
void redeliverUnacknowledgedMessages();

private:
/**
* Gets Consumer Stats from broker.
* The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires
* then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are
* still valid.
*
* @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats
* @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned.
*
* @note This is a blocking call with timeout of thirty seconds.
*/
Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1);

private:
typedef boost::shared_ptr<ConsumerImplBase> ConsumerImplBasePtr;
friend class PulsarFriend;
ConsumerImplBasePtr impl_;
Expand Down
7 changes: 5 additions & 2 deletions pulsar-client-cpp/include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ enum Result {
ResultProducerBlockedQuotaExceededError, /// Producer is blocked
ResultProducerBlockedQuotaExceededException, /// Producer is getting exception
ResultProducerQueueIsFull, /// Producer queue is full

ResultMessageTooBig /// Trying to send a messages exceeding the max size
ResultMessageTooBig, /// Trying to send a messages exceeding the max size
ResultTopicNotFound, /// Topic not found
ResultSubscriptionNotFound, /// Subscription not found
ResultConsumerNotFound, /// Consumer not found
ResultUnsupportedVersionError /// Error when an older client/version doesn't support a required feature
};

// Return string representation of result code
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ SharedBuffer BatchMessageContainer::getBatchedPayload() {
void BatchMessageContainer::clear() {
LOG_DEBUG(*this << " BatchMessageContainer::clear() called");
timer_->cancel();
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/++numberOfBatchesSent_;
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/(numberOfBatchesSent_ + 1);
numberOfBatchesSent_++;
messagesContainerListPtr_ = messageContainerListPool.create();
// Try to optimize this
messagesContainerListPtr_->reserve(10000);
Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ struct ClientConfiguration::Impl {
operationTimeoutSeconds(30),
messageListenerThreads(1),
concurrentLookupRequest(5000),
logConfFilePath() {}
logConfFilePath(),
useTls(false),
tlsAllowInsecureConnection(true) {}
};

ClientConfiguration::ClientConfiguration()
Expand Down
Loading

0 comments on commit ae381b4

Please sign in to comment.