Skip to content

Commit

Permalink
produce/consume with 1 partitioned topic (apache#4883)
Browse files Browse the repository at this point in the history
Motivation
In PR apache#4764, we allow to create partitioned topic with 1 partition, But in Pulsar Client, user still not able to do it.
This fix try to make sure user could create consumer/producer for 1 partitioned topic .

Modifications
- old and new added unit test passed.
  • Loading branch information
jiazhai authored Aug 6, 2019
1 parent 822d8ec commit 128287e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.pulsar.client.api;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -3117,4 +3119,82 @@ public void testFailOverConsumerPriority() throws Exception {
consumer5.close();
log.info("-- Exiting {} test --", methodName);
}

/**
* This test verifies Producer and Consumer of PartitionedTopic with 1 partition works well.
*
* <pre>
* 1. create producer/consumer with both original name and PARTITIONED_TOPIC_SUFFIX.
* 2. verify producer/consumer could produce/consume messages from same underline persistent topic.
* </pre>
*
* @throws Exception
*/
@Test
public void testPartitionedTopicWithOnePartition() throws Exception {
log.info("-- Starting {} test --", methodName);

final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
final String subscriptionName = "my-sub-";

// create partitioned topic
admin.topics().createPartitionedTopic(topicName, 1);
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 1);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName + 1)
.consumerName("aaa")
.subscribe();
log.info("Consumer1 created. topic: {}", consumer1.getTopic());

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
.subscriptionName(subscriptionName + 2)
.consumerName("bbb")
.subscribe();
log.info("Consumer2 created. topic: {}", consumer2.getTopic());

@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();
log.info("Producer1 created. topic: {}", producer1.getTopic());

@Cleanup
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
.enableBatching(false)
.create();
log.info("Producer2 created. topic: {}", producer2.getTopic());

final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
producer1.newMessage()
.value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8))
.send();

producer2.newMessage()
.value(("one-partitioned-topic-value-producer2-" + i).getBytes(UTF_8))
.send();
}

for (int i = 0; i < numMessages * 2; i++) {
Message<byte[]> msg = consumer1.receive(200, TimeUnit.MILLISECONDS);
assertNotNull(msg);
log.info("Consumer1 Received message '{}'.", new String(msg.getValue(), UTF_8));

msg = consumer2.receive(200, TimeUnit.MILLISECONDS);
assertNotNull(msg);
log.info("Consumer2 Received message '{}'.", new String(msg.getValue(), UTF_8));
}

assertNull(consumer1.receive(200, TimeUnit.MILLISECONDS));
assertNull(consumer2.receive(200, TimeUnit.MILLISECONDS));

log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
}

List<CompletableFuture<Consumer<T>>> futureList;
if (numPartitions > 1) {
if (numPartitions > 0) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
}

ProducerBase<T> producer;
if (metadata.partitions > 1) {
if (metadata.partitions > 0) {
producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
producerCreatedFuture, schema, interceptors);
} else {
Expand Down Expand Up @@ -342,7 +342,7 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
ConsumerBase<T> consumer;
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
if (metadata.partitions > 0) {
consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
Expand Down Expand Up @@ -469,7 +469,7 @@ <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T>
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
}

if (metadata.partitions > 1) {
if (metadata.partitions > 0) {
readerFuture.completeExceptionally(
new PulsarClientException("Topic reader cannot be created on a partitioned topic"));
return;
Expand Down Expand Up @@ -655,7 +655,7 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(S
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
if (metadata.partitions > 1) {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
Expand Down

0 comments on commit 128287e

Please sign in to comment.