Skip to content

Commit

Permalink
[Issue 4803][client] return null if the message value/data is not set…
Browse files Browse the repository at this point in the history
… by producer (apache#6379)

Fixes apache#4803 

### Motivation
Allow the typed consumer receive messages with `null` value if the producer sends message without payload.

### Modifications
- add a flag in `MessageMetadata` to indicate if the payload is set when the message is created
- check and return `null` if the flag is not set when reading data from a message
  • Loading branch information
nlu90 authored May 19, 2020
1 parent 1d5c418 commit d55bc00
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1930,6 +1930,9 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,17 @@ public void persistentTopics(String topicName) throws Exception {
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);

publishNullValueMessageOnPersistentTopic(persistentTopicName, 10);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
messages = admin.topics().peekMessages(persistentTopicName, subName, 10);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
assertNull(messages.get(i).getData());
assertNull(messages.get(i).getValue());
}
admin.topics().skipAllMessages(persistentTopicName, subName);

consumer.close();
client.close();

Expand Down Expand Up @@ -1559,19 +1570,28 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
long secondTimestamp = System.currentTimeMillis();

private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
publishMessagesOnPersistentTopic(topicName, messages, 0);
publishMessagesOnPersistentTopic(topicName, messages, 0, false);
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
publishMessagesOnPersistentTopic(topicName, messages, 0, true);
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
boolean nullValue) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
producer.send(message.getBytes());
if (nullValue) {
producer.send(null);
} else {
String message = "message-" + i;
producer.send(message.getBytes());
}
}

producer.close();
Expand Down Expand Up @@ -1704,13 +1724,13 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

publishMessagesOnPersistentTopic(topicName, 5, 0);
publishMessagesOnPersistentTopic(topicName, 5, 0, false);

// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long messageTimestamp = System.currentTimeMillis();

publishMessagesOnPersistentTopic(topicName, 5, 5);
publishMessagesOnPersistentTopic(topicName, 5, 5, false);

List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
Expand Down Expand Up @@ -1757,17 +1777,17 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

publishMessagesOnPersistentTopic(topicName, 5, 0);
publishMessagesOnPersistentTopic(topicName, 5, 0, false);

// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long firstTimestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 3, 5);
publishMessagesOnPersistentTopic(topicName, 3, 5, false);

Thread.sleep(1);
long secondTimestamp = System.currentTimeMillis();

publishMessagesOnPersistentTopic(topicName, 2, 8);
publishMessagesOnPersistentTopic(topicName, 2, 8, false);

List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
Expand Down Expand Up @@ -1829,13 +1849,13 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
.consumerName("consumerA").subscriptionType(SubscriptionType.Failover)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

publishMessagesOnPersistentTopic(topicName, 5, 0);
publishMessagesOnPersistentTopic(topicName, 5, 0, false);

// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long messageTimestamp = System.currentTimeMillis();

publishMessagesOnPersistentTopic(topicName, 5, 5);
publishMessagesOnPersistentTopic(topicName, 5, 5, false);

// Currently the active consumer is consumerA
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -1866,7 +1886,7 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
// Closing consumerA activates consumerB
consumerA.close();

publishMessagesOnPersistentTopic(topicName, 5, 10);
publishMessagesOnPersistentTopic(topicName, 5, 10, false);

int receivedAfterFailover = 0;
for (int i = 10; i < 15; i++) {
Expand Down Expand Up @@ -1901,11 +1921,11 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

publishMessagesOnPersistentTopic(topicName, 5, 0);
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
Thread.sleep(1);

long timestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 5, 5);
publishMessagesOnPersistentTopic(topicName, 5, 5, false);

for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* Null value message produce and consume test.
*/
@Slf4j
public class NullValueTest extends BrokerTestBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void nullValueBytesSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bytes-test";

@Cleanup
Producer producer = pulsarClient.newProducer()
.topic(topic)
.create();

@Cleanup
Consumer consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
if (i % 2 == 0) {
producer.newMessage().value("not null".getBytes()).send();
} else {
producer.newMessage().value(null).send();
}
}

for (int i = 0; i < numMessage; i++) {
Message message = consumer.receive();
if (i % 2 == 0) {
Assert.assertNotNull(message.getData());
Assert.assertNotNull(message.getValue());
Assert.assertEquals(new String(message.getData()), "not null");
} else {
Assert.assertNull(message.getData());
Assert.assertNull(message.getValue());
}
consumer.acknowledge(message);
}

for (int i = 0; i < numMessage; i++) {
if (i % 2 == 0) {
producer.newMessage().value("not null".getBytes()).sendAsync();
} else {
producer.newMessage().value(null).sendAsync();
}
}

for (int i = 0; i < numMessage; i++) {
CompletableFuture<Message> completableFuture = consumer.receiveAsync();
final int index = i;
completableFuture.whenComplete((message, throwable) -> {
Assert.assertNull(throwable);
if (index % 2 == 0) {
Assert.assertNotNull(message.getData());
Assert.assertNotNull(message.getValue());
Assert.assertEquals(new String(message.getData()), "not null");
} else {
Assert.assertNull(message.getData());
Assert.assertNull(message.getValue());
}
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
Assert.assertNull(e);
}
});
}

}

@Test
public void nullValueBooleanSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bool-test";

@Cleanup
Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
.topic(topic)
.create();

@Cleanup
Consumer<Boolean> consumer = pulsarClient.newConsumer(Schema.BOOL)
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
producer.newMessage().value(null).sendAsync();
}

for (int i = 0; i < numMessage; i++) {
Message<Boolean> message = consumer.receive();
Assert.assertNull(message.getValue());
Assert.assertNull(message.getData());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

String msgId = response.getHeaderString(MESSAGE_ID);
PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
try (InputStream stream = (InputStream) response.getEntity()) {
byte[] data = new byte[stream.available()];
stream.read(data);
Expand All @@ -1298,10 +1299,16 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
if (tmp != null) {
properties.put("publish-time", (String) tmp);
}

tmp = headers.getFirst("X-Pulsar-null-value");
if (tmp != null) {
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
}

tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
}
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
Expand All @@ -1312,12 +1319,12 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES));
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
}
}

private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
Map<String, String> properties) {
Map<String, String> properties, PulsarApi.MessageMetadata.Builder msgMetadataBuilder) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
ByteBuf buf = Unpooled.wrappedBuffer(data);
Expand All @@ -1334,7 +1341,8 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String ms
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
Schema.BYTES, msgMetadataBuilder));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ protected boolean canEnqueueMessage(Message<T> message) {
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
if (canEnqueueMessage(message)) {
incomingMessages.add(message);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.getData().length);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
}
return hasEnoughMessagesForBatchReceive();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
stats.updateNumMsgsReceived(msg);

trackMessage(msg);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
}

protected void trackMessage(Message<?> msg) {
Expand Down
Loading

0 comments on commit d55bc00

Please sign in to comment.