Skip to content

Commit

Permalink
Implement seekToBeginning()/end operations in Kafka consumer wrapper (
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Oct 24, 2017
1 parent a5b5659 commit c85a779
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -59,7 +60,8 @@
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -179,7 +181,14 @@ public Set<String> subscription() {

@Override
public void subscribe(Collection<String> topics) {
subscribe(topics, null);
}

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer>> futures = new ArrayList<>();

List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
Expand All @@ -197,25 +206,32 @@ public void subscribe(Collection<String> topics) {
CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client
.subscribeAsync(partitionName, groupId, conf);
int partitionIndex = i;
future.thenAccept(
consumer -> consumers.putIfAbsent(new TopicPartition(topic, partitionIndex), consumer));
TopicPartition tp = new TopicPartition(topic, partitionIndex);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
topicPartitions.add(tp);
}

} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer> future = client.subscribeAsync(topic,
groupId, conf);
future.thenAccept(consumer -> consumers.putIfAbsent(new TopicPartition(topic, 0), consumer));
TopicPartition tp = new TopicPartition(topic, 0);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
topicPartitions.add(tp);
}
}

// Wait for all consumers to be ready
futures.forEach(CompletableFuture::join);

// Notify the listener is now owning all topics/partitions
if (callback != null) {
callback.onPartitionsAssigned(topicPartitions);
}

} catch (Exception e) {
// Close all consumer that might have been sucessfully created
// Close all consumer that might have been successfully created
futures.forEach(f -> {
try {
f.get().close();
Expand All @@ -228,11 +244,6 @@ public void subscribe(Collection<String> topics) {
}
}

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException("ConsumerRebalanceListener is not supported");
}

@Override
public void assign(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException("Cannot manually assign partitions");
Expand Down Expand Up @@ -383,17 +394,59 @@ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {

@Override
public void seek(TopicPartition partition, long offset) {
throw new UnsupportedOperationException();
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}

try {
c.seek(msgId);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
List<CompletableFuture<Void>> futures = new ArrayList<>();

if (partitions.isEmpty()) {
partitions = consumers.keySet();
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.earliest));
}
}

FutureUtil.waitForAll(futures).join();
}

@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
List<CompletableFuture<Void>> futures = new ArrayList<>();

if (partitions.isEmpty()) {
partitions = consumers.keySet();
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.latest));
}
}

FutureUtil.waitForAll(futures).join();
}

@Override
Expand Down Expand Up @@ -472,4 +525,6 @@ public void close(long timeout, TimeUnit unit) {
public void wakeup() {
throw new UnsupportedOperationException();
}

private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -232,4 +233,109 @@ public void testPartitions() throws Exception {

consumers.forEach(Consumer::close);
}

@Test
public void testConsumerSeek() throws Exception {
String topic = "persistent://sample/standalone/ns/testSimpleConsumer";

Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl.toString());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

Producer pulsarProducer = pulsarClient.createProducer(topic);

for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
.build();
pulsarProducer.send(msg);
}

for (int i = 0; i < 10; i++) {
ConsumerRecords<String, String> records = consumer.poll(100);
assertEquals(records.count(), 1);
int idx = i;
records.forEach(record -> {
assertEquals(record.key(), Integer.toString(idx));
assertEquals(record.value(), "hello-" + idx);
});

consumer.commitSync();
}

consumer.seekToBeginning(Collections.emptyList());

Thread.sleep(500);

// Messages should be available again
for (int i = 0; i < 10; i++) {
ConsumerRecords<String, String> records = consumer.poll(100);
assertEquals(records.count(), 1);
int idx = i;
records.forEach(record -> {
assertEquals(record.key(), Integer.toString(idx));
assertEquals(record.value(), "hello-" + idx);
});

consumer.commitSync();
}

consumer.close();
}

@Test
public void testConsumerSeekToEnd() throws Exception {
String topic = "persistent://sample/standalone/ns/testSimpleConsumer";

Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl.toString());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

Producer pulsarProducer = pulsarClient.createProducer(topic);

for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.create().setKey(Integer.toString(i)).setContent(("hello-" + i).getBytes())
.build();
pulsarProducer.send(msg);
}

for (int i = 0; i < 10; i++) {
ConsumerRecords<String, String> records = consumer.poll(100);
assertEquals(records.count(), 1);
int idx = i;
records.forEach(record -> {
assertEquals(record.key(), Integer.toString(idx));
assertEquals(record.value(), "hello-" + idx);
});

consumer.commitSync();
}

consumer.seekToEnd(Collections.emptyList());
Thread.sleep(500);

consumer.close();

// Recreate the consumer
consumer = new PulsarKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

ConsumerRecords<String, String> records = consumer.poll(100);
// Since we are at the end of the topic, there should be no messages
assertEquals(records.count(), 0);

consumer.close();
}

}

0 comments on commit c85a779

Please sign in to comment.