Skip to content

Commit

Permalink
Fixed integration tests (apache#22)
Browse files Browse the repository at this point in the history
### Motivation

KafkaApiTest is failing

### Modifications

1. conversion between Pulsar topic name and Kafka TopicPartition ended up with TopicPartition using name with "-partition-<partition idx>"

2. Seek was not working correctly:

PulsarKafkaConsumer seeks to beginning, as asked.
Clears lastReceivedOffset in the process.

on poll it checks
```
            if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                	resetOffsets(tp);
            } 
```
seek didn't update unpolledPartitions - reset offset uses default strategy to reset => seeks to the end

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is already covered by existing tests, such as KafkaApiTest.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

No

### Documentation

  - Does this pull request introduce a new feature? NO
  • Loading branch information
dlg99 authored Jun 18, 2021
1 parent 74c0dd8 commit 9801c43
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.utils.FunctionCommon;

import java.time.Duration;
import java.util.AbstractMap;
Expand All @@ -59,7 +58,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -277,9 +275,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getLocalName(),
partitionIndex);
TopicPartition tp = normalizedTopicPartition(topic, partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
Expand All @@ -291,9 +287,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getLocalName(),
0);
TopicPartition tp = normalizedTopicPartition(topic, 0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
Expand Down Expand Up @@ -327,6 +321,15 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
}

private TopicPartition normalizedTopicPartition(TopicPartition tp) {
return normalizedTopicPartition(tp.topic(), tp.partition());
}

private TopicPartition normalizedTopicPartition(String topic, int partition) {
String name = TopicName.get(topic).getPartitionedTopicName();
return new TopicPartition(name, partition);
}

@Override
public void assign(Collection<TopicPartition> partitions) {
Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
Expand Down Expand Up @@ -372,7 +375,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {

while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getLocalName();
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
Expand Down Expand Up @@ -504,12 +507,15 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
}

private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
Preconditions.checkNotNull(offsets);
List<CompletableFuture<Void>> futures = new ArrayList<>();

applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
offsets.forEach((tp, offsetAndMetadata) -> {
TopicPartition topicPartition = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);

lastCommittedOffset.put(tp, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
});

Expand Down Expand Up @@ -566,7 +572,8 @@ private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> i
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
TopicPartition topicPartition = normalizedTopicPartition(partition);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
Expand Down Expand Up @@ -594,12 +601,14 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
lastReceivedOffset.clear();

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
TopicPartition normalizedTp = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.earliest));
unpolledPartitions.add(tp);
}
}

Expand All @@ -617,12 +626,15 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
lastReceivedOffset.clear();

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
TopicPartition normalizedTp = normalizedTopicPartition(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);

if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
futures.add(c.seekAsync(MessageId.latest));
unpolledPartitions.add(tp);
}
}

Expand Down Expand Up @@ -712,7 +724,8 @@ public Set<TopicPartition> paused() {
@Override
public void pause(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
TopicPartition topicPartition = normalizedTopicPartition(p);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.pause();
}
Expand All @@ -722,7 +735,8 @@ public void pause(Collection<TopicPartition> partitions) {
@Override
public void resume(Collection<TopicPartition> partitions) {
partitions.forEach(p -> {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
TopicPartition topicPartition = normalizedTopicPartition(p);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
if (c != null) {
c.resume();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void testPulsarKafkaProducerWithSerializer(int partitions) throws Excepti
producer.send(message);
}
producer.close();
Thread.sleep(500);

// (2) Consume using simple consumer
PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
Expand All @@ -158,6 +159,7 @@ public void testPulsarKafkaProducerWithSerializer(int partitions) throws Excepti
.build();
FetchResponse fetchResponse = consumer.fetch(fReq);

Thread.sleep(500);
long lastOffset = 0;
MessageId offset = null;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -48,7 +49,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";

@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
public void testReceivedMessage(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
Expand All @@ -68,14 +69,14 @@ public void received(Consumer consumer, Message msg) {
consConf.setMessageListener(msgListener);

SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

receiver.onStart();
waitForTransmission();

PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
producer.send(EXPECTED_MESSAGE.getBytes());

Expand All @@ -85,7 +86,7 @@ public void received(Consumer consumer, Message msg) {
}

@Test(dataProvider = "ServiceUrls")
public void testDefaultSettingsOfReceiver(String serviceUrl) {
public void testDefaultSettingsOfReceiver(Supplier<String> serviceUrl) {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
Expand All @@ -94,7 +95,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
consConf.setSubscriptionName(SUBS);

SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

Expand All @@ -103,7 +104,7 @@ public void testDefaultSettingsOfReceiver(String serviceUrl) {
}

@Test(dataProvider = "ServiceUrls")
public void testSharedSubscription(String serviceUrl) throws Exception {
public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();

Set<String> set = new HashSet<>();
Expand All @@ -120,20 +121,20 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
});

SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
serviceUrl,
serviceUrl.get(),
consConf,
new AuthenticationDisabled());

receiver1.onStart();
receiver2.onStart();
waitForTransmission();

PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
for (int i = 0; i < 10; i++) {
producer.send(EXPECTED_MESSAGE.getBytes());
Expand All @@ -149,8 +150,8 @@ public void testSharedSubscription(String serviceUrl) throws Exception {
@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
}

private static void waitForTransmission() {
Expand Down

0 comments on commit 9801c43

Please sign in to comment.