Skip to content

Commit

Permalink
Allow user to get topic from consumer message (apache#2007) (apache#2135
Browse files Browse the repository at this point in the history
)

When a user receives a message, allow them to get the topic from the
message itself. This is mostly useful in the case where a single
consumer is subscribed to multiple topics.

The change also contains a minor cleanup of the constructors in
MessageImpl.
  • Loading branch information
ivankelly authored and sijie committed Sep 30, 2018
1 parent 29382dc commit f8256e6
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public void producerBaseSetup() throws Exception {
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns");
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("test"));

// so that clients can test short names
admin.tenants().createTenant("public",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("public/default");
admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
}

protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TopicFromMessageTest extends ProducerConsumerBase {
private static final long testTimeout = 90000; // 1.5 min
private static final Logger log = LoggerFactory.getLogger(TopicFromMessageTest.class);

@Override
@BeforeMethod
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@Override
@AfterMethod
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = testTimeout)
public void testSingleTopicConsumerNoBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("topic1").subscriptionName("sub1").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("topic1").enableBatching(false).create()) {
producer.send("foobar".getBytes());
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
}
}

@Test(timeOut = testTimeout)
public void testSingleTopicConsumerNoBatchFullName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("my-property/my-ns/topic1").enableBatching(false).create()) {
producer.send("foobar".getBytes());
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://my-property/my-ns/topic1");
}
}

@Test(timeOut = testTimeout)
public void testMultiTopicConsumerNoBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic("topic1").enableBatching(false).create();
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic("topic2").enableBatching(false).create()) {
producer1.send("foobar".getBytes());
producer2.send("foobar".getBytes());
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic2");
}
}

@Test(timeOut = testTimeout)
public void testSingleTopicConsumerBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("topic1").subscriptionName("sub1").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("topic1").enableBatching(true).batchingMaxMessages(1).create()) {
producer.send("foobar".getBytes());

Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
}
}

@Test(timeOut = testTimeout)
public void testMultiTopicConsumerBatchShortName() throws Exception {
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(Lists.newArrayList("topic1", "topic2")).subscriptionName("sub1").subscribe();
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic("topic1").enableBatching(true).batchingMaxMessages(1).create();
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic("topic2").enableBatching(true).batchingMaxMessages(1).create()) {

producer1.send("foobar".getBytes());
producer2.send("foobar".getBytes());

Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic1");
Assert.assertEquals(consumer.receive().getTopicName(), "persistent://public/default/topic2");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String topic, St
@Override
public void completed(Response response) {
try {
future.complete(getMessageFromHttpResponse(response));
future.complete(getMessageFromHttpResponse(tn.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand Down Expand Up @@ -851,7 +851,7 @@ private TopicName validateTopic(String topic) {
return TopicName.get(topic);
}

private List<Message<byte[]>> getMessageFromHttpResponse(Response response) throws Exception {
private List<Message<byte[]>> getMessageFromHttpResponse(String topic, Response response) throws Exception {

if (response.getStatus() != Status.OK.getStatusCode()) {
if (response.getStatus() >= 500) {
Expand Down Expand Up @@ -879,7 +879,7 @@ private List<Message<byte[]>> getMessageFromHttpResponse(Response response) thro
tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String)tmp);
return getIndividualMsgsFromBatch(msgId, data, properties);
return getIndividualMsgsFromBatch(topic, msgId, data, properties);
}
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
Expand All @@ -889,15 +889,17 @@ private List<Message<byte[]>> getMessageFromHttpResponse(Response response) thro
}
}

return Collections.singletonList(new MessageImpl<byte[]>(msgId, properties, data, Schema.BYTES));
return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES));
} finally {
if (stream != null) {
stream.close();
}
}
}

private List<Message<byte[]>> getIndividualMsgsFromBatch(String msgId, byte[] data, Map<String, String> properties) {
private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
Map<String, String> properties) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
for (int i = 0; i < batchSize; i++) {
Expand All @@ -914,7 +916,7 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String msgId, byte[] da
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.BYTES));
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
} catch (Exception ex) {
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ public interface Message<T> {
*/
byte[] getKeyBytes();

/**
* Get the topic the message was published to
*
* @return the topic the message was published to
*/
String getTopicName();

/**
* {@link EncryptionContext} contains encryption and compression information in it using which application can
* decrypt consumed message with encrypted-payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;

private final TopicName topicName;
private final String topicNameWithoutPartition;

private ConcurrentHashMap<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
Expand Down Expand Up @@ -206,8 +207,8 @@ enum SubscriptionMode {
new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
this);

TopicName topicName = TopicName.get(topic);
if (topicName.isPersistent()) {
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
Expand Down Expand Up @@ -815,9 +816,9 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
final MessageImpl<T> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload,
createEncryptionContext(msgMetadata), cnx, schema);

final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId,
msgMetadata, uncompressedPayload,
createEncryptionContext(msgMetadata), cnx, schema);
uncompressedPayload.release();
msgMetadata.recycle();

Expand Down Expand Up @@ -995,8 +996,8 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv

BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i, acker);
final MessageImpl<T> message = new MessageImpl<>(batchMessageIdImpl, msgMetadata,
singleMessageMetadataBuilder.build(), singleMessagePayload,
final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload,
createEncryptionContext(msgMetadata), cnx, schema);
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MessageImpl<T> implements Message<T> {
private Schema<T> schema;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();

private String topic; // only set for incoming messages
transient private Map<String, String> properties;

// Constructor for out-going message
Expand All @@ -64,35 +65,25 @@ static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, Byt
MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
msg.msgMetadataBuilder = msgMetadataBuilder;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.payload = Unpooled.wrappedBuffer(payload);
msg.properties = null;
msg.schema = schema;
return msg;
}

static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload) {
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
msg.msgMetadataBuilder = msgMetadataBuilder;
msg.messageId = null;
msg.cnx = null;
msg.payload = Unpooled.wrappedBuffer(payload);
msg.properties = null;
msg.schema = Schema.BYTES;
return msg;
}

// Constructor for incoming message
MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload, ClientCnx cnx,
Schema<T> schema) {
this(messageId, msgMetadata, payload, null, cnx, schema);
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
ByteBuf payload, ClientCnx cnx, Schema<T> schema) {
this(topic, messageId, msgMetadata, payload, null, cnx, schema);
}

MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = messageId;
this.topic = topic;
this.cnx = cnx;

// Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
Expand All @@ -110,11 +101,12 @@ static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, By
this.schema = schema;
}

MessageImpl(BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = batchMessageIdImpl;
this.topic = topic;
this.cnx = cnx;

this.payload = Unpooled.copiedBuffer(payload);
Expand Down Expand Up @@ -142,11 +134,8 @@ static MessageImpl<byte[]> create(MessageMetadata.Builder msgMetadataBuilder, By
this.schema = schema;
}

public MessageImpl(String msgId, Map<String, String> properties, byte[] payload, Schema<T> schema) {
this(msgId, properties, Unpooled.wrappedBuffer(payload), schema);
}

public MessageImpl(String msgId, Map<String, String> properties, ByteBuf payload, Schema<T> schema) {
public MessageImpl(String topic, String msgId, Map<String, String> properties,
ByteBuf payload, Schema<T> schema) {
String[] data = msgId.split(":");
long ledgerId = Long.parseLong(data[0]);
long entryId = Long.parseLong(data[1]);
Expand All @@ -155,6 +144,7 @@ public MessageImpl(String msgId, Map<String, String> properties, ByteBuf payload
} else {
this.messageId = new MessageIdImpl(ledgerId, entryId, -1);
}
this.topic = topic;
this.cnx = null;
this.payload = payload;
this.properties = Collections.unmodifiableMap(properties);
Expand All @@ -170,6 +160,7 @@ public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws
msgMetadata.recycle();
msg.payload = headersAndPayload;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
return msg;
Expand Down Expand Up @@ -287,6 +278,11 @@ public boolean hasKey() {
return msgMetadataBuilder.hasPartitionKey();
}

@Override
public String getTopicName() {
return topic;
}

@Override
public String getKey() {
checkNotNull(msgMetadataBuilder);
Expand Down Expand Up @@ -316,6 +312,7 @@ public ClientCnx getCnx() {
public void recycle() {
msgMetadataBuilder = null;
messageId = null;
topic = null;
payload = null;
properties = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId
final int numMessages = msgMetadata.getNumMessagesInBatch();

if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
final MessageImpl<?> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, null, null);
final MessageImpl<?> message = new MessageImpl<>(topicName.toString(),
msgId, msgMetadata, uncompressedPayload,
null, null);
processor.process(msgId, message, uncompressedPayload);
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, null, -1, processor);
receiveIndividualMessagesFromBatch(topicName.toString(), msgMetadata, uncompressedPayload, messageId, null, -1, processor);
}
} finally {
if (uncompressedPayload != null) {
Expand Down Expand Up @@ -149,8 +151,9 @@ public static ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, Message
}
}

public static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx, int partitionIndex, MessageProcessor processor) {
public static void receiveIndividualMessagesFromBatch(String topic, MessageMetadata msgMetadata,
ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx,
int partitionIndex, MessageProcessor processor) {
int batchSize = msgMetadata.getNumMessagesInBatch();

try {
Expand All @@ -170,7 +173,8 @@ public static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadat

BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex, i, null);
final MessageImpl<?> message = new MessageImpl<>(batchMessageIdImpl, msgMetadata,
final MessageImpl<?> message = new MessageImpl<>(
topic, batchMessageIdImpl, msgMetadata,
singleMessageMetadataBuilder.build(), singleMessagePayload, Optional.empty(), cnx, null);

processor.process(batchMessageIdImpl, message, singleMessagePayload);
Expand Down
Loading

0 comments on commit f8256e6

Please sign in to comment.