Skip to content

Commit

Permalink
消费轨迹。
Browse files Browse the repository at this point in the history
  • Loading branch information
YangJodie committed Aug 15, 2014
1 parent a78eb2a commit 2aa30e2
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class ConsumeMessageContext {
private Integer queueId;
private String clientHost;
private String storeHost;
private Map<Long, String> messageIds;
private Map<String, Long> messageIds;
private int bodyLength;
private boolean success;
private String status;
private Object mqTraceContext;
Expand Down Expand Up @@ -80,12 +81,12 @@ public void setStoreHost(String storeHost) {
}


public Map<Long, String> getMessageIds() {
public Map<String, Long> getMessageIds() {
return messageIds;
}


public void setMessageIds(Map<Long, String> messageIds) {
public void setMessageIds(Map<String, Long> messageIds) {
this.messageIds = messageIds;
}

Expand Down Expand Up @@ -118,4 +119,14 @@ public Object getMqTraceContext() {
public void setMqTraceContext(Object mqTraceContext) {
this.mqTraceContext = mqTraceContext;
}


public int getBodyLength() {
return bodyLength;
}


public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
Expand Down Expand Up @@ -826,7 +827,7 @@ private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, Remoting
context.setStoreHost(this.brokerController.getBrokerAddr());
context.setQueueId(requestHeader.getQueueId());
context.setSuccess(true);

context.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
final SocketAddress storeHost =
new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
.getNettyServerConfig().getListenPort());
Expand All @@ -835,7 +836,7 @@ private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, Remoting
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
Map<Long, String> messageIds =
Map<String, Long> messageIds =
this.brokerController.getMessageStore()
.getMessageIds(requestHeader.getTopic(), requestHeader.getQueueId(), preOffset + 1,
requestHeader.getCommitOffset(), storeHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,14 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
final SocketAddress storeHost =
new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(),
brokerController.getNettyServerConfig().getListenPort());
Map<Long, String> messageIds =
Map<String, Long> messageIds =
this.brokerController.getMessageStore().getMessageIds(requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(),
requestHeader.getQueueOffset() + getMessageResult.getMessageCount(),
storeHost);
context.setMessageIds(messageIds);
context.setBodyLength(getMessageResult.getBufferTotalSize()
/ getMessageResult.getMessageCount());
this.executeConsumeMessageHookBefore(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
Expand Down Expand Up @@ -124,8 +125,9 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
context.setClientHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
context.setStoreHost(this.brokerController.getBrokerAddr());
context.setSuccess(false);
Map<Long, String> messageIds = new HashMap<Long, String>();
messageIds.put(requestHeader.getOffset(), requestHeader.getMsgId());
context.setStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER.toString());
Map<String, Long> messageIds = new HashMap<String, Long>();
messageIds.put(requestHeader.getMsgId(), requestHeader.getOffset());
context.setMessageIds(messageIds);
this.executeConsumeMessageHookAfter(context);
}
Expand Down Expand Up @@ -502,7 +504,6 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final Remot
if (hasSendMessageHook()) {
mqtraceContext.setMsgId(responseHeader.getMsgId());
mqtraceContext.setQueueOffset(responseHeader.getQueueOffset());
this.executeSendMessageHookAfter(response, mqtraceContext);
}
return null;
}
Expand Down Expand Up @@ -568,11 +569,8 @@ public void executeSendMessageHookAfter(final RemotingCommand response, final Se
if (response != null) {
context.setCode(response.getCode());
context.setErrorMsg(response.getRemark());
hook.sendMessageAfter(context);
}
else {
hook.sendMessageAfter(context);
}
hook.sendMessageAfter(context);
}
catch (Throwable e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1779,9 +1779,9 @@ public int cleanUnusedTopic(Set<String> topics) {
}


public Map<Long, String> getMessageIds(final String topic, final int queueId, long minOffset,
public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset,
long maxOffset, SocketAddress storeHost) {
Map<Long, String> messageIds = new HashMap<Long, String>();
Map<String, Long> messageIds = new HashMap<String, Long>();
if (this.shutdown) {
return messageIds;
}
Expand All @@ -1807,7 +1807,7 @@ public Map<Long, String> getMessageIds(final String topic, final int queueId, lo
String msgId =
MessageDecoder.createMessageId(msgIdMemory,
MessageExt.SocketAddress2ByteBuffer(storeHost), offsetPy);
messageIds.put(nextOffset++, msgId);
messageIds.put(msgId, nextOffset++);
if (nextOffset > maxOffset) {
return messageIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@ public QueryMessageResult queryMessage(final String topic, final String key, fin
/**
* 批量获取 messageId
*/
public Map<Long, String> getMessageIds(final String topic, int queueId, long minOffset,
public Map<String, Long> getMessageIds(final String topic, int queueId, long minOffset,
final long maxOffset, SocketAddress storeHost);
}

0 comments on commit 2aa30e2

Please sign in to comment.