Skip to content

Commit

Permalink
Reader support seek from separate messageId/time (apache#10348)
Browse files Browse the repository at this point in the history
Fixes apache#9301

### Motivation
Currently in ReaderConfigurationData the API allow to ‘setStartMessageId’ only from single message ID and this apply to all consumers in the `MultiTopicsReaderImpl`.

Is it possible to add start message per partition / topic.

### Modifications
Reader can seek by function

### Verifying this change
1 For partitioned topic, we can seek by time/messageId for every partition
2 For non-partitioned topic, we can seek by function
  • Loading branch information
315157973 authored May 15, 2021
1 parent 96325b9 commit 7bd849b
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
Expand All @@ -45,6 +46,7 @@
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -87,7 +89,7 @@ protected void cleanup() throws Exception {

@Test(timeOut = 30000)
public void testReadMessageWithoutBatching() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic";
String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
testReadMessages(topic, false);
}
Expand All @@ -114,7 +116,7 @@ public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exceptio

@Test(timeOut = 10000)
public void testReadMessageWithBatching() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching";
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
testReadMessages(topic, true);
}
Expand Down Expand Up @@ -212,7 +214,8 @@ public void testReaderWithTimeLong() throws Exception {
@Test(timeOut = 10000)
public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {

final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor";
final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor"
+ UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
@Cleanup
Reader<byte[]> reader1 = pulsarClient.newReader()
Expand Down Expand Up @@ -249,11 +252,91 @@ public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException

@Test(timeOut = 10000)
public void testMultiReaderSeek() throws Exception {
String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
String topic = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
publishMessages(topic,100,false);
}

@Test
public void testMultiTopicSeekByFunction() throws Exception {
final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
int partitionNum = 4;
int msgNum = 20;
admin.topics().createPartitionedTopic(topicName, partitionNum);
publishMessages(topicName, msgNum, false);
Reader<byte[]> reader = pulsarClient
.newReader().startMessageIdInclusive().startMessageId(MessageId.latest)
.topic(topicName).subscriptionName("my-sub").create();
long now = System.currentTimeMillis();
reader.seek((topic) -> now);
assertNull(reader.readNext(1, TimeUnit.SECONDS));

reader.seek((topic) -> {
TopicName name = TopicName.get(topic);
switch (name.getPartitionIndex()) {
case 0:
return MessageId.latest;
case 1:
return MessageId.earliest;
case 2:
return now;
case 3:
return now - 999999;
default:
return null;
}
});
int count = 0;
while (true) {
Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
}
int msgNumInPartition0 = 0;
int msgNumInPartition1 = msgNum / partitionNum;
int msgNumInPartition2 = 0;
int msgNumInPartition3 = msgNum / partitionNum;

assertEquals(count, msgNumInPartition0 + msgNumInPartition1 + msgNumInPartition2 + msgNumInPartition3);
}

@Test
public void testMultiTopicSeekByFunctionWithException() throws Exception {
final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
int partitionNum = 4;
int msgNum = 20;
admin.topics().createPartitionedTopic(topicName, partitionNum);
publishMessages(topicName, msgNum, false);
Reader<byte[]> reader = pulsarClient
.newReader().startMessageIdInclusive().startMessageId(MessageId.latest)
.topic(topicName).subscriptionName("my-sub").create();
long now = System.currentTimeMillis();
reader.seek((topic) -> now);
assertNull(reader.readNext(1, TimeUnit.SECONDS));
try {
reader.seek((topic) -> {
TopicName name = TopicName.get(topic);
switch (name.getPartitionIndex()) {
case 0:
throw new RuntimeException("test");
case 1:
return MessageId.latest;
case 2:
return MessageId.earliest;
case 3:
return now - 999999;
default:
return null;
}
});
} catch (Exception e) {
assertEquals(e.getMessage(), "test");
assertTrue(e instanceof RuntimeException);
}
}

@Test(timeOut = 20000)
public void testMultiTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
Expand Down Expand Up @@ -297,7 +380,7 @@ public void testMultiTopic() throws Exception {
@Test(timeOut = 10000)
public void testKeyHashRangeReader() throws Exception {
final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";
final String topic = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
Expand All @@ -30,6 +31,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
Expand All @@ -47,6 +49,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -171,6 +174,46 @@ private void testReadMessages(String topic, boolean enableBatch) throws Exceptio
Assert.assertFalse(readLatest.hasMessageAvailable());
}

@Test
public void testMultiTopicSeekByFunction() throws Exception {
final String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
int msgNum = 10;
publishMessages(topicName, msgNum, false);
Reader<byte[]> reader = pulsarClient
.newReader().startMessageIdInclusive().startMessageId(MessageId.latest)
.topic(topicName).subscriptionName("my-sub").create();
long now = System.currentTimeMillis();
reader.seek((topic) -> now);
assertNull(reader.readNext(1, TimeUnit.SECONDS));
// seek by time
reader.seek((topic) -> {
assertFalse(TopicName.get(topic).isPartitioned());
return now - 999999;
});
int count = 0;
while (true) {
Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
}
assertEquals(count, msgNum);
// seek by msg id
reader.seek((topic) -> {
assertFalse(TopicName.get(topic).isPartitioned());
return MessageId.earliest;
});
count = 0;
while (true) {
Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
}
assertEquals(count, msgNum);
}

@Test
public void testReadFromPartition() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,26 +600,31 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
void seek(long timestamp) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message id.
* Reset the subscription associated with this consumer to a specific message ID or message publish time.
* <p>
* The Function input is topic+partition.
* The Function input is topic+partition. It returns only timestamp or MessageId.
* <p>
* The return value is the seek position/timestamp of the current partition.
* Exception is thrown if other object types are returned.
* <p>
* If returns null, the current partition will not do any processing.
* Exception in a partition may affect other partitions.
* @param function
* @throws PulsarClientException
*/
void seek(Function<String, Object> function) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message id asynchronously.
* Reset the subscription associated with this consumer to a specific message ID
* or message publish time asynchronously.
* <p>
* The Function input is topic+partition.
* The Function input is topic+partition. It returns only timestamp or MessageId.
* <p>
* The return value is the seek position/timestamp of the current partition.
* Exception is thrown if other object types are returned.
* <p>
* If returns null, the current partition will not do any processing.
* Exception in a partition may affect other partitions.
* @param function
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down Expand Up @@ -155,6 +157,37 @@ public interface Reader<T> extends Closeable {
*/
void seek(long timestamp) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message ID or message publish time.
* <p>
* The Function input is topic+partition. It returns only timestamp or MessageId.
* <p>
* The return value is the seek position/timestamp of the current partition.
* Exception is thrown if other object types are returned.
* <p>
* If returns null, the current partition will not do any processing.
* Exception in a partition may affect other partitions.
* @param function
* @throws PulsarClientException
*/
void seek(Function<String, Object> function) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message ID
* or message publish time asynchronously.
* <p>
* The Function input is topic+partition. It returns only timestamp or MessageId.
* <p>
* The return value is the seek position/timestamp of the current partition.
* Exception is thrown if other object types are returned.
* <p>
* If returns null, the current partition will not do any processing.
* Exception in a partition may affect other partitions.
* @param function
* @return
*/
CompletableFuture<Void> seekAsync(Function<String, Object> function);

/**
* Reset the subscription associated with this reader to a specific message id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -163,6 +164,11 @@ public void seek(long timestamp) throws PulsarClientException {
multiTopicsConsumer.seek(timestamp);
}

@Override
public void seek(Function<String, Object> function) throws PulsarClientException {
multiTopicsConsumer.seek(function);
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return multiTopicsConsumer.seekAsync(messageId);
Expand All @@ -173,6 +179,11 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
return multiTopicsConsumer.seekAsync(timestamp);
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
return multiTopicsConsumer.seekAsync(function);
}

@Override
public void close() throws IOException {
multiTopicsConsumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -187,6 +188,16 @@ public void seek(long timestamp) throws PulsarClientException {
consumer.seek(timestamp);
}

@Override
public void seek(Function<String, Object> function) throws PulsarClientException {
consumer.seek(function);
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
return consumer.seekAsync(function);
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return consumer.seekAsync(messageId);
Expand Down

0 comments on commit 7bd849b

Please sign in to comment.