From 47fad3c17ab2d161743d4e52efc6258b7bcafde9 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Fri, 17 Mar 2017 18:59:43 +0800 Subject: [PATCH] ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53 --- .../rocketmq/broker/BrokerController.java | 2 + .../AbstractSendMessageProcessor.java | 12 +- .../processor/SendMessageProcessor.java | 350 +++++++++++------- .../apache/rocketmq/client/Validators.java | 1 + .../rocketmq/client/impl/MQClientAPIImpl.java | 56 +-- .../impl/producer/DefaultMQProducerImpl.java | 30 +- .../client/producer/DefaultMQProducer.java | 38 ++ .../rocketmq/client/producer/MQProducer.java | 14 + .../rocketmq/common/TopicFilterType.java | 1 + .../rocketmq/common/message/MessageBatch.java | 73 ++++ .../common/message/MessageDecoder.java | 103 ++++++ .../rocketmq/common/message/MessageExt.java | 2 +- .../common/message/MessageExtBatch.java | 42 +++ .../rocketmq/common/protocol/RequestCode.java | 3 + .../header/SendMessageRequestHeader.java | 10 + .../header/SendMessageRequestHeaderV2.java | 14 + .../rocketmq/common/MessageBatchTest.java | 70 ++++ .../common/MessageEncodeDecodeTest.java | 81 ++++ .../rocketmq/store/AppendMessageCallback.java | 15 +- .../rocketmq/store/AppendMessageResult.java | 11 + .../org/apache/rocketmq/store/CommitLog.java | 344 +++++++++++++++-- .../apache/rocketmq/store/ConsumeQueue.java | 2 +- .../rocketmq/store/DefaultMessageStore.java | 57 +++ .../org/apache/rocketmq/store/MappedFile.java | 32 +- .../apache/rocketmq/store/MessageStore.java | 3 + .../apache/rocketmq/store/RunningFlags.java | 11 + .../store/config/MessageStoreConfig.java | 2 + .../store/stats/BrokerStatsManager.java | 8 +- .../rocketmq/store/AppendCallbackTest.java | 150 ++++++++ .../client/producer/batch/BatchSendIT.java | 131 +++++++ .../exception/msg/MessageExceptionIT.java | 2 +- 31 files changed, 1464 insertions(+), 206 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index b656870b9e1..7e9e7ac9188 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -374,9 +374,11 @@ public void registerProcessor() { this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); /** * PullMessageProcessor diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 9f23bade96d..3faa7ae3b27 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -17,11 +17,6 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -51,6 +46,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; + public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -279,6 +280,7 @@ protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeader requestHeader = null; switch (request.getCode()) { + case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: requestHeaderV2 = (SendMessageRequestHeaderV2) request diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index a4404621c0d..56a0b990e08 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -72,7 +73,13 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + + RemotingCommand response; + if (requestHeader.isBatch()) { + response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + } else { + response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + } this.executeSendMessageHookAfter(response, mqtraceContext); return response; @@ -238,6 +245,50 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin return response; } + + private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, + MessageExt msg, TopicConfig topicConfig) { + String newTopic = requestHeader.getTopic(); + if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); + if (null == subscriptionGroupConfig) { + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark( + "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + return false; + } + + int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); + if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { + maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); + } + int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); + if (reconsumeTimes >= maxReconsumeTimes) { + newTopic = MixAll.getDLQTopic(groupName); + int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // + DLQ_NUMS_PER_GROUP, // + PermName.PERM_WRITE, 0 + ); + msg.setTopic(newTopic); + msg.setQueueId(queueIdInt); + if (null == topicConfig) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("topic[" + newTopic + "] not exist"); + return false; + } + } + } + int sysFlag = requestHeader.getSysFlag(); + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + } + msg.setSysFlag(sysFlag); + return true; + } + private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final RemotingCommand request, // final SendMessageContext sendMessageContext, // @@ -251,9 +302,7 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - if (log.isDebugEnabled()) { - log.debug("receive SendMessage request command, {}", request); - } + log.debug("receive SendMessage request command, {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { @@ -270,6 +319,8 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final byte[] body = request.getBody(); + + int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -277,53 +328,18 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } - int sysFlag = requestHeader.getSysFlag(); + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setQueueId(queueIdInt); - if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { - sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { + return response; } - String newTopic = requestHeader.getTopic(); - if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); - if (null == subscriptionGroupConfig) { - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark( - "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; - } - - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); - if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { - maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); - } - int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); - if (reconsumeTimes >= maxReconsumeTimes) { - newTopic = MixAll.getDLQTopic(groupName); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 - ); - if (null == topicConfig) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("topic[" + newTopic + "] not exist"); - return response; - } - } - } - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(newTopic); msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags())); - - msgInner.setQueueId(queueIdInt); - msgInner.setSysFlag(sysFlag); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); @@ -340,105 +356,183 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // } PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - boolean sendOK = false; - switch (putMessageResult.getPutMessageStatus()) { - // Success - case PUT_OK: - sendOK = true; - response.setCode(ResponseCode.SUCCESS); - break; - case FLUSH_DISK_TIMEOUT: - response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); - sendOK = true; - break; - case FLUSH_SLAVE_TIMEOUT: - response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); - sendOK = true; - break; - case SLAVE_NOT_AVAILABLE: - response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); - sendOK = true; - break; + return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); - // Failed - case CREATE_MAPEDFILE_FAILED: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("create mapped file failed, server is busy or broken."); - break; - case MESSAGE_ILLEGAL: - case PROPERTIES_SIZE_EXCEEDED: - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); - break; - case SERVICE_NOT_AVAILABLE: - response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); - response.setRemark( - "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); - break; - case OS_PAGECACHE_BUSY: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); - break; - case UNKNOWN_ERROR: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR"); - break; - default: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR DEFAULT"); - break; - } + } - String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); - if (sendOK) { - this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); - this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(); + private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, + SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { + if (putMessageResult == null) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("store putMessage return null"); + return response; + } + boolean sendOK = false; + + switch (putMessageResult.getPutMessageStatus()) { + // Success + case PUT_OK: + sendOK = true; + response.setCode(ResponseCode.SUCCESS); + break; + case FLUSH_DISK_TIMEOUT: + response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); + sendOK = true; + break; + case FLUSH_SLAVE_TIMEOUT: + response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); + sendOK = true; + break; + case SLAVE_NOT_AVAILABLE: + response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); + sendOK = true; + break; + + // Failed + case CREATE_MAPEDFILE_FAILED: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("create mapped file failed, server is busy or broken."); + break; + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark( + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + break; + case SERVICE_NOT_AVAILABLE: + response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark( + "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); + break; + case OS_PAGECACHE_BUSY: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); + break; + case UNKNOWN_ERROR: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR"); + break; + default: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR DEFAULT"); + break; + } + + String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); + if (sendOK) { - response.setRemark(null); + this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); - responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); - responseHeader.setQueueId(queueIdInt); - responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + response.setRemark(null); - doResponse(ctx, request, response); + responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); + responseHeader.setQueueId(queueIdInt); + responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - if (hasSendMessageHook()) { - sendMessageContext.setMsgId(responseHeader.getMsgId()); - sendMessageContext.setQueueId(responseHeader.getQueueId()); - sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); + doResponse(ctx, request, response); - int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); - int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + if (hasSendMessageHook()) { + sendMessageContext.setMsgId(responseHeader.getMsgId()); + sendMessageContext.setQueueId(responseHeader.getQueueId()); + sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); - sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } - return null; - } else { - if (hasSendMessageHook()) { - int wroteSize = request.getBody().length; - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); - - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); - sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); } + return null; } else { + if (hasSendMessageHook()) { + int wroteSize = request.getBody().length; + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + + sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); + sendMessageContext.setCommercialSendTimes(incValue); + sendMessageContext.setCommercialSendSize(wroteSize); + sendMessageContext.setCommercialOwner(owner); + } + } + return response; + } + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, // + final RemotingCommand request, // + final SendMessageContext sendMessageContext, // + final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + + + response.setOpaque(request.getOpaque()); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + + log.debug("Receive SendMessage request command {}", request); + + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("store putMessage return null"); + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); + return response; + } + + response.setCode(-1); + super.msgCheck(ctx, requestHeader, response); + if (response.getCode() != -1) { + return response; + } + + + int queueIdInt = requestHeader.getQueueId(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + + if (queueIdInt < 0) { + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + } + + if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark("message topic length too long " + requestHeader.getTopic().length()); + return response; } + if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); + return response; + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(requestHeader.getTopic()); + messageExtBatch.setQueueId(queueIdInt); + + int sysFlag = requestHeader.getSysFlag(); + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + } + messageExtBatch.setSysFlag(sysFlag); + + messageExtBatch.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); + messageExtBatch.setBody(request.getBody()); + messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); + messageExtBatch.setBornHost(ctx.channel().remoteAddress()); + messageExtBatch.setStoreHost(this.getStoreHost()); + messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); + + handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); return response; } diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 899efa684d5..b49537f8c69 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -95,6 +95,7 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer } // topic Validators.checkTopic(msg.getTopic()); + // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 12580c14977..bdce8830500 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -18,14 +18,14 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.Iterator; +import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; @@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; + public class MQClientAPIImpl { private final static Logger log = ClientLogger.getLog(); @@ -278,14 +280,14 @@ public void createTopic(final String addr, final String defaultTopic, final Topi } public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendMessageContext context, // 7 + final DefaultMQProducerImpl producer // 8 ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } @@ -305,9 +307,9 @@ public SendResult sendMessage(// final DefaultMQProducerImpl producer // 12 ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; - if (sendSmartMsg) { + if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } @@ -334,11 +336,11 @@ public SendResult sendMessage(// } private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -507,8 +509,16 @@ private SendResult processSendResponse(// MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); + String uniqMsgId = MessageClientIDSetter.getUniqID(msg); + if (msg instanceof MessageBatch) { + StringBuilder sb = new StringBuilder(); + for (Message message : (MessageBatch) msg) { + sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); + } + uniqMsgId = sb.toString(); + } SendResult sendResult = new SendResult(sendStatus, - MessageClientIDSetter.getUniqID(msg), + uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); @@ -1452,7 +1462,7 @@ public Map invokeBrokerToResetOffset(final String addr, fina } public Map> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, - final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8e819792c4c..d828875d3f5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -30,6 +30,16 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -58,15 +68,6 @@ import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageId; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; @@ -595,8 +596,10 @@ private SendResult sendKernelImpl(final Message msg, // byte[] prevBody = msg.getBody(); try { - - MessageClientIDSetter.setUniqID(msg); + //for MessageBatch,ID has been set in the generating process + if (!(msg instanceof MessageBatch)) { + MessageClientIDSetter.setUniqID(msg); + } int sysFlag = 0; if (this.tryToCompressMessage(msg)) { @@ -652,6 +655,7 @@ private SendResult sendKernelImpl(final Message msg, // requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); + requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -737,6 +741,10 @@ public MQClientInstance getmQClientFactory() { } private boolean tryToCompressMessage(final Message msg) { + if (msg instanceof MessageBatch) { + //batch dose not support compressing right now + return false; + } byte[] body = msg.getBody(); if (body != null) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 3480c920e89..135a447eba4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -16,14 +16,18 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; @@ -577,6 +581,40 @@ public MessageExt viewMessage(String topic, String msgId) throws RemotingExcepti return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); } + @Override + public SendResult send(Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs)); + } + + @Override + public SendResult send(Collection msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), timeout); + } + + @Override + public SendResult send(Collection msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), messageQueue); + } + + @Override + public SendResult send(Collection msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); + } + + private MessageBatch batch(Collection msgs) throws MQClientException { + MessageBatch msgBatch; + try { + msgBatch = MessageBatch.generateFromList(msgs); + for (Message message : msgBatch) { + Validators.checkMessage(message, this); + MessageClientIDSetter.setUniqID(message); + } + msgBatch.setBody(msgBatch.encode()); + } catch (Exception e) { + throw new MQClientException("Failed to initiate the MessageBatch", e); + } + return msgBatch; + } public String getProducerGroup() { return producerGroup; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 9fc7586113a..14caf6ffac9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.producer; +import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -81,4 +82,17 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; + + //for batch + SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + SendResult send(final Collection msgs, final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + SendResult send(final Collection msgs, final MessageQueue mq) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + SendResult send(final Collection msgs, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java index 58459e093f1..8dde5d829ed 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java @@ -19,4 +19,5 @@ public enum TopicFilterType { SINGLE_TAG, MULTI_TAG + } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java new file mode 100644 index 00000000000..ca2ce88ec7c --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.rocketmq.common.MixAll; + +public class MessageBatch extends Message implements Iterable { + + private static final long serialVersionUID = 621335151046335557L; + private final List messages; + + private MessageBatch(List messages) { + this.messages = messages; + } + + public byte[] encode() { + return MessageDecoder.encodeMessages(messages); + } + + public Iterator iterator() { + return messages.iterator(); + } + + public static MessageBatch generateFromList(Collection messages) { + assert messages != null; + assert messages.size() > 0; + List messageList = new ArrayList(messages.size()); + Message first = null; + for (Message message : messages) { + if (message.getDelayTimeLevel() > 0) { + throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching"); + } + if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + throw new UnsupportedOperationException("Retry Group is not supported for batching"); + } + if (first == null) { + first = message; + } else { + if (!first.getTopic().equals(message.getTopic())) { + throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); + } + if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { + throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); + } + } + messageList.add(message); + } + MessageBatch messageBatch = new MessageBatch(messageList); + + messageBatch.setTopic(first.getTopic()); + messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK()); + return messageBatch; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 4f4e1589a37..90b837a335a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -200,6 +200,8 @@ public static byte[] encode(MessageExt messageExt, boolean needCompress) throws return byteBuffer.array(); } + + public static MessageExt decode( java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { return decode(byteBuffer, readBody, deCompressBody, false); @@ -372,4 +374,105 @@ public static Map string2messageProperties(final String properti return map; } + + + public static byte[] encodeMessage(Message message) { + //only need flag, body, properties + byte[] body = message.getBody(); + int bodyLen = body.length; + String properties = messageProperties2String(message.getProperties()); + byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); + //note properties length must not more than Short.MAX + short propertiesLength = (short) propertiesBytes.length; + int sysFlag = message.getFlag(); + int storeSize = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCOD + + 4 // 3 BODYCRC + + 4 // 4 FLAG + + 4 + bodyLen // 4 BODY + + 2 + propertiesLength; + ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize); + // 1 TOTALSIZE + byteBuffer.putInt(storeSize); + + // 2 MAGICCODE + byteBuffer.putInt(0); + + // 3 BODYCRC + byteBuffer.putInt(0); + + // 4 FLAG + int flag = message.getFlag(); + byteBuffer.putInt(flag); + + // 5 BODY + byteBuffer.putInt(bodyLen); + byteBuffer.put(body); + + // 6 properties + byteBuffer.putShort(propertiesLength); + byteBuffer.put(propertiesBytes); + + return byteBuffer.array(); + } + + public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception { + Message message = new Message(); + + // 1 TOTALSIZE + byteBuffer.getInt(); + + // 2 MAGICCODE + byteBuffer.getInt(); + + // 3 BODYCRC + byteBuffer.getInt(); + + // 4 FLAG + int flag = byteBuffer.getInt(); + message.setFlag(flag); + + // 5 BODY + int bodyLen = byteBuffer.getInt(); + byte[] body = new byte[bodyLen]; + byteBuffer.get(body); + message.setBody(body); + + // 6 properties + short propertiesLen = byteBuffer.getShort(); + byte[] propertiesBytes = new byte[propertiesLen]; + byteBuffer.get(propertiesBytes); + message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8))); + + return message; + } + + public static byte[] encodeMessages(List messages) { + //TO DO refactor, accumulate in one buffer, avoid copies + List encodedMessages = new ArrayList(messages.size()); + int allSize = 0; + for (Message message: messages) { + byte[] tmp = encodeMessage(message); + encodedMessages.add(tmp); + allSize += tmp.length; + } + byte[] allBytes = new byte[allSize]; + int pos = 0; + for (byte[] bytes : encodedMessages) { + System.arraycopy(bytes, 0, allBytes, pos, bytes.length); + pos += bytes.length; + } + return allBytes; + } + + + public static List decodeMessages(ByteBuffer byteBuffer) throws Exception { + //TO DO add a callback for processing, avoid creating lists + List msgs = new ArrayList(); + while (byteBuffer.hasRemaining()) { + Message msg = decodeMessage(byteBuffer); + msgs.add(msg); + } + return msgs; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java index d11069fcac7..3f77767eb8d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java @@ -64,7 +64,7 @@ public static TopicFilterType parseTopicFilterType(final int sysFlag) { return TopicFilterType.SINGLE_TAG; } - private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); byteBuffer.putInt(inetSocketAddress.getPort()); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java new file mode 100644 index 00000000000..352ab376e15 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.message; + +import java.nio.ByteBuffer; + +public class MessageExtBatch extends MessageExt { + + private static final long serialVersionUID = -2353110995348498537L; + + + public ByteBuffer wrap() { + assert getBody() != null; + return ByteBuffer.wrap(getBody(), 0, getBody().length); + } + + + private ByteBuffer encodedBuff; + + public ByteBuffer getEncodedBuff() { + return encodedBuff; + } + + public void setEncodedBuff(ByteBuffer encodedBuff) { + this.encodedBuff = encodedBuff; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 217e8df9ac7..c6b0925535f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -159,4 +159,7 @@ public class RequestCode { * get config from name server */ public static final int GET_NAMESRV_CONFIG = 319; + + + public static final int SEND_BATCH_MESSAGE = 320; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 38b658981d4..2df31e6bb2a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader { private Integer reconsumeTimes; @CFNullable private boolean unitMode = false; + @CFNullable + private boolean batch = false; private Integer maxReconsumeTimes; @Override @@ -149,4 +151,12 @@ public Integer getMaxReconsumeTimes() { public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } + + public boolean isBatch() { + return batch; + } + + public void setBatch(boolean batch) { + this.batch = batch; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index 34c83cbfdf6..757ef0c1526 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { private Integer l; // consumeRetryTimes + @CFNullable + private boolean m; //batch + + public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); v1.setProducerGroup(v2.a); @@ -65,6 +69,7 @@ public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final Se v1.setReconsumeTimes(v2.j); v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); + v1.setBatch(v2.m); return v1; } @@ -82,6 +87,7 @@ public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final v2.j = v1.getReconsumeTimes(); v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); + v2.m = v1.isBatch(); return v2; } @@ -184,4 +190,12 @@ public Integer getL() { public void setL(final Integer l) { this.l = l; } + + public boolean isM() { + return m; + } + + public void setM(boolean m) { + this.m = m; + } } \ No newline at end of file diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java new file mode 100644 index 00000000000..1e406d2f64c --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.junit.Test; + +public class MessageBatchTest { + + + public List generateMessages() { + List messages = new ArrayList(); + Message message1 = new Message("topic1", "body".getBytes()); + Message message2 = new Message("topic1", "body".getBytes()); + + messages.add(message1); + messages.add(message2); + return messages; + } + + @Test + public void testGenerate_OK() throws Exception{ + List messages = generateMessages(); + MessageBatch.generateFromList(messages); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_DiffTopic() throws Exception{ + List messages = generateMessages(); + messages.get(1).setTopic("topic2"); + MessageBatch.generateFromList(messages); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_DiffWaitOK() throws Exception{ + List messages = generateMessages(); + messages.get(1).setWaitStoreMsgOK(false); + MessageBatch.generateFromList(messages); + } + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_Delay() throws Exception{ + List messages = generateMessages(); + messages.get(1).setDelayTimeLevel(1); + MessageBatch.generateFromList(messages); + } + @Test(expected = UnsupportedOperationException.class) + public void testGenerate_Retry() throws Exception{ + List messages = generateMessages(); + messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic"); + MessageBatch.generateFromList(messages); + } +} diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java new file mode 100644 index 00000000000..a219edac48a --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** + * Created by liuzhendong on 16/12/21. + */ +public class MessageEncodeDecodeTest { + + + @Test + public void testEncodeDecodeSingle() throws Exception{ + Message message = new Message("topic", "body".getBytes()); + message.setFlag(12); + message.putUserProperty("key","value"); + byte[] bytes = MessageDecoder.encodeMessage(message); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.put(bytes); + buffer.flip(); + Message newMessage = MessageDecoder.decodeMessage(buffer); + + assertTrue(message.getFlag() == newMessage.getFlag()); + assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key"))); + assertTrue(Arrays.equals(newMessage.getBody(), message.getBody())); + } + + @Test + public void testEncodeDecodeList() throws Exception { + List messages = new ArrayList(128); + for (int i = 0; i < 100; i++) { + Message message = new Message("topic", ("body" + i).getBytes()); + message.setFlag(i); + message.putUserProperty("key", "value" + i); + messages.add(message); + } + byte[] bytes = MessageDecoder.encodeMessages(messages); + + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.put(bytes); + buffer.flip(); + + List newMsgs = MessageDecoder.decodeMessages(buffer); + + assertTrue(newMsgs.size() == messages.size()); + + for (int i = 0; i < newMsgs.size(); i++) { + Message message = messages.get(i); + Message newMessage = newMsgs.get(i); + assertTrue(message.getFlag() == newMessage.getFlag()); + assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key"))); + assertTrue(Arrays.equals(newMessage.getBody(), message.getBody())); + + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index 70b702e720c..16a62fa48cf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store; import java.nio.ByteBuffer; +import org.apache.rocketmq.common.message.MessageExtBatch; /** * Write messages callback interface @@ -32,5 +33,17 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg); + + /** + * After batched message serialization, write MapedByteBuffer + * + * @param byteBuffer + * @param maxBlank + * @param messageExtBatch, backed up by a byte array + * + * @return How many bytes to write + */ + AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, + final int maxBlank, final MessageExtBatch messageExtBatch); } diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index 5182dc496e6..d6d1aa6a31c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -34,6 +34,8 @@ public class AppendMessageResult { private long logicsOffset; private long pagecacheRT = 0; + private int msgNum = 1; + public AppendMessageResult(AppendMessageStatus status) { this(status, 0, 0, "", 0, 0, 0); } @@ -109,6 +111,14 @@ public void setLogicsOffset(long logicsOffset) { this.logicsOffset = logicsOffset; } + public int getMsgNum() { + return msgNum; + } + + public void setMsgNum(int msgNum) { + this.msgNum = msgNum; + } + @Override public String toString() { return "AppendMessageResult{" + @@ -119,6 +129,7 @@ public String toString() { ", storeTimestamp=" + storeTimestamp + ", logicsOffset=" + logicsOffset + ", pagecacheRT=" + pagecacheRT + + ", msgNum=" + msgNum + '}'; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3bbe675854f..5be8258e9f1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; @@ -57,6 +58,7 @@ public class CommitLog { private final FlushCommitLogService commitLogService; private final AppendMessageCallback appendMessageCallback; + private final ThreadLocal batchEncoderThreadLocal; private HashMap topicQueueTable = new HashMap(1024); private volatile long confirmOffset = -1L; @@ -81,6 +83,11 @@ public CommitLog(final DefaultMessageStore defaultMessageStore) { this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + batchEncoderThreadLocal = new ThreadLocal() { + @Override protected MessageExtBatchEncoder initialValue() { + return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + } + }; } public boolean load() { @@ -222,7 +229,8 @@ private void doNothingForDeadCode(final Object obj) { * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ - public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { + public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, + final boolean readBody) { try { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); @@ -370,7 +378,7 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, return new DispatchRequest(-1, false /* success */); } - private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { + private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC @@ -633,18 +641,23 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - GroupCommitRequest request = null; + handleDiskFlush(result, putMessageResult, msg); + handleHA(result, putMessageResult, msg); + return putMessageResult; + } + + public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; - if (msg.isWaitStoreMsgOK()) { - request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); + if (messageExt.isWaitStoreMsgOK()) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { - log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() - + " client address: " + msg.getBornHostString()); + log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { @@ -659,26 +672,22 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { commitLogService.wakeup(); } } + } - // Synchronous write double + public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); - if (msg.isWaitStoreMsgOK()) { + if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - if (null == request) { - request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - } + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); - service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = - // TODO request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { - log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " - + msg.getTags() + " client address: " + msg.getBornHostString()); + log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } @@ -690,12 +699,109 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { } } + } + + public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + AppendMessageResult result; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + long eclipseTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + //fine-grained lock instead of the coarse-grained + MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + + lockForPutMessage(); //spin... + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create maped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); + } + + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create maped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); + case UNKNOWN_ERROR: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + default: + beginTimeInLock = 0; + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); + } + + eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + releasePutMessageLock(); + } + + + if (eclipseTimeInLock > 500) { + log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, messageExtBatch.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); + + + handleDiskFlush(result, putMessageResult, messageExtBatch); + + handleHA(result, putMessageResult, messageExtBatch); + return putMessageResult; } /** - * According to receive certain message or offset storage time if an error - * occurs, it returns -1 + * According to receive certain message or offset storage time if an error occurs, it returns -1 */ public long pickupStoreTimestamp(final long offset, final int size) { if (offset >= this.getMinOffset()) { @@ -1098,6 +1204,8 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { // Build Message Key private final StringBuilder keyBuilder = new StringBuilder(); + private final StringBuilder msgIdBuilder = new StringBuilder(); + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); DefaultAppendMessageCallback(final int size) { @@ -1110,7 +1218,8 @@ public ByteBuffer getMsgStoreItemMemory() { return msgStoreItemMemory; } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { + public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET @@ -1218,7 +1327,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // 12 STOREHOSTADDRESS this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); - //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes()); + //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset @@ -1257,9 +1366,202 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer return result; } + public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBatch messageExtBatch) { + byteBuffer.mark(); + //physical offset + long wroteOffset = fileFromOffset + byteBuffer.position(); + // Record ConsumeQueue information + keyBuilder.setLength(0); + keyBuilder.append(messageExtBatch.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExtBatch.getQueueId()); + String key = keyBuilder.toString(); + Long queueOffset = CommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + CommitLog.this.topicQueueTable.put(key, queueOffset); + } + long beginQueueOffset = queueOffset; + int totalMsgLen = 0; + int msgNum = 0; + msgIdBuilder.setLength(0); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); + ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff(); + this.resetByteBuffer(hostHolder, 8); + ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder); + messagesByteBuff.mark(); + while (messagesByteBuff.hasRemaining()) { + // 1 TOTALSIZE + final int msgPos = messagesByteBuff.position(); + final int msgLen = messagesByteBuff.getInt(); + final int bodyLen = msgLen - 40; //only for log, just estimate it + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + + ", maxMessageSize: " + this.maxMessageSize); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + totalMsgLen += msgLen; + // Determines whether there is sufficient free space + if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { + this.resetByteBuffer(this.msgStoreItemMemory, 8); + // 1 TOTALSIZE + this.msgStoreItemMemory.putInt(maxBlank); + // 2 MAGICCODE + this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); + // 3 The remaining space may be any value + // + //ignore previous read + messagesByteBuff.reset(); + // Here the length of the specially set maxBlank + byteBuffer.reset(); //ignore the previous appended messages + byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), + beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + } + //move to add queue offset and commitlog offset + messagesByteBuff.position(msgPos + 20); + messagesByteBuff.putLong(queueOffset); + messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen); + + storeHostBytes.rewind(); + String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + queueOffset++; + msgNum++; + messagesByteBuff.position(msgPos + msgLen); + } + + messagesByteBuff.position(0); + messagesByteBuff.limit(totalMsgLen); + byteBuffer.put(messagesByteBuff); + messageExtBatch.setEncodedBuff(null); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), + messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + result.setMsgNum(msgNum); + CommitLog.this.topicQueueTable.put(key, queueOffset); + + return result; + } + + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { + byteBuffer.flip(); + byteBuffer.limit(limit); + } + + } + + public static class MessageExtBatchEncoder { + // Store the message content + private final ByteBuffer msgBatchMemory; + // The maximum length of the message + private final int maxMessageSize; + + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); + + MessageExtBatchEncoder(final int size) { + this.msgBatchMemory = ByteBuffer.allocateDirect(size); + this.maxMessageSize = size; + } + + + public ByteBuffer encode(final MessageExtBatch messageExtBatch) { + msgBatchMemory.clear(); //not thread-safe + int totalMsgLen = 0; + ByteBuffer messagesByteBuff = messageExtBatch.wrap(); + while (messagesByteBuff.hasRemaining()) { + // 1 TOTALSIZE + messagesByteBuff.getInt(); + // 2 MAGICCODE + messagesByteBuff.getInt(); + // 3 BODYCRC + messagesByteBuff.getInt(); + // 4 FLAG + int flag = messagesByteBuff.getInt(); + // 5 BODY + int bodyLen = messagesByteBuff.getInt(); + int bodyPos = messagesByteBuff.position(); + int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); + messagesByteBuff.position(bodyPos + bodyLen); + // 6 properties + short propertiesLen = messagesByteBuff.getShort(); + int propertiesPos = messagesByteBuff.position(); + messagesByteBuff.position(propertiesPos + propertiesLen); + + final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + + final int topicLength = topicData.length; + + final int msgLen = calMsgLength(bodyLen, topicLength, propertiesLen); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + + ", maxMessageSize: " + this.maxMessageSize); + throw new RuntimeException("message size exceeded"); + } + + totalMsgLen += msgLen; + // Determines whether there is sufficient free space + if (totalMsgLen > maxMessageSize) { + throw new RuntimeException("message size exceeded"); + } + + // 1 TOTALSIZE + this.msgBatchMemory.putInt(msgLen); + // 2 MAGICCODE + this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.msgBatchMemory.putInt(bodyCrc); + // 4 QUEUEID + this.msgBatchMemory.putInt(messageExtBatch.getQueueId()); + // 5 FLAG + this.msgBatchMemory.putInt(flag); + // 6 QUEUEOFFSET + this.msgBatchMemory.putLong(0); + // 7 PHYSICALOFFSET + this.msgBatchMemory.putLong(0); + // 8 SYSFLAG + this.msgBatchMemory.putInt(messageExtBatch.getSysFlag()); + // 9 BORNTIMESTAMP + this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp()); + // 10 BORNHOST + this.resetByteBuffer(hostHolder, 8); + this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(hostHolder)); + // 11 STORETIMESTAMP + this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + this.resetByteBuffer(hostHolder, 8); + this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(hostHolder)); + // 13 RECONSUMETIMES + this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes()); + // 14 Prepared Transaction Offset, batch does not support transaction + this.msgBatchMemory.putLong(0); + // 15 BODY + this.msgBatchMemory.putInt(bodyLen); + if (bodyLen > 0) + this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); + // 16 TOPIC + this.msgBatchMemory.put((byte) topicLength); + this.msgBatchMemory.put(topicData); + // 17 PROPERTIES + this.msgBatchMemory.putShort(propertiesLen); + if (propertiesLen > 0) + this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + } + msgBatchMemory.flip(); + return msgBatchMemory; + } + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index dc1c96c0a15..eeb359808d1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() { public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, long logicOffset) { final int maxRetries = 30; - boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); + boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); if (result) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 2594ef32341..5c2d27f1f36 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; @@ -325,6 +326,62 @@ public PutMessageResult putMessage(MessageExtBrokerInner msg) { return result; } + public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { + if (this.shutdown) { + log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (!this.runningFlags.isWriteable()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } else { + this.printTimes.set(0); + } + + if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { + log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { + log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + if (this.isOSPageCacheBusy()) { + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + } + + long beginTime = this.getSystemClock().now(); + PutMessageResult result = this.commitLog.putMessages(messageExtBatch); + + long eclipseTime = this.getSystemClock().now() - beginTime; + if (eclipseTime > 500) { + log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); + } + this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); + + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + + return result; + } + @Override public boolean isOSPageCacheBusy() { long begin = this.getCommitLog().getBeginTimeInLock(); diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index feb505d6dba..5cb72ce2dd7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.util.LibC; import org.slf4j.Logger; @@ -187,7 +189,15 @@ public FileChannel getFileChannel() { } public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { - assert msg != null; + return appendMessagesInner(msg, cb); + } + + public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { + return appendMessagesInner(messageExtBatch, cb); + } + + public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { + assert messageExt != null; assert cb != null; int currentPos = this.wrotePosition.get(); @@ -195,30 +205,28 @@ public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); - AppendMessageResult result = - cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); + AppendMessageResult result = null; + if (messageExt instanceof MessageExtBrokerInner) { + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); + } else if (messageExt instanceof MessageExtBatch) { + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt); + } else { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } - - log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: " - + this.fileSize); + log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - /** - */ public long getFileFromOffset() { return this.fileFromOffset; } - /** - - * - */ public boolean appendMessage(final byte[] data) { int currentPos = this.wrotePosition.get(); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 79e3a8fd1c8..65c546b706c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public interface MessageStore { @@ -33,6 +34,8 @@ public interface MessageStore { PutMessageResult putMessage(final MessageExtBrokerInner msg); + PutMessageResult putMessages(final MessageExtBatch messageExtBatch); + GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java index 4f610b8afae..3dcd8611ab4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java @@ -27,6 +27,8 @@ public class RunningFlags { private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3; private static final int DISK_FULL_BIT = 1 << 4; + + private volatile int flagBits = 0; public RunningFlags() { @@ -76,6 +78,15 @@ public boolean isWriteable() { return false; } + //for consume queue, just ignore the DISK_FULL_BIT + public boolean isCQWriteable() { + if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) { + return true; + } + + return false; + } + public boolean getAndMakeNotWriteable() { boolean result = this.isWriteable(); if (result) { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9cfd1c34caa..7ae2ab5c37b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -132,6 +132,7 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -629,4 +630,5 @@ public int getCommitCommitLogThoroughInterval() { public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 29ec71ecc99..1515eb47713 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -123,7 +123,9 @@ public StatsItem getStatsItem(final String statsName, final String statsKey) { public void incTopicPutNums(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } - + public void incTopicPutNums(final String topic, int num, int times) { + this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times); + } public void incTopicPutSize(final String topic, final int size) { this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); } @@ -154,7 +156,9 @@ public void incGroupGetLatency(final String group, final String topic, final int public void incBrokerPutNums() { this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); } - + public void incBrokerPutNums(final int incValue) { + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + } public void incBrokerGetNums(final int incValue) { this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); } diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java new file mode 100644 index 00000000000..fc667b6318b --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class AppendCallbackTest { + + AppendMessageCallback callback; + + CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); + + @Before + public void init() throws Exception{ + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog"); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null); + CommitLog commitLog = new CommitLog(messageStore); + callback = commitLog.new DefaultAppendMessageCallback(1024); + } + + + @Test + public void testAppendMessageBatchEndOfFile() throws Exception{ + List messages = new ArrayList<>(); + String topic = "test-topic"; + int queue= 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + //encounter end of file when append half of the data + AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); + assertEquals(0, result.getWroteOffset()); + assertEquals(0, result.getLogicsOffset()); + assertEquals(1000, result.getWroteBytes()); + assertEquals(8, buff.position()); //write blank size and magic value + + assertTrue(result.getMsgId().length() > 0); //should have already constructed some message ids + } + @Test + public void testAppendMessageBatchSucc() throws Exception { + List messages = new ArrayList<>(); + String topic = "test-topic"; + int queue= 0; + for (int i = 0; i < 10; i++) { + Message msg = new Message(); + msg.setBody("body".getBytes()); + msg.setTopic(topic); + msg.setTags("abc"); + messages.add(msg); + } + MessageExtBatch messageExtBatch = new MessageExtBatch(); + messageExtBatch.setTopic(topic); + messageExtBatch.setQueueId(queue); + messageExtBatch.setBornTimestamp(System.currentTimeMillis()); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + ByteBuffer buff = ByteBuffer.allocate(1024 * 10); + AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + + assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); + assertEquals(0, allresult.getWroteOffset()); + assertEquals(0, allresult.getLogicsOffset()); + assertEquals(buff.position(), allresult.getWroteBytes()); + + assertEquals(messages.size(), allresult.getMsgNum()); + + Set msgIds = new HashSet<>(); + for (String msgId: allresult.getMsgId().split(",")) { + assertEquals(32, msgId.length()); + msgIds.add(msgId); + } + assertEquals(messages.size(), msgIds.size()); + + List decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); + assertEquals(decodeMsgs.size(), decodeMsgs.size()); + long queueOffset = decodeMsgs.get(0).getQueueOffset(); + long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); + for (int i = 0; i < messages.size(); i++) { + assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); + assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); + assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); + + assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); + + assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); + assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); + assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); + } + + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java new file mode 100644 index 00000000000..e372a1b2601 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.batch; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BatchSendIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private String topic = null; + private Random random = new Random(); + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testBatchSend_ViewMessage() throws Exception { + List messageList = new ArrayList<>(); + int batchNum = 100; + for (int i = 0; i < batchNum; i++) { + messageList.add(new Message(topic, RandomUtils.getStringByUUID().getBytes())); + } + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + SendResult sendResult = producer.send(messageList); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + + String[] offsetIds = sendResult.getOffsetMsgId().split(","); + String[] msgIds = sendResult.getMsgId().split(","); + Assert.assertEquals(messageList.size(), offsetIds.length); + Assert.assertEquals(messageList.size(), msgIds.length); + + Thread.sleep(2000); + + for (int i = 0; i < 3; i++) { + producer.viewMessage(offsetIds[random.nextInt(batchNum)]); + } + for (int i = 0; i < 3; i++) { + producer.viewMessage(topic, msgIds[random.nextInt(batchNum)]); + } + } + + + @Test + public void testBatchSend_CheckProperties() throws Exception { + List messageList = new ArrayList<>(); + Message message = new Message(); + message.setTopic(topic); + message.setKeys("keys123"); + message.setTags("tags123"); + message.setWaitStoreMsgOK(false); + message.setBuyerId("buyerid123"); + message.setFlag(123); + message.setBody("body".getBytes()); + messageList.add(message); + + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + SendResult sendResult = producer.send(messageList); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + + String[] offsetIds = sendResult.getOffsetMsgId().split(","); + String[] msgIds = sendResult.getMsgId().split(","); + Assert.assertEquals(messageList.size(), offsetIds.length); + Assert.assertEquals(messageList.size(), msgIds.length); + + Thread.sleep(2000); + + Message messageByOffset = producer.viewMessage(offsetIds[0]); + Message messageByMsgId = producer.viewMessage(topic, msgIds[0]); + + System.out.println(messageByOffset); + System.out.println(messageByMsgId); + + Assert.assertEquals(message.getTopic(), messageByMsgId.getTopic()); + Assert.assertEquals(message.getTopic(), messageByOffset.getTopic()); + + Assert.assertEquals(message.getKeys(), messageByOffset.getKeys()); + Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys()); + + Assert.assertEquals(message.getTags(), messageByOffset.getTags()); + Assert.assertEquals(message.getTags(), messageByMsgId.getTags()); + + Assert.assertEquals(message.isWaitStoreMsgOK(), messageByOffset.isWaitStoreMsgOK()); + Assert.assertEquals(message.isWaitStoreMsgOK(), messageByMsgId.isWaitStoreMsgOK()); + + Assert.assertEquals(message.getBuyerId(), messageByOffset.getBuyerId()); + Assert.assertEquals(message.getBuyerId(), messageByMsgId.getBuyerId()); + + Assert.assertEquals(message.getFlag(), messageByOffset.getFlag()); + Assert.assertEquals(message.getFlag(), messageByMsgId.getFlag()); + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java index 6fa8bd9bb07..716ac511afa 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java @@ -61,7 +61,7 @@ public void testProducerSmoke() { @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) public void testSynSendNullMessage() throws Exception { - producer.send(null); + producer.send((Message) null); } @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)