Skip to content

Commit

Permalink
[Issue 8154] [Python client] Expose schema version (of writerSchema) …
Browse files Browse the repository at this point in the history
…in Message (apache#8173)

* [Issue 8154] Expose schema version (of writerSchema) in python client message

* Adding formating suggestion on PR#8173 to fix tests

* Fixing build issues

* Added a test for python client returning schema version

* Added one more test case for python client returning schema version

* Fix test- move subscribe before send so the consumer offset is ahead of new data

* Fix test to make it run on python 2 and 3 both

Co-authored-by: Sijie Guo <[email protected]>
  • Loading branch information
shiv4289 and sijie authored Oct 28, 2020
1 parent 98bd0c2 commit 54d6811
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ class PULSAR_PUBLIC Message {
*/
const int getRedeliveryCount() const;

/**
* Check if schema version exists
*/
bool hasSchemaVersion() const;

/**
* Get the schema version
*/
const std::string& getSchemaVersion() const;

bool operator==(const Message& msg) const;

private:
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *messag

PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);

PULSAR_PUBLIC int pulsar_message_has_schema_version(pulsar_message_t *message);

PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *message);

PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);

#ifdef __cplusplus
}
#endif
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());

if (metadata.has_schema_version()) {
m.impl_->setSchemaVersion(metadata.schema_version());
}

LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
<< metadata.has_num_messages_in_batch());
Expand Down
14 changes: 14 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ const int Message::getRedeliveryCount() const {
return impl_->getRedeliveryCount();
}

bool Message::hasSchemaVersion() const {
if (impl_) {
return impl_->hasSchemaVersion();
}
return false;
}

const std::string& Message::getSchemaVersion() const {
if (!impl_) {
return emptyString;
}
return impl_->getSchemaVersion();
}

uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }

uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,10 @@ int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }

void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }

bool MessageImpl::hasSchemaVersion() const { return metadata.has_schema_version(); }

void MessageImpl::setSchemaVersion(const std::string& schemaVersion) { schemaVersion_ = &schemaVersion; }

const std::string& MessageImpl::getSchemaVersion() const { return metadata.schema_version(); }

} // namespace pulsar
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class MessageImpl {
ClientConnection* cnx_;
const std::string* topicName_;
int redeliveryCount_;
bool hasSchemaVersion_;
const std::string* schemaVersion_;

const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
Expand All @@ -66,6 +68,10 @@ class MessageImpl {
int getRedeliveryCount();
void setRedeliveryCount(int count);

bool hasSchemaVersion() const;
const std::string& getSchemaVersion() const;
void setSchemaVersion(const std::string& value);

friend class PulsarWrapper;
friend class MessageBuilder;

Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ def redelivery_count(self):
"""
return self._message.redelivery_count()

def schema_version(self):
"""
Get the schema version for this message
"""
return self._message.schema_version()

@staticmethod
def _wrap(_message):
self = Message()
Expand Down
26 changes: 26 additions & 0 deletions pulsar-client-cpp/python/schema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,32 @@ class Example(Record):
self.assertEqual(r2.__class__.__name__, 'Example')
self.assertEqual(r2, r)

def test_schema_version(self):
class Example(Record):
a = Integer()
b = Integer()

client = pulsar.Client(self.serviceUrl)
producer = client.create_producer(
'my-avro-python-schema-version-topic',
schema=AvroSchema(Example))

consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
schema=AvroSchema(Example))

r = Example(a=1, b=2)
producer.send(r)

msg = consumer.receive()

self.assertIsNotNone(msg.schema_version())

self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())

self.assertEqual(r, msg.value())

client.close()

def test_serialize_wrong_types(self):
class Example(Record):
a = Integer()
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/python/src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ std::string Topic_name_str(const Message& msg) {
return ss.str();
}

std::string schema_version_str(const Message& msg) {
std::stringstream ss;
ss << msg.getSchemaVersion();
return ss.str();
}

const MessageId& Message_getMessageId(const Message& msg) {
return msg.getMessageId();
}
Expand Down Expand Up @@ -168,6 +174,7 @@ void export_message() {
.def("__str__", &Message_str)
.def("topic_name", &Topic_name_str)
.def("redelivery_count", &Message::getRedeliveryCount)
.def("schema_version", &schema_version_str)
;

MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;
Expand Down

0 comments on commit 54d6811

Please sign in to comment.