Skip to content

Commit

Permalink
[C++] Show bytes throughput for performance tests (apache#7371)
Browse files Browse the repository at this point in the history
### Motivation

The send callback changed after apache#4811 but message size cannot be retrieved from `MessageId`, so the statistics of message bytes were lost.

### Modifications

- Format code in `examples` and `perf` directories;
- Add a parameter to send callback to record each message's bytes count;
- Enable round robin routing for partitioned topics.
-------------
* Format perf and examples

* Add statistics of message bytes

* Enable round robin routing for partitioned topics
  • Loading branch information
BewareMyPower authored Jun 29, 2020
1 parent a3a63a3 commit f8080f4
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 144 deletions.
4 changes: 4 additions & 0 deletions pulsar-client-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ add_custom_target(format python ${BUILD_SUPPORT_DIR}/run_clang_format.py
0
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
${CMAKE_SOURCE_DIR}/lib
${CMAKE_SOURCE_DIR}/perf
${CMAKE_SOURCE_DIR}/examples
${CMAKE_SOURCE_DIR}/tests
${CMAKE_SOURCE_DIR}/include)

Expand All @@ -352,5 +354,7 @@ add_custom_target(check-format python ${BUILD_SUPPORT_DIR}/run_clang_format.py
1
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
${CMAKE_SOURCE_DIR}/lib
${CMAKE_SOURCE_DIR}/perf
${CMAKE_SOURCE_DIR}/examples
${CMAKE_SOURCE_DIR}/tests
${CMAKE_SOURCE_DIR}/include)
1 change: 0 additions & 1 deletion pulsar-client-cpp/examples/SampleConsumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;

int main() {

Client client("pulsar://localhost:6650");

Consumer consumer;
Expand Down
110 changes: 55 additions & 55 deletions pulsar-client-cpp/perf/PerfConsumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,12 @@ struct Arguments {

namespace pulsar {
class PulsarFriend {
public:
static Client getClient(const std::string& url, const ClientConfiguration conf,
bool poolConnections) {
public:
static Client getClient(const std::string& url, const ClientConfiguration conf, bool poolConnections) {
return Client(url, conf, poolConnections);
}
};
}
} // namespace pulsar

#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
// Used for gcc-4.4.7 with boost-1.41
Expand All @@ -89,9 +88,8 @@ class PulsarFriend {
#include <atomic>
#endif

class EncKeyReader: public CryptoKeyReader {

private:
class EncKeyReader : public CryptoKeyReader {
private:
std::string privKeyContents;

void readFile(std::string fileName, std::string& fileContents) const {
Expand All @@ -101,20 +99,21 @@ class EncKeyReader: public CryptoKeyReader {
fileContents = fileStream.str();
}

public:

public:
EncKeyReader(std::string keyFile) {
if (keyFile.empty()) {
return;
}
readFile(keyFile, privKeyContents);
}

Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
return ResultInvalidConfiguration;
}

Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
EncryptionKeyInfo& encKeyInfo) const {
encKeyInfo.setKey(privKeyContents);
return ResultOk;
}
Expand All @@ -126,9 +125,7 @@ std::atomic<uint32_t> bytesReceived;

typedef std::chrono::high_resolution_clock Clock;

void handleAckComplete(Result) {
}

void handleAckComplete(Result) {}

std::mutex mutex;
typedef std::unique_lock<std::mutex> Lock;
Expand All @@ -151,7 +148,7 @@ std::vector<Consumer> consumers;

void handleSubscribe(Result result, Consumer consumer, Latch latch) {
if (result != ResultOk) {
LOG_ERROR("Error creating consumer: "<< result);
LOG_ERROR("Error creating consumer: " << result);
exit(-1);
}

Expand All @@ -172,7 +169,7 @@ void startPerfConsumer(const Arguments& args) {
}
conf.setIOThreads(args.ioThreads);
conf.setMessageListenerThreads(args.listenerThreads);
if(!args.authPlugin.empty()) {
if (!args.authPlugin.empty()) {
AuthenticationPtr auth = AuthFactory::create(args.authPlugin, args.authParams);
conf.setAuth(auth);
}
Expand All @@ -190,9 +187,7 @@ void startPerfConsumer(const Arguments& args) {
Latch latch(args.numTopics * args.numConsumers);

for (int i = 0; i < args.numTopics; i++) {
std::string topic =
(args.numTopics == 1) ?
args.topic : args.topic + "-" + std::to_string(i);
std::string topic = (args.numTopics == 1) ? args.topic : args.topic + "-" + std::to_string(i);
LOG_INFO("Adding " << args.numConsumers << " consumers on topic " << topic);

for (int j = 0; j < args.numConsumers; j++) {
Expand All @@ -203,22 +198,22 @@ void startPerfConsumer(const Arguments& args) {
subscriberName = args.subscriberName;
}

client.subscribeAsync(topic, subscriberName, consumerConf,
std::bind(handleSubscribe, std::placeholders::_1, std::placeholders::_2, latch));
client.subscribeAsync(
topic, subscriberName, consumerConf,
std::bind(handleSubscribe, std::placeholders::_1, std::placeholders::_2, latch));
}
}

Clock::time_point oldTime = Clock::now();

latch.wait();
LOG_INFO(
"Start receiving from " << args.numConsumers << " consumers on " << args.numTopics << " topics");
LOG_INFO("Start receiving from " << args.numConsumers << " consumers on " << args.numTopics << " topics");

while (true) {
std::this_thread::sleep_for(seconds(10));

Clock::time_point now = Clock::now();
double elapsed = duration_cast < milliseconds > (now - oldTime).count() / 1e3;
double elapsed = duration_cast<milliseconds>(now - oldTime).count() / 1e3;

double rate = messagesReceived.exchange(0) / elapsed;
double throughput = bytesReceived.exchange(0) / elapsed / 1024 / 1024 * 8;
Expand All @@ -230,7 +225,8 @@ void startPerfConsumer(const Arguments& args) {
lock.unlock();

LOG_INFO("Throughput received: " << rate << " msg/s --- " << throughput << " Mbit/s ---" //
<< " End-To-End latency: avg: " << e2eLatencyAvgMs << " ms -- 99pct: " << e2eLatency99pctMs << " ms");
<< " End-To-End latency: avg: " << e2eLatencyAvgMs
<< " ms -- 99pct: " << e2eLatency99pctMs << " ms");

oldTime = now;
}
Expand All @@ -246,7 +242,7 @@ int main(int argc, char** argv) {
po::variables_map vm;
po::options_description confFileDesc;
confFileDesc.add_options() //
("serviceURL", po::value<std::string>()->default_value("pulsar://localhost:6650"));
("serviceURL", po::value<std::string>()->default_value("pulsar://localhost:6650"));

po::store(po::parse_config_file<char>(file, confFileDesc, true), vm);
po::notify(vm);
Expand All @@ -263,45 +259,51 @@ int main(int argc, char** argv) {
po::options_description desc("Allowed options");
desc.add_options() //

("help,h", "Print this help message") //
("help,h", "Print this help message") //

("auth-params,v", po::value<std::string>(&args.authParams)->default_value(""), "Authentication parameters, e.g., \"key1:val1,key2:val2\"") //
("auth-params,v", po::value<std::string>(&args.authParams)->default_value(""),
"Authentication parameters, e.g., \"key1:val1,key2:val2\"") //

("auth-plugin,a", po::value<std::string>(&args.authPlugin)->default_value(""), "Authentication plugin class library path") //
("auth-plugin,a", po::value<std::string>(&args.authPlugin)->default_value(""),
"Authentication plugin class library path") //

("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false), "Whether tls connection is used") //
("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
"Whether tls connection is used") //

("allow-insecure,d", po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true), "Whether insecure tls connection is allowed") //
("allow-insecure,d", po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
"Whether insecure tls connection is allowed") //

("trust-cert-file,c", po::value<std::string>(&args.tlsTrustCertsFilePath)->default_value(""), "TLS trust certification file path") //
("trust-cert-file,c", po::value<std::string>(&args.tlsTrustCertsFilePath)->default_value(""),
"TLS trust certification file path") //

("num-topics,t", po::value<int>(&args.numTopics)->default_value(1), "Number of topics") //
("num-topics,t", po::value<int>(&args.numTopics)->default_value(1), "Number of topics") //

("num-consumers,n", po::value<int>(&args.numConsumers)->default_value(1),
"Number of consumers (per topic)") //
("num-consumers,n", po::value<int>(&args.numConsumers)->default_value(1),
"Number of consumers (per topic)") //

("subscriber-name,s", po::value<std::string>(&args.subscriberName)->default_value("sub"),
"Subscriber name prefix") //
("subscriber-name,s", po::value<std::string>(&args.subscriberName)->default_value("sub"),
"Subscriber name prefix") //

("wait-time,w", po::value<int>(&args.waitTimeMs)->default_value(1),
"Simulate a slow message consumer (Delay in ms)") //
("wait-time,w", po::value<int>(&args.waitTimeMs)->default_value(1),
"Simulate a slow message consumer (Delay in ms)") //

("service-url,u", po::value<std::string>(&args.serviceURL)->default_value(defaultServiceUrl),
"Pulsar Service URL") //
("service-url,u", po::value<std::string>(&args.serviceURL)->default_value(defaultServiceUrl),
"Pulsar Service URL") //

("receiver-queue-size,p", po::value<int>(&args.receiverQueueSize)->default_value(1000),
"Size of the receiver queue") //
("receiver-queue-size,p", po::value<int>(&args.receiverQueueSize)->default_value(1000),
"Size of the receiver queue") //

("io-threads,i", po::value<int>(&args.ioThreads)->default_value(1),
"Number of IO threads to use") //
("io-threads,i", po::value<int>(&args.ioThreads)->default_value(1),
"Number of IO threads to use") //

("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
"Number of listener threads") //
("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
"Number of listener threads") //

("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""), "The private key name to decrypt payload") //
("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""),
"The private key name to decrypt payload") //

("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
"The file which contains the private key to decrypt payload"); //
("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
"The file which contains the private key to decrypt payload"); //

po::options_description hidden;
hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name");
Expand All @@ -311,9 +313,7 @@ int main(int argc, char** argv) {

po::variables_map map;
try {
po::store(
po::command_line_parser(argc, argv).options(allOptions).positional(positional).run(),
map);
po::store(po::command_line_parser(argc, argv).options(allOptions).positional(positional).run(), map);
po::notify(map);
} catch (const std::exception& e) {
std::cerr << "Error parsing parameters -- " << e.what() << std::endl << std::endl;
Expand All @@ -327,8 +327,8 @@ int main(int argc, char** argv) {
}

if (map.count("topic") != 1) {
std::cerr << "Need to specify a topic name. eg: persistent://prop/cluster/ns/my-topic"
<< std::endl << std::endl;
std::cerr << "Need to specify a topic name. eg: persistent://prop/cluster/ns/my-topic" << std::endl
<< std::endl;
std::cerr << desc << std::endl;
return -1;
}
Expand Down
Loading

0 comments on commit f8080f4

Please sign in to comment.