Skip to content

Commit

Permalink
Add support for partitioned topic consumer seek by time. (apache#5435)
Browse files Browse the repository at this point in the history
### Motivation

Add support for partitioned topic consumer seek by time.

### Modifications

Call each partition consumer seekAsync() while call partitioned consumer seekAsync()

### Verifying this change

Update unit tests for consumer.seek().
  • Loading branch information
codelipenghui authored and sijie committed Oct 24, 2019
1 parent 997a128 commit a95bea6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -145,18 +146,58 @@ public void testSeekTime() throws Exception {
@Test
public void testSeekTimeOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
long timestamp = 1550479732;

admin.topics().createPartitionedTopic(topicName, 2);
final String resetTimeStr = "100s";
final int partitions = 2;
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
admin.topics().createPartitionedTopic(topicName, partitions);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").subscribe();

try {
consumer.seek(timestamp);
fail("Should not have succeeded");
} catch (PulsarClientException e) {
// Expected
List<PersistentSubscription> subs = new ArrayList<>();

for (int i = 0; i < partitions; i++) {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topicName + TopicName.PARTITIONED_TOPIC_SUFFIX + i).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
assertEquals(topicRef.getSubscriptions().size(), 1);
PersistentSubscription sub = topicRef.getSubscription("my-subscription");
assertNotNull(sub);
subs.add(sub);
}

for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

long backlogs = 0;
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}

assertEquals(backlogs, 10);

backlogs = 0;
long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}
assertEquals(backlogs, 2);

// Wait for consumer to reconnect
Thread.sleep(1000);
consumer.seek(currentTimestamp - resetTimeInMillis);
backlogs = 0;

for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}
assertEquals(backlogs, 10);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ public interface Consumer<T> extends Closeable {
/**
* Reset the subscription associated with this consumer to a specific message publish time.
*
* <p>Note: this operation can only be done on non-partitioned topics. For these, one can rather perform
* the seek() on the individual partitions.
*
* @param timestamp
* the message publish time where to reposition the subscription
*/
Expand All @@ -350,9 +347,6 @@ public interface Consumer<T> extends Closeable {
/**
* Reset the subscription associated with this consumer to a specific message publish time.
*
* <p>Note: this operation can only be done on non-partitioned topics. For these, one can rather
* perform the seek() on the individual partitions.
*
* @param timestamp
* the message publish time where to reposition the subscription
* @return a future to track the completion of the seek operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,9 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
return FutureUtil.waitForAll(futures);
}

@Override
Expand Down

0 comments on commit a95bea6

Please sign in to comment.