Skip to content

Commit

Permalink
[cpp client]support key shared for cpp client (apache#4366)
Browse files Browse the repository at this point in the history
support key shared for cpp client
  • Loading branch information
wolfstudy authored and jiazhai committed May 28, 2019
1 parent 7a21094 commit 8ab6b1b
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pulsar-client-cpp/include/pulsar/ConsumerType.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ enum ConsumerType
/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
ConsumerFailover
ConsumerFailover,

/**
* Multiple consumer will be able to use the same subscription and all messages with the same key
* will be dispatched to only one consumer
*/
ConsumerKeyShared
};
}

Expand Down
15 changes: 15 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ class PULSAR_PUBLIC Message {
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;

/**
* Get the ordering key of the message
*
* @return the ordering key of the message
*/
const std::string& getOrderingKey() const;

/**
* Check whether the message has a ordering key
*
* @return true if the ordering key was set while creating the message
* false if the ordering key was not set while creating the message
*/
bool hasOrderingKey() const;

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
8 changes: 7 additions & 1 deletion pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& setProperties(const StringMap& properties);

/*
/**
* set partition key for the message routing
* @param hash of this key is used to determine message's topic partition
*/
MessageBuilder& setPartitionKey(const std::string& partitionKey);

/**
* set ordering key for the message routing
* @param the ordering key for the message
*/
MessageBuilder& setOrderingKey(const std::string& orderingKey);

/**
* Set the event timestamp for the message.
*/
Expand Down
8 changes: 7 additions & 1 deletion pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ typedef enum {
/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
pulsar_ConsumerFailover
pulsar_ConsumerFailover,

/**
* Multiple consumer will be able to use the same subscription and all messages with the same key
* will be dispatched to only one consumer
*/
pulsar_ConsumerKeyShared
} pulsar_consumer_type;

typedef enum {
Expand Down
15 changes: 14 additions & 1 deletion pulsar-client-cpp/include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ PULSAR_PUBLIC void pulsar_message_set_allocated_content(pulsar_message_t *messag
PULSAR_PUBLIC void pulsar_message_set_property(pulsar_message_t *message, const char *name,
const char *value);

/*
/**
* set partition key for the message routing
* @param hash of this key is used to determine message's topic partition
*/
PULSAR_PUBLIC void pulsar_message_set_partition_key(pulsar_message_t *message, const char *partitionKey);

/**
* Sets the ordering key of the message for message dispatch in Key_Shared mode.
* @param the ordering key for the message
*/
PULSAR_PUBLIC void pulsar_message_set_ordering_key(pulsar_message_t *message, const char *orderingKey);

/**
* Set the event timestamp for the message.
*/
Expand Down Expand Up @@ -157,6 +163,13 @@ PULSAR_PUBLIC pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_
PULSAR_PUBLIC const char *pulsar_message_get_partitionKey(pulsar_message_t *message);
PULSAR_PUBLIC int pulsar_message_has_partition_key(pulsar_message_t *message);

/**
* Get the ordering key of the message for message dispatch in Key_Shared mode.
* Partition key Will be used if ordering key not specified
*/
PULSAR_PUBLIC const char *pulsar_message_get_orderingKey(pulsar_message_t *message);
PULSAR_PUBLIC int pulsar_message_has_ordering_key(pulsar_message_t *message);

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::str
return ConsumerFailover;
} else if (str == "ConsumerShared" || str == "Shared") {
return ConsumerShared;
} else if (str == "ConsumerKeyShared" || str == "KeyShared") {
return ConsumerKeyShared;
} else {
return ConsumerExclusive;
}
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,9 @@ inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() {

case ConsumerFailover:
return proto::CommandSubscribe::Failover;

case ConsumerKeyShared:
return proto::CommandSubscribe_SubType_Key_Shared;
}
}

Expand Down
14 changes: 14 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ const std::string& Message::getPartitionKey() const {
return impl_->getPartitionKey();
}

bool Message::hasOrderingKey() const {
if (impl_) {
return impl_->hasOrderingKey();
}
return false;
}

const std::string& Message::getOrderingKey() const {
if (!impl_) {
return emptyString;
}
return impl_->getOrderingKey();
}

const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ MessageBuilder& MessageBuilder::setPartitionKey(const std::string& partitionKey)
return *this;
}

MessageBuilder& MessageBuilder::setOrderingKey(const std::string& orderingKey) {
checkMetadata();
impl_->metadata.set_ordering_key(orderingKey);
return *this;
}

MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
checkMetadata();
impl_->metadata.set_event_time(eventTimestamp);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const std::string& MessageImpl::getPartitionKey() const { return metadata.partit

bool MessageImpl::hasPartitionKey() const { return metadata.has_partition_key(); }

const std::string& MessageImpl::getOrderingKey() const { return metadata.ordering_key(); }

bool MessageImpl::hasOrderingKey() const { return metadata.has_ordering_key(); }

uint64_t MessageImpl::getPublishTimestamp() const {
if (metadata.has_publish_time()) {
return metadata.publish_time();
Expand Down Expand Up @@ -77,6 +81,8 @@ void MessageImpl::setPartitionKey(const std::string& partitionKey) {
metadata.set_partition_key(partitionKey);
}

void MessageImpl::setOrderingKey(const std::string& orderingKey) { metadata.set_ordering_key(orderingKey); }

void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { metadata.set_event_time(eventTimestamp); }

void MessageImpl::setTopicName(const std::string& topicName) {
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class MessageImpl {
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;

const std::string& getOrderingKey() const;
bool hasOrderingKey() const;

uint64_t getPublishTimestamp() const;
uint64_t getEventTimestamp() const;

Expand All @@ -67,6 +70,7 @@ class MessageImpl {
void setProperty(const std::string& name, const std::string& value);
void disableReplication(bool flag);
void setPartitionKey(const std::string& partitionKey);
void setOrderingKey(const std::string& orderingKey);
void setEventTimestamp(uint64_t eventTimestamp);
Message::StringMap properties_;
};
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ void pulsar_message_set_partition_key(pulsar_message_t *message, const char *par
message->builder.setPartitionKey(partitionKey);
}

void pulsar_message_set_ordering_key(pulsar_message_t *message, const char *orderingKey) {
message->builder.setOrderingKey(orderingKey);
}

void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t eventTimestamp) {
message->builder.setEventTimestamp(eventTimestamp);
}
Expand Down Expand Up @@ -87,6 +91,12 @@ const char *pulsar_message_get_partitionKey(pulsar_message_t *message) {

int pulsar_message_has_partition_key(pulsar_message_t *message) { return message->message.hasPartitionKey(); }

const char *pulsar_message_get_orderingKey(pulsar_message_t *message) {
return message->message.getOrderingKey().c_str();
}

int pulsar_message_has_ordering_key(pulsar_message_t *message) { return message->message.hasOrderingKey(); }

uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
return message->message.getPublishTimestamp();
}
Expand Down
21 changes: 21 additions & 0 deletions pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ TEST(ConsumerConfigurationTest, testReadCompactPersistentFailover) {
consumer.close();
}

TEST(ConsumerConfigurationTest, testSubscribePersistentKeyShared) {
std::string lookupUrl = "pulsar://localhost:6650";
std::string topicName = "persist-key-shared-topic";
std::string subName = "test-persist-key-shared";

Result result;

ConsumerConfiguration config;
// now, key-shared not support read compact
config.setReadCompacted(false);
config.setConsumerType(ConsumerKeyShared);

ClientConfiguration clientConfig;
Client client(lookupUrl, clientConfig);

Consumer consumer;
result = client.subscribe(topicName, subName, config, consumer);
ASSERT_EQ(ResultOk, result);
consumer.close();
}

TEST(ConsumerConfigurationTest, testReadCompactPersistentShared) {
std::string lookupUrl = "pulsar://localhost:6650";
std::string topicName = "persist-topic";
Expand Down

0 comments on commit 8ab6b1b

Please sign in to comment.