diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java index c4adb9712a091..ebdfb52d0aac4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; @@ -87,7 +89,7 @@ protected void cleanup() throws Exception { @Test(timeOut = 30000) public void testReadMessageWithoutBatching() throws Exception { - String topic = "persistent://my-property/my-ns/my-reader-topic"; + String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); testReadMessages(topic, false); } @@ -114,7 +116,7 @@ public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exceptio @Test(timeOut = 10000) public void testReadMessageWithBatching() throws Exception { - String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching"; + String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); testReadMessages(topic, true); } @@ -212,7 +214,8 @@ public void testReaderWithTimeLong() throws Exception { @Test(timeOut = 10000) public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException { - final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor"; + final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor" + + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); @Cleanup Reader reader1 = pulsarClient.newReader() @@ -249,11 +252,91 @@ public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException @Test(timeOut = 10000) public void testMultiReaderSeek() throws Exception { - String topic = "persistent://my-property/my-ns/testKeyHashRangeReader"; + String topic = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); publishMessages(topic,100,false); } + @Test + public void testMultiTopicSeekByFunction() throws Exception { + final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID(); + int partitionNum = 4; + int msgNum = 20; + admin.topics().createPartitionedTopic(topicName, partitionNum); + publishMessages(topicName, msgNum, false); + Reader reader = pulsarClient + .newReader().startMessageIdInclusive().startMessageId(MessageId.latest) + .topic(topicName).subscriptionName("my-sub").create(); + long now = System.currentTimeMillis(); + reader.seek((topic) -> now); + assertNull(reader.readNext(1, TimeUnit.SECONDS)); + + reader.seek((topic) -> { + TopicName name = TopicName.get(topic); + switch (name.getPartitionIndex()) { + case 0: + return MessageId.latest; + case 1: + return MessageId.earliest; + case 2: + return now; + case 3: + return now - 999999; + default: + return null; + } + }); + int count = 0; + while (true) { + Message message = reader.readNext(1, TimeUnit.SECONDS); + if (message == null) { + break; + } + count++; + } + int msgNumInPartition0 = 0; + int msgNumInPartition1 = msgNum / partitionNum; + int msgNumInPartition2 = 0; + int msgNumInPartition3 = msgNum / partitionNum; + + assertEquals(count, msgNumInPartition0 + msgNumInPartition1 + msgNumInPartition2 + msgNumInPartition3); + } + + @Test + public void testMultiTopicSeekByFunctionWithException() throws Exception { + final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID(); + int partitionNum = 4; + int msgNum = 20; + admin.topics().createPartitionedTopic(topicName, partitionNum); + publishMessages(topicName, msgNum, false); + Reader reader = pulsarClient + .newReader().startMessageIdInclusive().startMessageId(MessageId.latest) + .topic(topicName).subscriptionName("my-sub").create(); + long now = System.currentTimeMillis(); + reader.seek((topic) -> now); + assertNull(reader.readNext(1, TimeUnit.SECONDS)); + try { + reader.seek((topic) -> { + TopicName name = TopicName.get(topic); + switch (name.getPartitionIndex()) { + case 0: + throw new RuntimeException("test"); + case 1: + return MessageId.latest; + case 2: + return MessageId.earliest; + case 3: + return now - 999999; + default: + return null; + } + }); + } catch (Exception e) { + assertEquals(e.getMessage(), "test"); + assertTrue(e instanceof RuntimeException); + } + } + @Test(timeOut = 20000) public void testMultiTopic() throws Exception { final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID(); @@ -297,7 +380,7 @@ public void testMultiTopic() throws Exception { @Test(timeOut = 10000) public void testKeyHashRangeReader() throws Exception { final List keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); - final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader"; + final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index d771d7a11fe6b..6d27b5b98c08a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; @@ -30,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -171,6 +174,46 @@ private void testReadMessages(String topic, boolean enableBatch) throws Exceptio Assert.assertFalse(readLatest.hasMessageAvailable()); } + @Test + public void testMultiTopicSeekByFunction() throws Exception { + final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID(); + int msgNum = 10; + publishMessages(topicName, msgNum, false); + Reader reader = pulsarClient + .newReader().startMessageIdInclusive().startMessageId(MessageId.latest) + .topic(topicName).subscriptionName("my-sub").create(); + long now = System.currentTimeMillis(); + reader.seek((topic) -> now); + assertNull(reader.readNext(1, TimeUnit.SECONDS)); + // seek by time + reader.seek((topic) -> { + assertFalse(TopicName.get(topic).isPartitioned()); + return now - 999999; + }); + int count = 0; + while (true) { + Message message = reader.readNext(1, TimeUnit.SECONDS); + if (message == null) { + break; + } + count++; + } + assertEquals(count, msgNum); + // seek by msg id + reader.seek((topic) -> { + assertFalse(TopicName.get(topic).isPartitioned()); + return MessageId.earliest; + }); + count = 0; + while (true) { + Message message = reader.readNext(1, TimeUnit.SECONDS); + if (message == null) { + break; + } + count++; + } + assertEquals(count, msgNum); + } @Test public void testReadFromPartition() throws Exception { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 1af1be8a05388..95334e4ee5b5b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -600,26 +600,31 @@ CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, void seek(long timestamp) throws PulsarClientException; /** - * Reset the subscription associated with this consumer to a specific message id. + * Reset the subscription associated with this consumer to a specific message ID or message publish time. *

- * The Function input is topic+partition. + * The Function input is topic+partition. It returns only timestamp or MessageId. *

* The return value is the seek position/timestamp of the current partition. + * Exception is thrown if other object types are returned. *

* If returns null, the current partition will not do any processing. + * Exception in a partition may affect other partitions. * @param function * @throws PulsarClientException */ void seek(Function function) throws PulsarClientException; /** - * Reset the subscription associated with this consumer to a specific message id asynchronously. + * Reset the subscription associated with this consumer to a specific message ID + * or message publish time asynchronously. *

- * The Function input is topic+partition. + * The Function input is topic+partition. It returns only timestamp or MessageId. *

* The return value is the seek position/timestamp of the current partition. + * Exception is thrown if other object types are returned. *

* If returns null, the current partition will not do any processing. + * Exception in a partition may affect other partitions. * @param function * @return */ diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java index 92e6541036958..6f66cb9e85fb4 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java @@ -21,6 +21,8 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; + import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -155,6 +157,37 @@ public interface Reader extends Closeable { */ void seek(long timestamp) throws PulsarClientException; + /** + * Reset the subscription associated with this consumer to a specific message ID or message publish time. + *

+ * The Function input is topic+partition. It returns only timestamp or MessageId. + *

+ * The return value is the seek position/timestamp of the current partition. + * Exception is thrown if other object types are returned. + *

+ * If returns null, the current partition will not do any processing. + * Exception in a partition may affect other partitions. + * @param function + * @throws PulsarClientException + */ + void seek(Function function) throws PulsarClientException; + + /** + * Reset the subscription associated with this consumer to a specific message ID + * or message publish time asynchronously. + *

+ * The Function input is topic+partition. It returns only timestamp or MessageId. + *

+ * The return value is the seek position/timestamp of the current partition. + * Exception is thrown if other object types are returned. + *

+ * If returns null, the current partition will not do any processing. + * Exception in a partition may affect other partitions. + * @param function + * @return + */ + CompletableFuture seekAsync(Function function); + /** * Reset the subscription associated with this reader to a specific message id. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index b9593bbde3e05..d25d1ba067f93 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; @@ -163,6 +164,11 @@ public void seek(long timestamp) throws PulsarClientException { multiTopicsConsumer.seek(timestamp); } + @Override + public void seek(Function function) throws PulsarClientException { + multiTopicsConsumer.seek(function); + } + @Override public CompletableFuture seekAsync(MessageId messageId) { return multiTopicsConsumer.seekAsync(messageId); @@ -173,6 +179,11 @@ public CompletableFuture seekAsync(long timestamp) { return multiTopicsConsumer.seekAsync(timestamp); } + @Override + public CompletableFuture seekAsync(Function function) { + return multiTopicsConsumer.seekAsync(function); + } + @Override public void close() throws IOException { multiTopicsConsumer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 794e4ae4bd1e7..884fe696a1002 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; @@ -187,6 +188,16 @@ public void seek(long timestamp) throws PulsarClientException { consumer.seek(timestamp); } + @Override + public void seek(Function function) throws PulsarClientException { + consumer.seek(function); + } + + @Override + public CompletableFuture seekAsync(Function function) { + return consumer.seekAsync(function); + } + @Override public CompletableFuture seekAsync(MessageId messageId) { return consumer.seekAsync(messageId);