Skip to content

Commit

Permalink
ROCKETMQ-264 Fix ut's time-consuming problems closes apache#145
Browse files Browse the repository at this point in the history
  • Loading branch information
lindzh authored and vongosling committed Aug 28, 2017
1 parent 629c3e9 commit 76464ba
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ public class BrokerControllerTest {
*/
@Test
public void testBrokerRestart() throws Exception {
for (int i = 0; i < 2; i++) {
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
brokerController.shutdown();
}
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
brokerController.shutdown();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -63,6 +65,14 @@ public class MessageStoreWithFilterTest {

private static SocketAddress StoreHost;

private DefaultMessageStore master;

private ConsumerFilterManager filterManager;

private int topicCount = 3;

private int msgPerTopic = 30;

static {
try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
Expand All @@ -76,6 +86,24 @@ public class MessageStoreWithFilterTest {
}
}

@Before
public void init() {
filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
try {
master = gen(filterManager);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
}

@After
public void destroy() {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}

public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
Expand Down Expand Up @@ -202,177 +230,143 @@ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs,

@Test
public void testGetMessage_withFilterBitMapAndConsumerChanged() {
int topicCount = 10, msgPerTopic = 10;
ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);

DefaultMessageStore master = null;
List<MessageExtBrokerInner> msgs = null;
try {
master = gen(filterManager);
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// sleep to wait for consume queue has been constructed.
try {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// sleep to wait for consume queue has been constructed.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// reset consumer;
String topic = "topic" + 0;
String resetGroup = "CID_" + 2;
String normalGroup = "CID_" + 3;
// reset consumer;
String topic = "topic" + 0;
String resetGroup = "CID_" + 2;
String normalGroup = "CID_" + 3;

{
// reset CID_2@topic0 to get all messages.
SubscriptionData resetSubData = new SubscriptionData();
resetSubData.setExpressionType(ExpressionType.SQL92);
resetSubData.setTopic(topic);
resetSubData.setClassFilterMode(false);
resetSubData.setSubString("a is not null OR a is null");
{
// reset CID_2@topic0 to get all messages.
SubscriptionData resetSubData = new SubscriptionData();
resetSubData.setExpressionType(ExpressionType.SQL92);
resetSubData.setTopic(topic);
resetSubData.setClassFilterMode(false);
resetSubData.setSubString("a is not null OR a is null");

ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
System.currentTimeMillis());
ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
System.currentTimeMillis());

GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));
GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));

try {
assertThat(resetGetResult).isNotNull();
try {
assertThat(resetGetResult).isNotNull();

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);

assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
resetGetResult.release();
}
assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
resetGetResult.release();
}
}

{
ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
assertThat(normalFilterData).isNotNull();
assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());
{
ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
assertThat(normalFilterData).isNotNull();
assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());

SubscriptionData normalSubData = new SubscriptionData();
normalSubData.setExpressionType(normalFilterData.getExpressionType());
normalSubData.setTopic(topic);
normalSubData.setClassFilterMode(false);
normalSubData.setSubString(normalFilterData.getExpression());
SubscriptionData normalSubData = new SubscriptionData();
normalSubData.setExpressionType(normalFilterData.getExpressionType());
normalSubData.setTopic(topic);
normalSubData.setClassFilterMode(false);
normalSubData.setSubString(normalFilterData.getExpression());

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);

GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));
GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));

try {
assertThat(normalGetResult).isNotNull();
assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
normalGetResult.release();
}
try {
assertThat(normalGetResult).isNotNull();
assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
normalGetResult.release();
}
} finally {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}
}

@Test
public void testGetMessage_withFilterBitMap() {
int topicCount = 10, msgPerTopic = 500;
ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);

DefaultMessageStore master = null;
List<MessageExtBrokerInner> msgs = null;
try {
master = gen(filterManager);
msgs = putMsg(master, topicCount, msgPerTopic);
// sleep to wait for consume queue has been constructed.
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

try {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
// sleep to wait for consume queue has been constructed.
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;

for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;

for (int j = 0; j < msgPerTopic; j++) {
String group = "CID_" + j;

ConsumerFilterData filterData = filterManager.get(realTopic, group);
assertThat(filterData).isNotNull();

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);

SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setExpressionType(filterData.getExpressionType());
subscriptionData.setTopic(filterData.getTopic());
subscriptionData.setClassFilterMode(false);
subscriptionData.setSubString(filterData.getExpression());

GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
String assertMsg = group + "-" + realTopic;
try {
assertThat(getMessageResult).isNotNull();
assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());

for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
assertThat(messageExt).isNotNull();

Object evlRet = null;
try {
evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
for (int j = 0; j < msgPerTopic; j++) {
String group = "CID_" + j;

assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);
ConsumerFilterData filterData = filterManager.get(realTopic, group);
assertThat(filterData).isNotNull();

// check
boolean find = false;
for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
find = true;
}
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);

SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setExpressionType(filterData.getExpressionType());
subscriptionData.setTopic(filterData.getTopic());
subscriptionData.setClassFilterMode(false);
subscriptionData.setSubString(filterData.getExpression());

GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
String assertMsg = group + "-" + realTopic;
try {
assertThat(getMessageResult).isNotNull();
assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());

for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
assertThat(messageExt).isNotNull();

Object evlRet = null;
try {
evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);

// check
boolean find = false;
for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
find = true;
}
assertThat(find).isTrue();
}
} finally {
getMessageResult.release();
assertThat(find).isTrue();
}
} finally {
getMessageResult.release();
}
}
} finally {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}
}
}
1 change: 0 additions & 1 deletion broker/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
<configuration>

<appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
<pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
Expand Down
Loading

0 comments on commit 76464ba

Please sign in to comment.