Skip to content

Commit

Permalink
[C++] Fix single message metadata not set correctly (apache#15072)
Browse files Browse the repository at this point in the history
### Motivation

Recently I found the messages sent by C++ producer don't have the schema
version, which causes Java consumer cannot consume them with
`AUTO_CONSUME` schema. After rechecking the code, I found the C++ client
doesn't set single message metadata correctly, i.e. when batching is
enabled, some messages' metadata could be wrong.

- In `initBatchMessageMetadata`, the schema version is not set.
- In `serializeSingleMessageInBatchWithPayload`, the ordering key and
  the sequence id are not set.

In addition, when a C++ consumer consumes batched messages from a Java
producer, some metadata might be wrong. Because even for batched
messages, Java producer also sets the partition key and the ordering
key. It's redundant because only keys in `SingleMessageMetadata` should
be set. To avoid 2nd and later single messages in the batch reuse the
keys in `MessageMetadata`, Java client clears these fields if the
`SingleMessageMetadata` doesn't contain them when a `MessageImpl` is
constructed.

### Modifications

- Set the fields that were not set before when creating a batch. Some
  fields like `null_value` and `null_partition_key` are not set because
  they are not supported by C++ client at this moment.
- Use a more efficient way to copy the repeated fields of ProtoBuf.
- Clear some fields when they are not contained by the
  `SingleMessageMetadata` object when creating a `MessageImpl` so that
  the bahavior could be consisitent with Java client.

### Verifying this change

Following tests are added:
- `BatchMessageTest.testSingleMessageMetadata`: test 3 single messages
  in batch are consumed successfully, i.e. the correct metadata is received.
- `SchemaTest.testHasSchemaVersion`: test when schema is configured, all
  messages should has the schema version.
- The validation for schema version is also added to
  `ProtobufNativeSchemaTest.testEndToEnd`.
  • Loading branch information
BewareMyPower authored Apr 8, 2022
1 parent 5cf3fa0 commit 6f41fde
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 11 deletions.
31 changes: 20 additions & 11 deletions pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,26 +689,35 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa
batchMetadata.add_replicate_to(metadata.replicate_to(i));
}
}
// TODO: set other optional fields
if (metadata.has_schema_version()) {
batchMetadata.set_schema_version(metadata.schema_version());
}
}

uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
unsigned long maxMessageSizeInBytes) {
const auto& msgMetadata = msg.impl_->metadata;
SingleMessageMetadata metadata;
if (msg.impl_->hasPartitionKey()) {
metadata.set_partition_key(msg.impl_->getPartitionKey());
if (msgMetadata.has_partition_key()) {
metadata.set_partition_key(msgMetadata.partition_key());
}
if (msgMetadata.has_ordering_key()) {
metadata.set_ordering_key(msgMetadata.ordering_key());
}

for (MessageBuilder::StringMap::const_iterator it = msg.impl_->properties().begin();
it != msg.impl_->properties().end(); it++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
keyValue->set_key(it->first);
keyValue->set_value(it->second);
metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
for (int i = 0; i < msgMetadata.properties_size(); i++) {
auto keyValue = proto::KeyValue().New();
*keyValue = msgMetadata.properties(i);
metadata.mutable_properties()->AddAllocated(keyValue);
}

if (msg.impl_->getEventTimestamp() != 0) {
metadata.set_event_time(msg.impl_->getEventTimestamp());
if (msgMetadata.has_event_time()) {
metadata.set_event_time(msgMetadata.event_time());
}

if (msgMetadata.has_sequence_id()) {
metadata.set_sequence_id(msgMetadata.sequence_id());
}

// Format of batch message
Expand Down Expand Up @@ -738,7 +747,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
batchPayLoad.bytesWritten(msgMetadataSize);
batchPayLoad.write(msg.impl_->payload.data(), payloadSize);

return msg.impl_->metadata.sequence_id();
return msgMetadata.sequence_id();
}

Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
Expand Down
26 changes: 26 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,38 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, S
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
impl_->topicName_ = &topicName;

impl_->metadata.clear_properties();
if (singleMetadata.properties_size() > 0) {
impl_->metadata.mutable_properties()->Reserve(singleMetadata.properties_size());
for (int i = 0; i < singleMetadata.properties_size(); i++) {
auto keyValue = proto::KeyValue().New();
*keyValue = singleMetadata.properties(i);
impl_->metadata.mutable_properties()->AddAllocated(keyValue);
}
}

if (singleMetadata.has_partition_key()) {
impl_->metadata.set_partition_key(singleMetadata.partition_key());
} else {
impl_->metadata.clear_partition_key();
}

if (singleMetadata.has_ordering_key()) {
impl_->metadata.set_ordering_key(singleMetadata.ordering_key());
} else {
impl_->metadata.clear_ordering_key();
}

if (singleMetadata.has_event_time()) {
impl_->metadata.set_event_time(singleMetadata.event_time());
} else {
impl_->metadata.clear_event_time();
}

if (singleMetadata.has_sequence_id()) {
impl_->metadata.set_sequence_id(singleMetadata.sequence_id());
} else {
impl_->metadata.clear_sequence_id();
}
}

Expand Down
78 changes: 78 additions & 0 deletions pulsar-client-cpp/tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,3 +1071,81 @@ TEST(BatchMessageTest, testProducerQueueWithBatches) {

ASSERT_EQ(rejectedMessges, 10);
}

TEST(BatchMessageTest, testSingleMessageMetadata) {
const auto topic = "BatchMessageTest-SingleMessageMetadata-" + std::to_string(time(nullptr));
constexpr int numMessages = 3;

Client client(lookupUrl);

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(
topic, ProducerConfiguration().setBatchingMaxMessages(numMessages), producer));

producer.sendAsync(MessageBuilder()
.setContent("msg-0")
.setPartitionKey("key-0")
.setOrderingKey("ordering-key-0")
.setEventTimestamp(10UL)
.setProperty("k0", "v0")
.setProperty("k1", "v1")
.build(),
nullptr);
producer.sendAsync(MessageBuilder()
.setContent("msg-1")
.setOrderingKey("ordering-key-1")
.setEventTimestamp(11UL)
.setProperty("k2", "v2")
.build(),
nullptr);
producer.sendAsync(MessageBuilder().setContent("msg-2").build(), nullptr);
ASSERT_EQ(ResultOk, producer.flush());

Message msgs[numMessages];
for (int i = 0; i < numMessages; i++) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
msgs[i] = msg;
LOG_INFO("message " << i << ": " << msg.getDataAsString()
<< ", key: " << (msg.hasPartitionKey() ? msg.getPartitionKey() : "(null)")
<< ", ordering key: " << (msg.hasOrderingKey() ? msg.getOrderingKey() : "(null)")
<< ", event time: " << (msg.getEventTimestamp())
<< ", properties count: " << msg.getProperties().size()
<< ", has schema version: " << msg.hasSchemaVersion());
}

ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
ASSERT_TRUE(msgs[0].hasPartitionKey());
ASSERT_EQ(msgs[0].getPartitionKey(), "key-0");
ASSERT_TRUE(msgs[0].hasOrderingKey());
ASSERT_EQ(msgs[0].getOrderingKey(), "ordering-key-0");
ASSERT_EQ(msgs[0].getEventTimestamp(), 10UL);
ASSERT_EQ(msgs[0].getProperties().size(), 2);
ASSERT_TRUE(msgs[0].hasProperty("k0"));
ASSERT_EQ(msgs[0].getProperty("k0"), "v0");
ASSERT_TRUE(msgs[0].hasProperty("k1"));
ASSERT_EQ(msgs[0].getProperty("k1"), "v1");
ASSERT_FALSE(msgs[0].hasSchemaVersion());

ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
ASSERT_FALSE(msgs[1].hasPartitionKey());
ASSERT_TRUE(msgs[1].hasOrderingKey());
ASSERT_EQ(msgs[1].getOrderingKey(), "ordering-key-1");
ASSERT_EQ(msgs[1].getEventTimestamp(), 11UL);
ASSERT_EQ(msgs[1].getProperties().size(), 1);
ASSERT_TRUE(msgs[1].hasProperty("k2"));
ASSERT_EQ(msgs[1].getProperty("k2"), "v2");
ASSERT_FALSE(msgs[1].hasSchemaVersion());

ASSERT_EQ(msgs[2].getDataAsString(), "msg-2");
ASSERT_FALSE(msgs[2].hasPartitionKey());
ASSERT_FALSE(msgs[2].hasOrderingKey());
ASSERT_EQ(msgs[2].getEventTimestamp(), 0UL);
ASSERT_EQ(msgs[2].getProperties().size(), 0);
ASSERT_FALSE(msgs[2].hasSchemaVersion());

client.close();
}
3 changes: 3 additions & 0 deletions pulsar-client-cpp/tests/ProtobufNativeSchemaTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ TEST(ProtobufNativeSchemaTest, testEndToEnd) {
receivedTestMessage.ParseFromArray(msg.getData(), msg.getLength());
ASSERT_EQ(receivedTestMessage.testenum(), ::proto::TestEnum::FAILOVER);

ASSERT_TRUE(msg.hasSchemaVersion());
ASSERT_EQ(msg.getSchemaVersion(), std::string(8L, '\0'));

client.close();
}

Expand Down
34 changes: 34 additions & 0 deletions pulsar-client-cpp/tests/SchemaTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,37 @@ TEST(SchemaTest, testSchema) {

client.close();
}

TEST(SchemaTest, testHasSchemaVersion) {
Client client(lookupUrl);
std::string topic = "SchemaTest-HasSchemaVersion";
SchemaInfo stringSchema(SchemaType::STRING, "String", "");

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", ConsumerConfiguration().setSchema(stringSchema),
consumer));
Producer batchedProducer;
ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
batchedProducer));
Producer nonBatchedProducer;
ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
nonBatchedProducer));

ASSERT_EQ(ResultOk, batchedProducer.send(MessageBuilder().setContent("msg-0").build()));
ASSERT_EQ(ResultOk, nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build()));

Message msgs[2];
ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000));
ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000));

std::string schemaVersion(8, '\0');
ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
ASSERT_TRUE(msgs[0].hasSchemaVersion());
ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion);

ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
ASSERT_TRUE(msgs[1].hasSchemaVersion());
ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion);

client.close();
}

0 comments on commit 6f41fde

Please sign in to comment.