Skip to content

Commit

Permalink
Add messaging integration tests for different subscriptions. (apache#…
Browse files Browse the repository at this point in the history
…4352)

* Add messaging integration tests.

* Add messaging integration tests.

* remove unused consumer from consumer range map.

* put all messages to pendingAcks before write messages to ctx.

* check entry != null
  • Loading branch information
codelipenghui authored and merlimat committed May 24, 2019
1 parent 009bc29 commit b161328
Show file tree
Hide file tree
Showing 8 changed files with 831 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
return writePromise;
}

// Note
// Must ensure that the message is written to the pendingAcks before sent is first , because this consumer
// is possible to disconnect at this time.
if (pendingAcks != null) {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry != null) {
int batchSize = batchSizes.getBatchSize(i);
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
}
}
}

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
Expand All @@ -213,10 +226,6 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba

int batchSize = batchSizes.getBatchSize(i);

if (pendingAcks != null) {
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
}

if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
topicName, subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -89,7 +90,7 @@ public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignExc

@Override
public synchronized void removeConsumer(Consumer consumer) {
Integer removeRange = consumerRange.get(consumer);
Integer removeRange = consumerRange.remove(consumer);
if (removeRange != null) {
if (removeRange == rangeSize && rangeMap.size() > 1) {
Map.Entry<Integer, Consumer> lowerEntry = rangeMap.lowerEntry(removeRange);
Expand All @@ -98,7 +99,6 @@ public synchronized void removeConsumer(Consumer consumer) {
consumerRange.put(lowerEntry.getValue(), removeRange);
} else {
rangeMap.remove(removeRange);
consumerRange.remove(consumer);
}
}
}
Expand Down Expand Up @@ -147,4 +147,12 @@ private boolean is2Power(int num) {
if(num < 2) return false;
return (num & num - 1) == 0;
}

Map<Consumer, Integer> getConsumerRange() {
return Collections.unmodifiableMap(consumerRange);
}

Map<Integer, Consumer> getRangeConsumer() {
return Collections.unmodifiableMap(rangeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@ public class HashRangeStickyKeyConsumerSelectorTest {
@Test
public void testConsumerSelect() throws ConsumerAssignException {

StickyKeyConsumerSelector selector = new HashRangeStickyKeyConsumerSelector();
HashRangeStickyKeyConsumerSelector selector = new HashRangeStickyKeyConsumerSelector();
String key1 = "anyKey";
Assert.assertNull(selector.select(key1.getBytes()));

Consumer consumer1 = mock(Consumer.class);
selector.addConsumer(consumer1);
int consumer1Slot = DEFAULT_RANGE_SIZE;
Assert.assertEquals(selector.select(key1.getBytes()), consumer1);
Assert.assertEquals(selector.getConsumerRange().size(), 1);
Assert.assertEquals(selector.getRangeConsumer().size(), 1);

Consumer consumer2 = mock(Consumer.class);
selector.addConsumer(consumer2);
Assert.assertEquals(selector.getConsumerRange().size(), 2);
Assert.assertEquals(selector.getRangeConsumer().size(), 2);
int consumer2Slot = consumer1Slot >> 1;

for (int i = 0; i < 100; i++) {
Expand All @@ -58,6 +62,8 @@ public void testConsumerSelect() throws ConsumerAssignException {

Consumer consumer3 = mock(Consumer.class);
selector.addConsumer(consumer3);
Assert.assertEquals(selector.getConsumerRange().size(), 3);
Assert.assertEquals(selector.getRangeConsumer().size(), 3);
int consumer3Slot = consumer2Slot >> 1;

for (int i = 0; i < 100; i++) {
Expand All @@ -74,6 +80,8 @@ public void testConsumerSelect() throws ConsumerAssignException {

Consumer consumer4 = mock(Consumer.class);
selector.addConsumer(consumer4);
Assert.assertEquals(selector.getConsumerRange().size(), 4);
Assert.assertEquals(selector.getRangeConsumer().size(), 4);
int consumer4Slot = consumer1Slot - ((consumer1Slot - consumer2Slot) >> 1);

for (int i = 0; i < 100; i++) {
Expand All @@ -91,6 +99,8 @@ public void testConsumerSelect() throws ConsumerAssignException {
}

selector.removeConsumer(consumer1);
Assert.assertEquals(selector.getConsumerRange().size(), 3);
Assert.assertEquals(selector.getRangeConsumer().size(), 3);
for (int i = 0; i < 100; i++) {
String key = UUID.randomUUID().toString();
int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
Expand All @@ -104,6 +114,8 @@ public void testConsumerSelect() throws ConsumerAssignException {
}

selector.removeConsumer(consumer2);
Assert.assertEquals(selector.getConsumerRange().size(), 2);
Assert.assertEquals(selector.getRangeConsumer().size(), 2);
for (int i = 0; i < 100; i++) {
String key = UUID.randomUUID().toString();
int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
Expand All @@ -115,6 +127,8 @@ public void testConsumerSelect() throws ConsumerAssignException {
}

selector.removeConsumer(consumer3);
Assert.assertEquals(selector.getConsumerRange().size(), 1);
Assert.assertEquals(selector.getRangeConsumer().size(), 1);
for (int i = 0; i < 100; i++) {
String key = UUID.randomUUID().toString();
Assert.assertEquals(selector.select(key.getBytes()), consumer4);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.messaging;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.BeforeMethod;

import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Slf4j
public abstract class MessagingBase extends PulsarTestSuite {

protected String methodName;

@BeforeMethod
public void beforeMethod(Method m) throws Exception {
methodName = m.getName();
}

protected String getNonPartitionedTopic(String topicPrefix, boolean isPersistent) throws Exception {
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);

return generateTopicName(nsName, topicPrefix, true);
}

protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, int partitions) throws Exception {
assertTrue(partitions > 0, "partitions must greater than 1");
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);

String topicName = generateTopicName(nsName, topicPrefix, true);
pulsarCluster.createPartitionedTopic(topicName, partitions);
return topicName;
}

protected <T extends Comparable<T>> void receiveMessagesCheckOrderAndDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived = null;
Message<T> lastReceived = null;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
// Make sure that messages are received in order
if (lastReceived == null) {
assertNotNull(currentReceived);
} else {
assertTrue(currentReceived != null
&& (currentReceived.getValue().compareTo(lastReceived.getValue()) > 0),
"Received messages are not in order.");
}
lastReceived = currentReceived;
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
}
if (currentReceived != null) {
consumer.acknowledgeCumulative(currentReceived);
}
}
assertEquals(messagesReceived.size(), messagesToReceive);
}

protected <T> void receiveMessagesCheckDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived = null;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
}
if (currentReceived != null) {
consumer.acknowledgeCumulative(currentReceived);
}
}
assertEquals(messagesReceived.size(), messagesToReceive);
}

protected <T> void receiveMessagesCheckStickyKeyAndDuplicate
(List<Consumer<T>> consumerList, int messagesToReceive) throws PulsarClientException {
Map<String, Set<String>> consumerKeys = Maps.newHashMap();
Set<T> messagesReceived = Sets.newHashSet();
for (Consumer<T> consumer : consumerList) {
Message<T> currentReceived = null;
while (true) {
try {
currentReceived = consumer.receive(3, TimeUnit.SECONDS);
} catch (PulsarClientException e) {
log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
break;
}
assertNotNull(currentReceived.getKey());
consumerKeys.putIfAbsent(consumer.getConsumerName(), Sets.newHashSet());
consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
// Make sure that there are no duplicates
assertTrue(messagesReceived.add(currentReceived.getValue()),
"Received duplicate message " + currentReceived.getValue());
}
if (currentReceived != null) {
consumer.acknowledgeCumulative(currentReceived);
}
}
// Make sure key will not be distributed to multiple consumers
Set<String> allKeys = Sets.newHashSet();
consumerKeys.forEach((k, v) -> v.forEach(key -> {
assertTrue(allKeys.add(key),
"Key "+ key + "is distributed to multiple consumers" );
}));
assertEquals(messagesReceived.size(), messagesToReceive);
}

protected <T> void closeConsumers(List<Consumer<T>> consumerList) throws PulsarClientException {
Iterator<Consumer<T>> iterator = consumerList.iterator();
while (iterator.hasNext()) {
iterator.next().close();
iterator.remove();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.messaging;

import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.Test;

@Slf4j
public class NonPersistentTopicMessagingTest extends TopicMessagingBase {

@Test(dataProvider = "ServiceUrls")
public void testNonPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception {
nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testPartitionedTopicMessagingWithExclusive(String serviceUrl) throws Exception {
partitionedTopicSendAndReceiveWithExclusive(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testNonPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception {
nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testPartitionedTopicMessagingWithFailover(String serviceUrl) throws Exception {
partitionedTopicSendAndReceiveWithFailover(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testNonPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception {
nonPartitionedTopicSendAndReceiveWithShared(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testPartitionedTopicMessagingWithShared(String serviceUrl) throws Exception {
partitionedTopicSendAndReceiveWithShared(serviceUrl, false);
}

@Test(dataProvider = "ServiceUrls")
public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
}

@Test(dataProvider = "ServiceUrls")
public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
}
}
Loading

0 comments on commit b161328

Please sign in to comment.