Skip to content

Commit

Permalink
[ROCKETMQ-231]Wrong Pull result sizebugfix
Browse files Browse the repository at this point in the history
Author: lindzh <[email protected]>

Closes apache#126 from lindzh/fix_consumer_pull_msg_size.
  • Loading branch information
lindzh authored and vongosling committed Aug 11, 2017
1 parent 98bd032 commit bcc65e5
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int
return false;
}

if ((messageTotal + 1) >= maxMsgNums) {
if (maxMsgNums <= messageTotal) {
return true;
}

Expand All @@ -1120,15 +1120,15 @@ private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int
return true;
}

if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
return true;
}
} else {
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
return true;
}

if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -45,19 +47,22 @@ public void init() throws Exception {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}

public MessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
}

@Test
public void testWriteAndRead() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();

MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());

MessageStore master = buildMessageStore();
boolean load = master.load();
assertTrue(load);

Expand Down Expand Up @@ -86,7 +91,7 @@ public MessageExtBrokerInner buildMessage() {
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(4);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
Expand Down Expand Up @@ -123,6 +128,36 @@ public void testGroupCommit() throws Exception {
}
}

@Test
public void testPullSize() throws Exception {
MessageStore messageStore = buildMessageStore();
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
String topic = "pullSizeTopic";

for (int i = 0; i < 32; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
}
//wait for consume queue build
Thread.sleep(100);
String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);


GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);

GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);

messageStore.shutdown();
}

private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class BaseConf {
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 1 * 60 * 1000;
protected static int consumeTime = 5 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,17 @@ public static BrokerController createAndStartBroker(String nsAddr) {
return brokerController;
}

public static boolean initTopic(String topic, String nsAddr, String clusterName) {
public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){
long startTime = System.currentTimeMillis();
boolean createResult;

while (true) {
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
System.currentTimeMillis() - startTime));
System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMoment(500);
Expand All @@ -169,6 +169,10 @@ public static boolean initTopic(String topic, String nsAddr, String clusterName)
return createResult;
}

public static boolean initTopic(String topic, String nsAddr, String clusterName) {
return initTopic(topic, nsAddr, clusterName,8);
}

public static void deleteFile(File file) {
if (!file.exists()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private RMQNormalProducer producer = null;
private String topic = null;

private int broadcastConsumeTime = 1 * 60 * 1000;

@Before
public void setUp() {
topic = initTopic();
Expand All @@ -60,12 +62,12 @@ public void testTwoConsumerSubTag() {
consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener());
TestUtils.waitForSeconds(waitTime);


List<MessageQueue> mqs = producer.getMessageQueue();
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ());

consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);
consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);

assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true);
Expand Down

0 comments on commit bcc65e5

Please sign in to comment.