Skip to content

Commit

Permalink
Include client IP per message queue of consumer progress command output
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhanhui authored and dongeforever committed Jun 6, 2017
1 parent 3401296 commit 8c793c0
Showing 1 changed file with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.tools.command.consumer;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
Expand All @@ -30,7 +26,9 @@
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
Expand All @@ -40,6 +38,13 @@
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class ConsumerProgressSubCommand implements SubCommand {
private final Logger log = ClientLogger.getLog();

Expand All @@ -62,6 +67,24 @@ public Options buildCommandlineOptions(Options options) {
return options;
}

private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
String groupName) {
Map<MessageQueue, String> results = new HashMap<>();
try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
for (Connection connection : consumerConnection.getConnectionSet()) {
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
false);
for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
results.put(messageQueue, clientId.split("@")[0]);
}
}
} catch (Exception ignore) {
}
return results;
}

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
Expand All @@ -75,13 +98,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
List<MessageQueue> mqList = new LinkedList<MessageQueue>();
mqList.addAll(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);

System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Broker Offset",
"#Consumer Offset",
"#Client IP",
"#Diff",
"#LastTime");

Expand All @@ -95,12 +119,15 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
} catch (Exception e) {
}
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",

String clientIP = messageQueueAllocationResult.get(mq);
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n",
UtilAll.frontStringAtLeast(mq.getTopic(), 32),
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
mq.getQueueId(),
offsetWrapper.getBrokerOffset(),
offsetWrapper.getConsumerOffset(),
null != clientIP ? clientIP : "NA",
diff,
lastTime
);
Expand Down

0 comments on commit 8c793c0

Please sign in to comment.