Skip to content

Commit

Permalink
[client] add properties to producer for cpp & python client (apache#2420
Browse files Browse the repository at this point in the history
)

* [client] add properties to producer for cpp & python client

 ### Motivation

This is a caught-up change to enable properties for producer as java clients.

 ### Changes

Enable properties on producer for both cpp & python client

 ### Results

Properties are added as metadata for CommandProducer. However there is no way
to verify the producer properties. so I didn't add any specific tests, just
adding properties for both cpp and python clients in the tests, that should
excerise the corresponding code path.

* Add `properties` to pydoc
  • Loading branch information
sijie authored and merlimat committed Aug 22, 2018
1 parent def32dd commit 834104b
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 6 deletions.
34 changes: 34 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,40 @@ class ProducerConfiguration {
bool isEncryptionEnabled() const;
ProducerConfiguration& addEncryptionKey(std::string key);

/**
* Check whether the message has a specific property attached.
*
* @param name the name of the property to check
* @return true if the message has the specified property
* @return false if the property is not defined
*/
bool hasProperty(const std::string& name) const;

/**
* Get the value of a specific property
*
* @param name the name of the property
* @return the value of the property or null if the property was not defined
*/
const std::string& getProperty(const std::string& name) const;

/**
* Get all the properties attached to this producer.
*/
std::map<std::string, std::string>& getProperties() const;

/**
* Sets a new property on a message.
* @param name the name of the property
* @param value the associated value
*/
ProducerConfiguration& setProperty(const std::string& name, const std::string& value);

/**
* Add all the properties in the provided map
*/
ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties);

friend class PulsarWrapper;

private:
Expand Down
11 changes: 10 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "pulsar/MessageBuilder.h"
#include "PulsarApi.pb.h"
#include "LogUtils.h"
#include "PulsarApi.pb.h"
#include "Utils.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
Expand Down Expand Up @@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
}

SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId) {
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
producer->set_topic(topic);
producer->set_producer_id(producerId);
producer->set_request_id(requestId);
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(it->first);
keyValue->set_value(it->second);
producer->mutable_metadata()->AddAllocated(keyValue);
}

if (!producerName.empty()) {
producer->set_producer_name(producerName);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class Commands {
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId);
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata);

static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
Expand Down
34 changes: 33 additions & 1 deletion pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include <lib/ProducerConfigurationImpl.h>

namespace pulsar {

const static std::string emptyString;

ProducerConfiguration::ProducerConfiguration() : impl_(boost::make_shared<ProducerConfigurationImpl>()) {}

ProducerConfiguration::~ProducerConfiguration() {}
Expand All @@ -36,7 +39,6 @@ ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string&
}

const std::string& ProducerConfiguration::getProducerName() const {
static const std::string emptyString;
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
}

Expand Down Expand Up @@ -185,4 +187,34 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

bool ProducerConfiguration::hasProperty(const std::string& name) const {
const std::map<std::string, std::string>& m = impl_->properties;
return m.find(name) != m.end();
}

const std::string& ProducerConfiguration::getProperty(const std::string& name) const {
if (hasProperty(name)) {
const std::map<std::string, std::string>& m = impl_->properties;
return m.at(name);
} else {
return emptyString;
}
}

std::map<std::string, std::string>& ProducerConfiguration::getProperties() const { return impl_->properties; }

ProducerConfiguration& ProducerConfiguration::setProperty(const std::string& name, const std::string& value) {
impl_->properties.insert(std::make_pair(name, value));
return *this;
}

ProducerConfiguration& ProducerConfiguration::setProperties(
const std::map<std::string, std::string>& properties) {
for (std::map<std::string, std::string>::const_iterator it = properties.begin(); it != properties.end();
it++) {
setProperty(it->first, it->second);
}
return *this;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct ProducerConfigurationImpl {
CryptoKeyReaderPtr cryptoKeyReader;
std::set<std::string> encryptionKeys;
ProducerCryptoFailureAction cryptoFailureAction;
std::map<std::string, std::string> properties;
ProducerConfigurationImpl()
: sendTimeoutMs(30000),
compressionType(CompressionNone),
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();

SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId);
SharedBuffer cmd =
Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, _1, _2));
}
Expand Down
11 changes: 10 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ def create_producer(self, topic,
batching_max_messages=1000,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
properties=None,
):
"""
Create a new producer on a given topic.
Expand Down Expand Up @@ -361,6 +362,9 @@ def create_producer(self, topic,
* `message_routing_mode`:
Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
other option is `PartitionsRoutingMode.UseSinglePartition`
* `properties`:
Sets the properties for the producer. The properties associated with a producer
can be used for identify a producer at broker side.
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
Expand All @@ -374,6 +378,7 @@ def create_producer(self, topic,
_check_type(int, batching_max_messages, 'batching_max_messages')
_check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
_check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
_check_type_or_none(dict, properties, 'properties')

conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
Expand All @@ -390,6 +395,10 @@ def create_producer(self, topic,
conf.producer_name(producer_name)
if initial_sequence_id:
conf.initial_sequence_id(initial_sequence_id)
if properties:
for k, v in properties.items():
conf.property(k, v)

p = Producer()
p._producer = self._client.create_producer(topic, conf)
return p
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void export_config() {
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs, return_value_policy<copy_const_reference>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
.def("property", &ProducerConfiguration::setProperty, return_self<>())
;

class_<ConsumerConfiguration>("ConsumerConfiguration")
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/python/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
'my-topic',
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=10
batching_max_publish_delay_ms=10,
properties={
"producer-name": "test-producer-name",
"producer-id": "test-producer-id"
}
)

for i in range(10):
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBlockIfQueueFull(true);
conf.setProperty("producer-name", "test-producer-name");
conf.setProperty("producer-id", "test-producer-id");

Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Expand Down

0 comments on commit 834104b

Please sign in to comment.