Skip to content

Commit

Permalink
fix pause does not work for new created consumer (apache#8387)
Browse files Browse the repository at this point in the history
Fixes apache#8214 

### Motivation

fix pause does not work for new created consumer in case of multi topics consumer

### Modifications

add a flag for `MultiTopicsConsumerImpl`  which indicates wheter `MultiTopicsConsumerImpl` has been paused or not. If the flag is true, we should pause the new added consumers for new topic partition.
  • Loading branch information
aloyszhang authored Oct 30, 2020
1 parent 7979b9c commit b2c0f59
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
Expand Down Expand Up @@ -3037,6 +3038,73 @@ public void testConsumerSubscriptionInitialize() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testMultiTopicsConsumerImplPause() throws Exception {
log.info("-- Starting {} test --", methodName);
String topicName = "persistent://my-property/my-ns/partition-topic";

admin.topics().createPartitionedTopic(topicName, 1);


Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
.create();

// 1. produce 5 messages
for (int i = 0; i < 5; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes(UTF_8));
}

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(1)
.autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
.subscriptionName("test-multi-topic-consumer").subscribe();

int counter = 0;
for (; counter < 5; counter ++) {
assertEquals(consumer.receive().getData(), ("my-message-" + counter).getBytes());
}

// 2. pause multi-topic consumer
consumer.pause();

// 3. update partition
admin.topics().updatePartitionedTopic(topicName, 3);

// 4. wait for client to update partitions
while(((MultiTopicsConsumerImpl)consumer).getConsumers().size() <= 1) {
Thread.sleep(1);
}

// 5. produce 5 messages more
for (int i = 5; i < 10; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}

while(consumer.receive(3, TimeUnit.SECONDS) != null) {
counter++;
}

assertTrue(counter < 10);
// 6. resume multi-topic consumer
consumer.resume();

// 7. continue consume
while(consumer.receive(3, TimeUnit.SECONDS) != null) {
counter++;
}
assertEquals(counter, 10);

producer.close();;
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testFlushBatchEnabled() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,14 @@ private static Map<String, Metric> parseMetrics(String metrics) {
checkArgument(matcher.matches());
String name = matcher.group(1);
Metric m = new Metric();
m.value = Double.valueOf(matcher.group(3));
String numericValue = matcher.group(3);
if (numericValue.equalsIgnoreCase("-Inf")) {
m.value = Double.NEGATIVE_INFINITY;
} else if (numericValue.equalsIgnoreCase("+Inf")) {
m.value = Double.POSITIVE_INFINITY;
} else {
m.value = Double.valueOf(numericValue);
}
String tags = matcher.group(2);
Matcher tagsMatcher = tagsPattern.matcher(tags);
while (tagsMatcher.find()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
AtomicInteger allTopicPartitionsNumber;

private boolean paused = false;
private final Object pauseMutex = new Object();
// timeout related to auto check and subscribe partition increasement
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
Expand Down Expand Up @@ -1079,12 +1081,18 @@ public List<ConsumerImpl<T>> getConsumers() {

@Override
public void pause() {
consumers.forEach((name, consumer) -> consumer.pause());
synchronized (pauseMutex) {
paused = true;
consumers.forEach((name, consumer) -> consumer.pause());
}
}

@Override
public void resume() {
consumers.forEach((name, consumer) -> consumer.resume());
synchronized (pauseMutex) {
paused = false;
consumers.forEach((name, consumer) -> consumer.resume());
}
}

// This listener is triggered when topics partitions are updated.
Expand Down Expand Up @@ -1149,7 +1157,12 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
}
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
}
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
topicName, newConsumer.getTopic(), partitionName);
Expand Down

0 comments on commit b2c0f59

Please sign in to comment.