Skip to content

Commit

Permalink
Issue 3230: auto update topic partitions extend for consumer and prod…
Browse files Browse the repository at this point in the history
…ucer (apache#3513)

* consumer auto subscribe

* producer update for partiton extends

* change following Matteo's comments

* enable by default
  • Loading branch information
jiazhai authored and merlimat committed Feb 7, 2019
1 parent b833bce commit 3a5df1a
Show file tree
Hide file tree
Showing 12 changed files with 578 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -667,6 +669,116 @@ public void testGetPartitionsForTopic() throws Exception {
Collections.singletonList(nonPartitionedTopic));
}


/**
* It verifies that consumer producer auto update for partitions extend.
*
* Steps:
* 1. create topic with 2 partitions, and producer consumer
* 2. update partition from 2 to 3.
* 3. trigger auto update in producer, after produce, consumer will only get messages from 2 partitions.
* 4. trigger auto update in consumer, after produce, consumer will get all messages from 3 partitions.
*
* @throws Exception
*/
@Test(timeOut = 30000)
public void testAutoUpdatePartitionsForProducerConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection

final int numPartitions = 2;
final String topicName = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
final String producerMsg = "producerMsg";
final int totalMessages = 30;

admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.enableBatching(false)
.autoUpdatePartitions(true)
.create();

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber")
.subscriptionType(SubscriptionType.Shared)
.autoUpdatePartitions(true)
.subscribe();

// 1. produce and consume 2 partitions
for (int i = 0; i < totalMessages; i++) {
producer.send((producerMsg + " first round " + "message index: " + i).getBytes());
}
int messageSet = 0;
Message<byte[]> message = consumer.receive();
do {
messageSet ++;
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(200, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages);

// 2. update partition from 2 to 3.
admin.topics().updatePartitionedTopic(topicName,3);

// 3. trigger auto update in producer, after produce, consumer will get 2/3 messages.
log.info("trigger partitionsAutoUpdateTimerTask for producer");
Timeout timeout = ((PartitionedProducerImpl<byte[]>)producer).getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Thread.sleep(200);

for (int i = 0; i < totalMessages; i++) {
producer.send((producerMsg + " second round " + "message index: " + i).getBytes());
}
messageSet = 0;
message = consumer.receive();
do {
messageSet ++;
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(200, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages * 2 / 3);

// 4. trigger auto update in consumer, after produce, consumer will get all messages.
log.info("trigger partitionsAutoUpdateTimerTask for consumer");
timeout = ((MultiTopicsConsumerImpl<byte[]>)consumer).getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Thread.sleep(200);

// former produced messages
messageSet = 0;
message = consumer.receive();
do {
messageSet ++;
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(200, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages / 3);

// former produced messages
for (int i = 0; i < totalMessages; i++) {
producer.send((producerMsg + " third round " + "message index: " + i).getBytes());
}
messageSet = 0;
message = consumer.receive();
do {
messageSet ++;
consumer.acknowledge(message);
log.info("Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(200, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, totalMessages);

pulsarClient.close();
admin.topics().deletePartitionedTopic(topicName);

log.info("-- Exiting {} test --", methodName);
}


private class AlwaysTwoMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -672,4 +674,90 @@ public void testMultiTopicsMessageListener() throws Exception {

consumer.close();
}


/**
* Test topic partitions auto subscribed.
*
* Steps:
* 1. Create a consumer with 2 topics, and each topic has 2 partitions: xx-partition-0, xx-partition-1.
* 2. produce message to xx-partition-2, and verify consumer could not receive message.
* 3. update topics to have 3 partitions.
* 4. trigger partitionsAutoUpdate. this should be done automatically, this is to save time to manually trigger.
* 5. produce message to xx-partition-2 again, and verify consumer could receive message.
*
*/
@Test(timeOut = 30000)
public void testTopicAutoUpdatePartitions() throws Exception {
String key = "TestTopicAutoUpdatePartitions";
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 6;

final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

admin.tenants().createTenant("prop", new TenantInfo());
admin.topics().createPartitionedTopic(topicName1, 2);
admin.topics().createPartitionedTopic(topicName2, 2);

// 1. Create a consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(topicNames)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.autoUpdatePartitions(true)
.subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);

MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) consumer;

// 2. use partition-2 producer,
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1 + "-partition-2")
.enableBatching(false)
.create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2 + "-partition-2")
.enableBatching(false)
.create();
for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes());
producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes());
log.info("produce message to partition-2. message index: {}", i);
}
// since partition-2 not subscribed, could not receive any message.
Message<byte[]> message = consumer.receive(200, TimeUnit.MILLISECONDS);
assertNull(message);

// 3. update to 3 partitions
admin.topics().updatePartitionedTopic(topicName1, 3);
admin.topics().updatePartitionedTopic(topicName2, 3);

// 4. trigger partitionsAutoUpdate. this should be done automatically in 1 minutes,
// this is to save time to manually trigger.
log.info("trigger partitionsAutoUpdateTimerTask");
Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Thread.sleep(200);

// 5. produce message to xx-partition-2 again, and verify consumer could receive message.
for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes());
producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes());
log.info("produce message to partition-2 again. messageindex: {}", i);
}
int messageSet = 0;
message = consumer.receive();
do {
messageSet ++;
consumer.acknowledge(message);
log.info("4 Consumer acknowledged : " + new String(message.getData()));
message = consumer.receive(200, TimeUnit.MILLISECONDS);
} while (message != null);
assertEquals(messageSet, 2 * totalMessages);

consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,13 @@ public interface ConsumerBuilder<T> extends Cloneable {
* then the ack timeout will be set to 30000 millisecond
*/
ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);

/**
* If enabled, the consumer will auto subscribe for partitions increasement.
* This is only for partitioned consumer.
*
* @param autoUpdate
* whether to auto update partition increasement
*/
ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,13 @@ public interface ProducerBuilder<T> extends Cloneable {
* @return producer builder.
*/
ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);

/**
* If enabled, partitioned producer will auto create new producers for new partitions.
* This is only for partitioned producer.
*
* @param autoUpdate
* whether to auto update partition increasement
*/
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.util.FutureUtil;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -293,6 +292,12 @@ public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
return this;
}

@Override
public ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
conf.setAutoUpdatePartitions(autoUpdate);
return this;
}

public ConsumerConfigurationData<T> getConf() {
return conf;
}
Expand Down
Loading

0 comments on commit 3a5df1a

Please sign in to comment.