Skip to content

Commit

Permalink
Export Message.Topic() in Go wrapper (apache#3346)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jan 10, 2019
1 parent 99f05b5 commit fa45573
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Message {
int32_t partition);
/// Used for Batch Messages
Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata);
proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class MultiTopicsConsumerImpl;
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message);
*/
uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message);

const char *pulsar_message_get_topic_name(pulsar_message_t *message);

#pragma GCC visibility pop

#ifdef __cplusplus
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,8 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32

const MessageId& m = batchedMessage.impl_->messageId;
MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata);
Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata,
batchedMessage.impl_->getTopicName());
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;

return singleMessage;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::

Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
m.impl_->topicName_ = &topic_;
m.impl_->setTopicName(topic_);

LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metad
}

Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata)
proto::SingleMessageMetadata& singleMetadata, const std::string& topicName)
: impl_(boost::make_shared<MessageImpl>()) {
impl_->messageId = messageID;
impl_->metadata = metadata;
impl_->payload = payload;
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
impl_->topicName_ = &topicName;

if (singleMetadata.has_partition_key()) {
impl_->metadata.set_partition_key(singleMetadata.partition_key());
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message) {
map->map = message->message.getProperties();
return map;
}

const char *pulsar_message_get_topic_name(pulsar_message_t *message) {
return message->message.getTopicName().c_str();
}
4 changes: 4 additions & 0 deletions pulsar-client-go/pulsar/c_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func (m *message) Key() string {
return C.GoString(C.pulsar_message_get_partitionKey(m.ptr))
}

func (m *message) Topic() string {
return C.GoString(C.pulsar_message_get_topic_name(m.ptr))
}

//////// MessageID

func newMessageId(msg *C.pulsar_message_t) MessageID {
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestConsumer(t *testing.T) {
assertNotNil(t, msg)

assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
assertEqual(t, string(msg.Topic()), "persistent://public/default/my-topic")

consumer.Ack(msg)
}
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-go/pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type ProducerMessage struct {
}

type Message interface {
// Get the topic from which this message originated from
Topic() string

// Return the properties attached to the message.
// Properties are application defined key/value pairs that will be attached to the message
Properties() map[string]string
Expand Down

0 comments on commit fa45573

Please sign in to comment.