Skip to content

Commit

Permalink
[improve][client] PIP-224 Part 1: Add TopicMessageId for seek and ack…
Browse files Browse the repository at this point in the history
…nowledge (apache#19158)
  • Loading branch information
BewareMyPower authored Jan 31, 2023
1 parent 1cd1aef commit 17c58a5
Show file tree
Hide file tree
Showing 22 changed files with 373 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}

Expand All @@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
if (message == null) {
break;
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
received.add(topicMessageId.getInnerMessageId());
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,54 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MultiTopicsConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
private ScheduledExecutorService internalExecutorServiceDelegate;

@BeforeMethod(alwaysRun = true)
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -231,4 +242,113 @@ public void testBatchReceiveAckTimeout()
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}

@Test(timeOut = 30000)
public void testAcknowledgeWrongMessageId() throws Exception {
final var topic1 = newTopicName();
final var topic2 = newTopicName();

@Cleanup final var singleTopicConsumer = pulsarClient.newConsumer()
.topic(topic1)
.subscriptionName("sub-1")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(singleTopicConsumer instanceof ConsumerImpl);

@Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer()
.topics(List.of(topic1, topic2))
.subscriptionName("sub-2")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);

@Cleanup final var producer = pulsarClient.newProducer().topic(topic1).create();
final var nonTopicMessageIds = new ArrayList<MessageId>();
nonTopicMessageIds.add(producer.send(new byte[]{ 0x00 }));
nonTopicMessageIds.add(singleTopicConsumer.receive().getMessageId());

// Multi-topics consumers can only acknowledge TopicMessageId, otherwise NotAllowedException will be thrown
for (var msgId : nonTopicMessageIds) {
assertFalse(msgId instanceof TopicMessageId);
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledge(msgId));
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledge(Collections.singletonList(msgId)));
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledgeCumulative(msgId));
}

// Single-topic consumer can acknowledge TopicMessageId
final var topicMessageId = multiTopicsConsumer.receive().getMessageId();
assertTrue(topicMessageId instanceof TopicMessageId);
assertFalse(topicMessageId instanceof MessageIdImpl);
singleTopicConsumer.acknowledge(topicMessageId);
}

@DataProvider
public static Object[][] messageIdFromProducer() {
return new Object[][] { { true }, { false } };
}

@Test(timeOut = 30000, dataProvider = "messageIdFromProducer")
public void testSeekCustomTopicMessageId(boolean messageIdFromProducer) throws Exception {
final var topic = TopicName.get(newTopicName()).toString();
final var numPartitions = 3;
admin.topics().createPartitionedTopic(topic, numPartitions);

@Cleanup final var producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.messageRouter(new MessageRouter() {
int index = 0;
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return index++ % metadata.numPartitions();
}
})
.create();
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
.subscriptionName("sub").subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);

final var msgIds = new HashMap<String, List<MessageId>>();
final var numMessagesPerPartition = 10;
final var numMessages = numPartitions * numMessagesPerPartition;
for (int i = 0; i < numMessages; i++) {
var msgId = (MessageIdImpl) producer.send(i);
if (messageIdFromProducer) {
msgIds.computeIfAbsent(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + msgId.getPartitionIndex(),
__ -> new ArrayList<>()).add(msgId);
} else {
var topicMessageId = (TopicMessageId) consumer.receive().getMessageId();
msgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId);
}
}

final var partitions = IntStream.range(0, numPartitions)
.mapToObj(i -> topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
.collect(Collectors.toSet());
assertEquals(msgIds.keySet(), partitions);

for (var partition : partitions) {
final var msgIdList = msgIds.get(partition);
assertEquals(msgIdList.size(), numMessagesPerPartition);
if (messageIdFromProducer) {
consumer.seek(TopicMessageId.create(partition, msgIdList.get(numMessagesPerPartition / 2)));
} else {
consumer.seek(msgIdList.get(numMessagesPerPartition / 2));
}
}

var topicMsgIds = new HashMap<String, List<TopicMessageId>>();
for (int i = 0; i < ((numMessagesPerPartition / 2 - 1) * numPartitions); i++) {
TopicMessageId topicMessageId = (TopicMessageId) consumer.receive().getMessageId();
topicMsgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId);
}
assertEquals(topicMsgIds.keySet(), partitions);
for (var partition : partitions) {
assertEquals(topicMsgIds.get(partition),
msgIds.get(partition).subList(numMessagesPerPartition / 2 + 1, numMessagesPerPartition));
}
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {

for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(), 2);
consumer1.acknowledge(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
Expand Down Expand Up @@ -166,9 +166,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls

for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
Expand Down Expand Up @@ -292,7 +293,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
.subscribe();

MessageId messageId = new MessageIdImpl(3, 1, 0);
TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId);
TopicMessageId topicMessageId = TopicMessageId.create("topic-1", messageId);
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,9 @@ CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
* <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
* </ul>
*
* <p>Note: For multi-topics consumer, you can only seek to the earliest or latest message.
* <p>Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen
* on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you
* can only seek to the earliest or latest message for all topics subscribed.
*
* @param messageId
* the message id where to reposition the subscription
Expand Down Expand Up @@ -519,19 +521,7 @@ CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
CompletableFuture<Void> seekAsync(Function<String, Object> function);

/**
* Reset the subscription associated with this consumer to a specific message id.
*
* <p>The message id can either be a specific message or represent the first or last messages in the topic.
* <ul>
* <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
* <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
* </ul>
*
* <p>Note: For multi-topics consumer, you can only seek to the earliest or latest message.
*
* @param messageId
* the message id where to reposition the subscription
* @return a future to track the completion of the seek operation
* The asynchronous version of {@link Consumer#seek(MessageId)}.
*/
CompletableFuture<Void> seekAsync(MessageId messageId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface MessageAcknowledger {
*
* @throws PulsarClientException.AlreadyClosedException}
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledge(MessageId messageId) throws PulsarClientException;

Expand All @@ -59,6 +61,8 @@ default void acknowledge(Message<?> message) throws PulsarClientException {
/**
* Acknowledge the consumption of a list of message.
* @param messageIdList the list of message IDs.
* @throws PulsarClientException.NotAllowedException
* if any message id in the list is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;

Expand All @@ -82,6 +86,8 @@ default void acknowledge(Messages<?> messages) throws PulsarClientException {
* The {@code MessageId} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;

Expand Down
Loading

0 comments on commit 17c58a5

Please sign in to comment.