From c9bc70a141d393c0b7334b0ff866fb8298bfe5c5 Mon Sep 17 00:00:00 2001 From: vongosling Date: Sun, 18 Sep 2016 16:14:20 +0800 Subject: [PATCH] Update --- rocketmq-tools/pom.xml | 5 - .../broker/UpdateBrokerConfigSubCommand.java | 10 +- .../cluster/ClusterListSubCommand.java | 30 +- .../consumer/ConsumeOffsetRankSubCommand.java | 368 ------------------ .../command/stats/TpsStatsSubCommand.java | 316 --------------- 5 files changed, 32 insertions(+), 697 deletions(-) delete mode 100644 rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumeOffsetRankSubCommand.java delete mode 100644 rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/TpsStatsSubCommand.java diff --git a/rocketmq-tools/pom.xml b/rocketmq-tools/pom.xml index 1c78a643d..3ebefbff5 100644 --- a/rocketmq-tools/pom.xml +++ b/rocketmq-tools/pom.xml @@ -63,10 +63,5 @@ github-api 1.59 - - com.taobao.tlog - tlog-client - 2.0.0-SNAPSHOT - diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java index 402e3fe40..86938a7b8 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java @@ -86,7 +86,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { defaultMQAdminExt.start(); defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties); - System.out.printf("update broker config success, %s%n", brokerAddr); + System.out.printf("update broker config success, %s\n", brokerAddr); return; } else if (commandLine.hasOption('c')) { @@ -97,8 +97,12 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String brokerAddr : masterSet) { - defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties); - System.out.printf("update broker config success, %s%n", brokerAddr); + try { + defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties); + System.out.printf("update broker config success, %s\n", brokerAddr); + } catch (Exception e) { + e.printStackTrace(); + } } return; } diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java index f87f1c51e..e29371b8a 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -174,7 +174,7 @@ private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) thr ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); - System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s%n",// + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",// "#Cluster Name",// "#Broker Name",// "#BID",// @@ -182,7 +182,9 @@ private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) thr "#Version",// "#InTPS(LOAD)",// "#OutTPS(LOAD)",// - "#PCWait(ms)"// + "#PCWait(ms)",// + "#Hour",// + "#SPACE"// ); Iterator>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); @@ -207,6 +209,8 @@ private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) thr String sendThreadPoolQueueHeadWaitTimeMills = ""; String pullThreadPoolQueueHeadWaitTimeMills = ""; String pageCacheLockTimeMills = ""; + String earliestMessageTimeStamp = ""; + String commitLogDiskRatio = ""; try { KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(next1.getValue()); String putTps = kvTable.getTable().get("putTps"); @@ -220,6 +224,8 @@ private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) thr sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"); pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"); pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills"); + earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp"); + commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio"); version = kvTable.getTable().get("brokerVersionDesc"); { @@ -238,15 +244,29 @@ private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) thr } catch (Exception e) { } - System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s%n",// + double hour = 0.0; + double space = 0.0; + + if (earliestMessageTimeStamp != null && earliestMessageTimeStamp.length() > 0) { + long mills = System.currentTimeMillis() - Long.valueOf(earliestMessageTimeStamp); + hour = mills / 1000.0 / 60.0 / 60.0; + } + + if (commitLogDiskRatio != null && commitLogDiskRatio.length() > 0) { + space = Double.valueOf(commitLogDiskRatio); + } + + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",// clusterName,// brokerName,// - String.valueOf(next1.getKey()),// + next1.getKey().longValue(),// next1.getValue(),// version,// String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),// String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),// - pageCacheLockTimeMills + pageCacheLockTimeMills,// + String.format("%2.2f", hour),// + String.format("%.4f", space)// ); } } diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumeOffsetRankSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumeOffsetRankSubCommand.java deleted file mode 100644 index b6c249a35..000000000 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumeOffsetRankSubCommand.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.tools.command.consumer; - -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.admin.ConsumeStats; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.ConsumeStatsList; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; -import com.alibaba.rocketmq.tools.command.SubCommand; -import com.taobao.tlog.client.JSONResult; -import com.taobao.tlog.client.KeyValueParam; -import com.taobao.tlog.client.KeyValueQuery; -import com.taobao.tlog.client.TLogQueryClient; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; - -import java.util.*; - - -public class ConsumeOffsetRankSubCommand implements SubCommand { - private final Logger log = ClientLogger.getLog(); - - public static void main(String[] args) { - for (int i = 0; i < 1; ++i) { - long endTime = System.currentTimeMillis() - 60000; - long startTime = endTime - 60000; - - KeyValueQuery kvq = new KeyValueQuery("metaq_metaqstats", // StageId - "meta_stats_1min",// BizId - new KeyValueParam("type", "TOPIC_PUT_NUMS"), - new KeyValueParam("key", "TRADE"), - new KeyValueParam("date", startTime + "", endTime + "")); - - JSONResult queryData = TLogQueryClient.queryData("http://110.75.84.129:9999", kvq); - long tps = 0; - if (queryData != null) { - JSONResult.JSONRecord[] records = queryData.getRecords(); - for (JSONResult.JSONRecord r : records) { - tps += (Long) r.getValueByKeyName("sum"); - } - System.out.println(tps); - } - } - - - } - - @Override - public String commandName() { - return "consumeOffsetRank"; - } - - @Override - public String commandDesc() { - return "Query rank list of certain consume data"; - } - - @Override - public Options buildCommandlineOptions(Options options) { - Option opt = new Option("b", "brokerAddr", true, "Broker address"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("c", "clusterName", true, "cluster name for showing consume offset"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("t", "timeoutMillis", true, "request timeout Millis, default is 50000"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("a", "amount of rank list", true, "amount of rank list, default is 10"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - Map consumeDataInfoMap = new HashMap(); - long timeoutMillis = 50000; - if (commandLine.hasOption('t')) { - timeoutMillis = Long.parseLong(commandLine.getOptionValue('t').trim()); - } - int amount = 10; - if (commandLine.hasOption('a')) { - amount = Integer.parseInt(commandLine.getOptionValue('a').trim()); - } - try { - defaultMQAdminExt.start(); - - if (commandLine.hasOption('b')) { - String brokerAddr = commandLine.getOptionValue('b').trim(); - List consumeDataInfoList = consumeRankInBroker(defaultMQAdminExt, brokerAddr, timeoutMillis); - if (null != consumeDataInfoList) - mergeConsumeDataInfoList(consumeDataInfoMap, consumeDataInfoList); - - printResult(defaultMQAdminExt, consumeDataInfoMap, amount); - } - - else { - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (null == clusterInfo) - return; - - if (commandLine.hasOption('c')) { - String clusterName = commandLine.getOptionValue('c').trim(); - printClusterConsumeDataInfo(clusterInfo, clusterName, defaultMQAdminExt, timeoutMillis, amount, consumeDataInfoMap); - } - - else { - for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) { - consumeDataInfoMap.clear(); - try { - System.out.println("consume offset rank list for cluster [" + clusterName + "]"); - printClusterConsumeDataInfo(clusterInfo, clusterName, defaultMQAdminExt, timeoutMillis, amount, consumeDataInfoMap); - System.out.println(); - } catch (Exception e) { - - System.out.println("get cluster [" + clusterName + "]" + "consume offset data error"); - System.out.println(); - } - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - defaultMQAdminExt.shutdown(); - } - } - - private List consumeRankInBroker(DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, - long timeoutMillis) throws Exception { - ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, false, timeoutMillis); - List consumeDataInfoList = new ArrayList(); - for (Map> map : consumeStatsList.getConsumeStatsList()) { - for (String group : map.keySet()) { - List consumeStatsArray = map.get(group); - for (ConsumeStats consumeStats : consumeStatsArray) { - ConsumeDataInfo consumeDataInfo = new ConsumeDataInfo(); - consumeDataInfo.setGroup(group); - if (consumeStats != null) { - consumeDataInfo.setConsumeTps((int) consumeStats.getConsumeTps()); - consumeDataInfo.setDiffTotal(consumeStats.computeTotalDiff()); - } - consumeDataInfoList.add(consumeDataInfo); - } - } - } - return consumeDataInfoList; - } - - private void mergeConsumeDataInfoList(Map consumeDataInfoMap, List consumeDataInfoList) { - if (null == consumeDataInfoMap || null == consumeDataInfoList) - return; - for (ConsumeDataInfo consumeDataInfo : consumeDataInfoList) { - ConsumeDataInfo mergeConsumeDataInfo = consumeDataInfoMap.get(consumeDataInfo.getGroup()); - if (null == mergeConsumeDataInfo) { - mergeConsumeDataInfo = consumeDataInfo; - consumeDataInfoMap.put(mergeConsumeDataInfo.getGroup(), mergeConsumeDataInfo); - } else { - mergeConsumeDataInfo.setDiffTotal(mergeConsumeDataInfo.getDiffTotal() + consumeDataInfo.getDiffTotal()); - } - } - } - - private void printResult(DefaultMQAdminExt defaultMQAdminExt, Map consumeDataInfoMap, int amount) { - List consumeDataInfoRet = new ArrayList(); - for (String group : consumeDataInfoMap.keySet()) { - consumeDataInfoRet.add(consumeDataInfoMap.get(group)); - } - Collections.sort(consumeDataInfoRet); - System.out.printf("%-48s %-6s %-24s %-5s %-14s %-7s %s%n",// - "#Group",// - "#Count",// - "#Version",// - "#Type",// - "#Model",// - "#TPS",// - "#Diff Total"// - ); - int onlineCount = 0; - for (int i = 0; i < consumeDataInfoRet.size(); i++) { - if (onlineCount == amount) { - break; - } - ConsumeDataInfo consumeDataInfo = consumeDataInfoRet.get(i); - ConsumerConnection cc = null; - try { - cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumeDataInfo.getGroup()); - } catch (Exception e) { - log.error("examineConsumerConnectionInfo exception, " + consumeDataInfo.getGroup()); - } - if (cc != null) { - onlineCount++; - consumeDataInfo.setCount(cc.getConnectionSet().size()); - consumeDataInfo.setMessageModel(cc.getMessageModel()); - consumeDataInfo.setConsumeType(cc.getConsumeType()); - consumeDataInfo.setVersion(cc.computeMinVersion()); - System.out.printf("%-48s %-6d %-24s %-5s %-14s %-7d %d%n",// - UtilAll.frontStringAtLeast(consumeDataInfo.getGroup(), 32),// - consumeDataInfo.getCount(),// - consumeDataInfo.getCount() > 0 ? consumeDataInfo.versionDesc() : "OFFLINE",// - consumeDataInfo.consumeTypeDesc(),// - consumeDataInfo.messageModelDesc(),// - consumeDataInfo.getConsumeTps(),// - consumeDataInfo.getDiffTotal()// - ); - } - } - } - - private void printClusterConsumeDataInfo(ClusterInfo clusterInfo, String clusterName, DefaultMQAdminExt defaultMQAdminExt, - long timeoutMillis, int amount, Map consumeDataInfoMap) throws Exception { - Set brokerNameSet = clusterInfo.getClusterAddrTable().get(clusterName); - if (null == brokerNameSet) - return; - for (String brokerName : brokerNameSet) { - String brokerAddr = clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().get(MixAll.MASTER_ID); - if (null != brokerAddr && brokerAddr.length() > 0) { - List consumeDataInfoList = consumeRankInBroker(defaultMQAdminExt, brokerAddr, timeoutMillis); - mergeConsumeDataInfoList(consumeDataInfoMap, consumeDataInfoList); - } - } - - printResult(defaultMQAdminExt, consumeDataInfoMap, amount); - } - - class ConsumeDataInfo implements Comparable { - private String group; - private int version; - private int count; - private ConsumeType consumeType; - private MessageModel messageModel; - private int consumeTps; - private long diffTotal; - private String clusterName; - - public String getGroup() { - return group; - } - - public void setGroup(String group) { - this.group = group; - } - - public String consumeTypeDesc() { - if (this.count != 0) { - return this.getConsumeType() == ConsumeType.CONSUME_ACTIVELY ? "PULL" : "PUSH"; - } - return ""; - } - - public ConsumeType getConsumeType() { - return consumeType; - } - - public void setConsumeType(ConsumeType consumeType) { - this.consumeType = consumeType; - } - - public String messageModelDesc() { - if (this.count != 0) { - return this.getMessageModel().toString(); - } - return ""; - } - - public MessageModel getMessageModel() { - return messageModel; - } - - public void setMessageModel(MessageModel messageModel) { - this.messageModel = messageModel; - } - - public String versionDesc() { - if (this.count != 0) { - return MQVersion.getVersionDesc(this.version); - } - return ""; - } - - @Override - public int compareTo(ConsumeDataInfo o) { - if (o.diffTotal == diffTotal) - return 0; - else if (o.diffTotal < diffTotal) - return -1; - else if (o.diffTotal > diffTotal) - return 1; - return 0; - } - - public int getCount() { - return count; - } - - public void setCount(int count) { - this.count = count; - } - - public long getDiffTotal() { - return diffTotal; - } - - - public void setDiffTotal(long diffTotal) { - this.diffTotal = diffTotal; - } - - public int getConsumeTps() { - return consumeTps; - } - - - public void setConsumeTps(int consumeTps) { - this.consumeTps = consumeTps; - } - - - public int getVersion() { - return version; - } - - - public void setVersion(int version) { - this.version = version; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - } - -} \ No newline at end of file diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/TpsStatsSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/TpsStatsSubCommand.java deleted file mode 100644 index 524a88b16..000000000 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/TpsStatsSubCommand.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.tools.command.stats; - -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import com.alibaba.rocketmq.common.protocol.body.TopicList; -import com.alibaba.rocketmq.common.protocol.route.BrokerData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.srvutil.ServerUtil; -import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; -import com.alibaba.rocketmq.tools.command.SubCommand; -import com.taobao.tlog.client.JSONResult; -import com.taobao.tlog.client.KeyValueParam; -import com.taobao.tlog.client.KeyValueQuery; -import com.taobao.tlog.client.TLogQueryClient; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; - -import java.util.*; - - -public class TpsStatsSubCommand implements SubCommand { - - private static final String PUT_TPS = "TOPIC_PUT_NUMS"; - private static final String GET_TPS = "GROUP_GET_NUMS"; - private static final String TLOG_DOMAIN = "http://110.75.84.129:9999"; - - class TpsDataInfo implements Comparable { - private String topicName; - private String groupName; - private int tps = 0; - private String clusterName; - - - @Override - public int compareTo(TpsDataInfo o) { - return o.tps - tps; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getGroupName() { - return groupName; - } - - public void setGroupName(String groupName) { - this.groupName = groupName; - } - - public int getTps() { - return tps; - } - - public void setTps(int tps) { - this.tps = tps; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - } - - @Override - public String commandName() { - return "tpsStats"; - } - - - @Override - public String commandDesc() { - return "produce and consume tps stats from tlog"; - } - - - @Override - public Options buildCommandlineOptions(Options options) { - Option opt = new Option("c", "clusterName", true, "cluster name"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("t", "topic", true, "topic name"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("g", "consumerGroup", true, "consumer group name"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("p", "put tps", false, "show put tps"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("s", "subscribe tps", false, "show consume tps"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("a", "amount of list", true, "amount of list, default is all"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { - - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - try { - defaultMQAdminExt.start(); - int amount = -1; - String clusterName = ""; - if (commandLine.hasOption('a')) { - amount = Integer.parseInt(commandLine.getOptionValue('a').trim()); - } - if (commandLine.hasOption('c')) { - clusterName = commandLine.getOptionValue('c').trim(); - } - if (commandLine.hasOption('p')) { - List tpsDataInfoList = new ArrayList(); - if (clusterName.length() > 0) { - getClusterPutTps(defaultMQAdminExt, clusterName, tpsDataInfoList); - printClusterTps(tpsDataInfoList, amount, PUT_TPS); - } else { - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (null == clusterInfo) - return; - for (String cluster : clusterInfo.getClusterAddrTable().keySet()) { - tpsDataInfoList.clear(); - System.out.println("put tps for cluster [" + cluster + "]"); - getClusterPutTps(defaultMQAdminExt, cluster, tpsDataInfoList); - printClusterTps(tpsDataInfoList, amount, PUT_TPS); - System.out.println(); - } - } - return; - } - else if (commandLine.hasOption('s')) { - List tpsDataInfoList = new ArrayList(); - if (clusterName.length() > 0) { - getClusterGetTps(defaultMQAdminExt, clusterName, tpsDataInfoList); - printClusterTps(tpsDataInfoList, amount, GET_TPS); - } else { - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - if (null == clusterInfo) - return; - for (String cluster : clusterInfo.getClusterAddrTable().keySet()) { - tpsDataInfoList.clear(); - System.out.println("consume tps for cluster [" + cluster + "]"); - getClusterGetTps(defaultMQAdminExt, cluster, tpsDataInfoList); - printClusterTps(tpsDataInfoList, amount, PUT_TPS); - System.out.println(); - } - } - return; - } - ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); - } catch (Exception e) { - e.printStackTrace(); - } finally { - defaultMQAdminExt.shutdown(); - } - } - - - private Set getClusterTopics(DefaultMQAdminExt defaultMQAdminExt, String clusterName) throws Exception { - Set topics = new HashSet(); - TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - for (String topic : topicList.getTopicList()) { - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) - || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) { - continue; - } - TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); - BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); - String brokerName = brokerData.getBrokerName(); - Iterator>> it = clusterInfo.getClusterAddrTable().entrySet().iterator(); - while (it.hasNext()) { - Map.Entry> next = it.next(); - if (next.getValue().contains(brokerName)) { - if (next.getKey().equals(clusterName)) { - topics.add(topic); - } - break; - } - } - } - return topics; - } - - - private void getClusterPutTps(DefaultMQAdminExt defaultMQAdminExt, String clusterName, List tpsDataInfoList) throws Exception { - Set topics = getClusterTopics(defaultMQAdminExt, clusterName); - for (String topic : topics) { - TpsDataInfo tpsDataInfo = getTpsData(PUT_TPS, topic, clusterName, topic, ""); - if (null != tpsDataInfo) - tpsDataInfoList.add(tpsDataInfo); - } - } - - private void getClusterGetTps(DefaultMQAdminExt defaultMQAdminExt, String clusterName, List tpsDataInfoList) throws Exception { - ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); - Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); - String brokerAddr = ""; - - for (String brokerName : brokerNameSet) { - BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); - if (null != brokerData) - brokerAddr = brokerData.getBrokerAddrs().get(0L); - break; - } - TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getAllTopicGroup(brokerAddr, 5000); - for (String topic : topicConfigSerializeWrapper.getTopicConfigTable().keySet()) { - GroupList groupList = defaultMQAdminExt.queryTopicConsumeByWho(topic); - for (String group : groupList.getGroupList()) { - TpsDataInfo tpsDataInfo = getTpsData(GET_TPS, topic + "@" + group, clusterName, topic, group); - if (null != tpsDataInfo) - tpsDataInfoList.add(tpsDataInfo); - } - } - } - - - private TpsDataInfo getTpsData(String type, String key, String clusterName, String topic, String group) { - long endTime = System.currentTimeMillis() - 60000; - long startTime = endTime - 60000; - KeyValueQuery kvq = new KeyValueQuery("metaq_metaqstats", // StageId - "meta_stats_1min",// BizId - new KeyValueParam("type", type), - new KeyValueParam("key", key), //topic or topic@group - new KeyValueParam("date", startTime + "", endTime + "")); - - JSONResult queryData = TLogQueryClient.queryData(TLOG_DOMAIN, kvq); - if (queryData != null) { - JSONResult.JSONRecord[] records = queryData.getRecords(); - int tps = 0; - TpsDataInfo tpsDataInfo = new TpsDataInfo(); - tpsDataInfo.setClusterName(clusterName); - tpsDataInfo.setTopicName(topic); - tpsDataInfo.setGroupName(group); - if (records.length > 0) - tps = (Integer) records[records.length - 1].getValueByKeyName("sum"); - tpsDataInfo.setTps(tps); - return tpsDataInfo; - } - return null; - } - - private void printClusterTps(List tpsDataInfoList, int amount, String type) throws Exception { - if (tpsDataInfoList.size() > 0) { - Collections.sort(tpsDataInfoList); - if (type.equals(PUT_TPS)) { - System.out.printf("%-64s %11s%n",// - "#Topic Name",// - "#InTPS" - ); - } else if (type.equals(GET_TPS)) { - System.out.printf("%-64s %64s %11s%n",// - "#Group Name",// - "#Topic Name",// - "#InTPS" - ); - } - for (int i = 0; i < tpsDataInfoList.size(); i++) { - TpsDataInfo tpsDataInfo = tpsDataInfoList.get(i); - if (amount > 0 && i >= amount) - break; - if (type.equals(PUT_TPS)) { - System.out.printf("%-64s %11s%n",// - tpsDataInfo.getTopicName(), - tpsDataInfo.getTps() - ); - } else if (type.equals(GET_TPS)) { - System.out.printf("%-64s %64s %11s%n",// - tpsDataInfo.getGroupName(), - tpsDataInfo.getTopicName(), - tpsDataInfo.getTps() - ); - } - } - } - } - - -} \ No newline at end of file