From 4252fcc9738ce96b339af8ef272f1ee53f945008 Mon Sep 17 00:00:00 2001 From: vintagewang Date: Mon, 5 Jan 2015 17:24:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=86=E5=A4=87=E5=8F=91=E5=B8=833.2.4-beta1?= =?UTF-8?q?=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 +- develop.sh | 13 - install.sh | 2 +- pom.xml | 645 +++++++++--------- rocketmq-broker/pom.xml | 2 +- .../rocketmq/broker/BrokerStartup.java | 4 + .../broker/client/ConsumerGroupInfo.java | 21 +- .../DefaultConsumerIdsChangeListener.java | 3 +- .../broker/filtersrv/FilterServerUtil.java | 28 +- .../processor/AdminBrokerProcessor.java | 127 ++-- .../SubscriptionGroupManager.java | 6 + rocketmq-client/pom.xml | 2 +- .../alibaba/rocketmq/client/Validators.java | 6 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 117 ++-- .../consumer/DefaultMQPushConsumerImpl.java | 25 +- .../client/impl/factory/MQClientInstance.java | 53 +- .../impl/producer/DefaultMQProducerImpl.java | 32 +- .../impl/producer/TopicPublishInfo.java | 4 +- rocketmq-common/pom.xml | 2 +- .../alibaba/rocketmq/common/BrokerConfig.java | 6 +- .../alibaba/rocketmq/common/MQVersion.java | 2 +- .../com/alibaba/rocketmq/common/MixAll.java | 25 +- .../conflict/PackageConflictDetect.java | 37 + .../common/namesrv/TopAddressing.java | 24 +- .../rocketmq/common/protocol/RequestCode.java | 2 + .../common/protocol/body/BrokerStatsData.java | 43 ++ .../common/protocol/body/BrokerStatsItem.java | 37 + .../header/GetConsumeStatsRequestHeader.java | 11 + .../ViewBrokerStatsDataRequestHeader.java | 39 ++ .../common/stats/MomentStatsItem.java | 68 ++ .../common/stats/MomentStatsItemSet.java | 77 +++ .../rocketmq/common/stats/StatsItemSet.java | 14 +- rocketmq-example/pom.xml | 2 +- rocketmq-filtersrv/pom.xml | 2 +- .../rocketmq/filtersrv/FiltersrvStartup.java | 4 + rocketmq-namesrv/pom.xml | 2 +- .../rocketmq/namesrv/NamesrvStartup.java | 4 + rocketmq-remoting/pom.xml | 2 +- .../alibaba/rocketmq/remoting/RPCHook.java | 3 +- .../remoting/netty/NettyRemotingAbstract.java | 3 +- .../remoting/netty/NettyRemotingClient.java | 3 +- .../remoting/protocol/LanguageCode.java | 1 + rocketmq-srvutil/pom.xml | 2 +- rocketmq-store/pom.xml | 2 +- .../rocketmq/store/DefaultMessageStore.java | 77 ++- .../alibaba/rocketmq/store/MessageStore.java | 6 +- .../rocketmq/store/index/IndexFile.java | 15 +- .../store/stats/BrokerStatsManager.java | 134 ++-- .../store/DefaultMessageStoreTest.java | 13 +- .../alibaba/rocketmq/store/RecoverTest.java | 15 +- .../store/schedule/ScheduleMessageTest.java | 17 +- rocketmq-tools/pom.xml | 2 +- .../tools/admin/DefaultMQAdminExt.java | 32 +- .../tools/admin/DefaultMQAdminExtImpl.java | 139 ++-- .../rocketmq/tools/admin/MQAdminExt.java | 36 +- .../tools/command/MQAdminStartup.java | 93 +-- .../consumer/ConsumerProgressSubCommand.java | 27 +- .../command/stats/StatsAllSubCommand.java | 191 ++++++ sbin/github.sh | 6 + wiki/quickstart.md | 6 + 60 files changed, 1477 insertions(+), 844 deletions(-) delete mode 100644 develop.sh create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/conflict/PackageConflictDetect.java create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ViewBrokerStatsDataRequestHeader.java create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java create mode 100644 rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java create mode 100644 rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java create mode 100644 sbin/github.sh create mode 100644 wiki/quickstart.md diff --git a/README.md b/README.md index 5a9b39305..58c685acd 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,8 @@ RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 ---------- ### 联系我们 - -* [向我们提交建议、BUG、寻求技术帮助](https://github.com/alibaba/RocketMQ/issues/new) -* 欢迎参与RocketMQ项目,只需在Github上fork、pull request即可。 +* [交流&建议](https://groups.google.com/forum/?hl=en#!forum/rocketmq) +* [Issues&Bugs](https://github.com/alibaba/RocketMQ/issues/new) * [到新浪微博交流RocketMQ](http://q.weibo.com/1628465) * 加入QQ群交流,[5776652](http://url.cn/Knxm0o) diff --git a/develop.sh b/develop.sh deleted file mode 100644 index 700b6f595..000000000 --- a/develop.sh +++ /dev/null @@ -1,13 +0,0 @@ -git pull - -rm -rf target -rm -f devenv - -if [ -z "$JAVA_HOME" ]; then - JAVA_HOME=/opt/taobao/java -fi - -export PATH=/opt/taobao/mvn/bin:$JAVA_HOME/bin:$PATH -mvn -Dmaven.test.skip=true clean package install assembly:assembly -U - -ln -s target/alibaba-rocketmq-3.2.0-SNAPSHOT.dir/alibaba-rocketmq devenv diff --git a/install.sh b/install.sh index 41e9f5efa..c3bd65806 100644 --- a/install.sh +++ b/install.sh @@ -8,4 +8,4 @@ fi export PATH=/opt/taobao/mvn/bin:$JAVA_HOME/bin:$PATH mvn -Dmaven.test.skip=true clean package install assembly:assembly -U -ln -s target/alibaba-rocketmq-3.2.0-SNAPSHOT.dir/alibaba-rocketmq devenv +ln -s target/alibaba-rocketmq.dir/alibaba-rocketmq devenv diff --git a/pom.xml b/pom.xml index 1ec5161fe..1f0e38d06 100644 --- a/pom.xml +++ b/pom.xml @@ -1,156 +1,155 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - + + 4.0.0 + 2012 + com.alibaba.rocketmq + rocketmq-all + 3.2.4-SNAPSHOT + pom + rocketmq-all ${project.version} + https://github.com/alibaba/rocketmq + https://github.com/alibaba/RocketMQ/blob/develop/README.md - 4.0.0 - 2012 - com.alibaba.rocketmq - rocketmq-all - 3.2.2 - pom - rocketmq-all ${project.version} - https://github.com/alibaba/rocketmq - https://github.com/alibaba/RocketMQ/blob/develop/README.md + + rocketmq-client + rocketmq-common + rocketmq-broker + rocketmq-tools + rocketmq-store + rocketmq-namesrv + rocketmq-remoting + rocketmq-example + rocketmq-filtersrv + rocketmq-srvutil + - - rocketmq-client - rocketmq-common - rocketmq-broker - rocketmq-tools - rocketmq-store - rocketmq-namesrv - rocketmq-remoting - rocketmq-example - rocketmq-filtersrv - rocketmq-srvutil - + + + vintagewang + https://github.com/vintagewang + vintage.wang@gmail.com + 8 + + + manhong + https://github.com/YangJodie + manhong.yqd@alibaba-inc.com + 8 + + + allenzhu + https://github.com/allenzhu + allen.jie.zhu@gmail.com + 8 + + - - - vintagewang - https://github.com/vintagewang - vintage.wang@gmail.com - 8 - - - manhong - https://github.com/YangJodie - manhong.yqd@alibaba-inc.com - 8 - - - allenzhu - https://github.com/allenzhu - allen.jie.zhu@gmail.com - 8 - - + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + + - - - Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - - + + http://gitlab.alibaba-inc.com/middleware/rocketmq.git + scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git + scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git + - - http://gitlab.alibaba-inc.com/middleware/rocketmq.git - scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git - scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git - + + UTF-8 + + true + true + true + + 1.6 + 1.6 + UTF-8 + - - UTF-8 - - true - true - true - - 1.6 - 1.6 - UTF-8 - + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + ${java_source_version} + ${java_target_version} + ${file_encoding} + true + true + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.5.1 + + true + false + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.3 + + ${maven.test.skip} + -Xms512m -Xmx1024m + once + + **/*Test.java + + + com/alibaba/rocketmq/remoting/ExceptionTest.java + com/alibaba/rocketmq/remoting/SyncInvokeTest.java + com/alibaba/rocketmq/remoting/NettyIdleTest.java + com/alibaba/rocketmq/remoting/NettyConnectionTest.java + com/alibaba/rocketmq/common/filter/PolishExprTest.java + com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java + + com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java + + com/alibaba/rocketmq/store/RecoverTest.java + com/alibaba/rocketmq/broker/api/SendMessageTest.java + com/alibaba/rocketmq/test/integration/*/*.java + com/alibaba/rocketmq/test/integration/BaseTest.java + com/alibaba/rocketmq/test/*/*.java + com/alibaba/rocketmq/test/BaseTest.java + + + - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - ${java_source_version} - ${java_target_version} - ${file_encoding} - true - true - - - - org.apache.maven.plugins - maven-eclipse-plugin - 2.5.1 - - true - false - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.3 - - ${maven.test.skip} - -Xms512m -Xmx1024m - once - - **/*Test.java - - - com/alibaba/rocketmq/remoting/ExceptionTest.java - com/alibaba/rocketmq/remoting/SyncInvokeTest.java - com/alibaba/rocketmq/remoting/NettyIdleTest.java - com/alibaba/rocketmq/remoting/NettyConnectionTest.java - com/alibaba/rocketmq/common/filter/PolishExprTest.java - com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java - - com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java - - com/alibaba/rocketmq/store/RecoverTest.java - com/alibaba/rocketmq/broker/api/SendMessageTest.java - com/alibaba/rocketmq/test/integration/*/*.java - com/alibaba/rocketmq/test/integration/BaseTest.java - com/alibaba/rocketmq/test/*/*.java - com/alibaba/rocketmq/test/BaseTest.java - - - - - - maven-assembly-plugin - - alibaba-rocketmq-${project.version} - - release.xml - - - + + maven-assembly-plugin + + alibaba-rocketmq + + release.xml + + + - - org.apache.maven.plugins - maven-javadoc-plugin - 2.7 - - - attach-javadocs - - jar - - - - - ${maven.jdoc.skip} - ${file_encoding} - ${file_encoding} - org.jboss.apiviz.APIviz - - org.jboss.apiviz - apiviz - 1.3.0.GA - - true - true - true - true - true - - - - org.apache.maven.plugins - maven-source-plugin - 2.1.2 - - - attach-sources - - jar - - - - - - org.codehaus.mojo - clirr-maven-plugin - 2.6.1 - - + + org.apache.maven.plugins + maven-javadoc-plugin + 2.7 + + + attach-javadocs + + jar + + + + + ${maven.jdoc.skip} + ${file_encoding} + ${file_encoding} + org.jboss.apiviz.APIviz + + org.jboss.apiviz + apiviz + 1.3.0.GA + + true + true + true + true + true + + + + org.apache.maven.plugins + maven-source-plugin + 2.1.2 + + + attach-sources + + jar + + + + + + org.codehaus.mojo + clirr-maven-plugin + 2.6.1 + + - - - src/main/resources - false - - - + + + src/main/resources + false + + + - - - release-sign-artifacts - - - performRelease - true - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.1 - - - sign-artifacts - verify - - sign - - - - - - - - + + + release-sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.1 + + + sign-artifacts + verify + + sign + + + + + + + + - - - - ${project.groupId} - rocketmq-client - ${project.version} - - - ${project.groupId} - rocketmq-broker - ${project.version} - - - ${project.groupId} - rocketmq-common - ${project.version} - - - ${project.groupId} - rocketmq-store - ${project.version} - - - ${project.groupId} - rocketmq-namesrv - ${project.version} - - - ${project.groupId} - rocketmq-tools - ${project.version} - - - ${project.groupId} - rocketmq-remoting - ${project.version} - - - ${project.groupId} - rocketmq-qatest - ${project.version} - - - ${project.groupId} - rocketmq-filtersrv - ${project.version} - - - ${project.groupId} - rocketmq-srvutil - ${project.version} - - - junit - junit - 4.11 - test - - - org.slf4j - slf4j-api - 1.7.5 - - - ch.qos.logback - logback-classic - 1.0.13 - - - ch.qos.logback - logback-core - 1.0.13 - - - commons-io - commons-io - 2.4 - - - commons-cli - commons-cli - 1.2 - - - io.netty - netty-all - 4.0.23.Final - - - com.alibaba - fastjson - 1.1.41 - - - mysql - mysql-connector-java - 5.1.31 - - - org.apache.derby - derby - 10.10.2.0 - - - jboss - javassist - 3.7.ga - - - + + + + ${project.groupId} + rocketmq-client + ${project.version} + + + ${project.groupId} + rocketmq-broker + ${project.version} + + + ${project.groupId} + rocketmq-common + ${project.version} + + + ${project.groupId} + rocketmq-store + ${project.version} + + + ${project.groupId} + rocketmq-namesrv + ${project.version} + + + ${project.groupId} + rocketmq-tools + ${project.version} + + + ${project.groupId} + rocketmq-remoting + ${project.version} + + + ${project.groupId} + rocketmq-qatest + ${project.version} + + + ${project.groupId} + rocketmq-filtersrv + ${project.version} + + + ${project.groupId} + rocketmq-srvutil + ${project.version} + + + junit + junit + 4.11 + test + + + org.slf4j + slf4j-api + 1.7.5 + + + ch.qos.logback + logback-classic + 1.0.13 + + + ch.qos.logback + logback-core + 1.0.13 + + + commons-io + commons-io + 2.4 + + + commons-cli + commons-cli + 1.2 + + + io.netty + netty-all + 4.0.24.Final + + + com.alibaba + fastjson + 1.2.3 + + + mysql + mysql-connector-java + 5.1.31 + + + org.apache.derby + derby + 10.10.2.0 + + + jboss + javassist + 3.7.ga + + + diff --git a/rocketmq-broker/pom.xml b/rocketmq-broker/pom.xml index 5013ece9b..a279f873c 100644 --- a/rocketmq-broker/pom.xml +++ b/rocketmq-broker/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java index d06359ba1..8bec92d83 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java @@ -34,6 +34,7 @@ import com.alibaba.rocketmq.common.BrokerConfig; import com.alibaba.rocketmq.common.MQVersion; import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.conflict.PackageConflictDetect; import com.alibaba.rocketmq.common.constant.LoggerName; import com.alibaba.rocketmq.remoting.common.RemotingUtil; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; @@ -94,6 +95,9 @@ public static BrokerController createBrokerController(String[] args) { } try { + // 检测包冲突 + PackageConflictDetect.detectFastjson(); + // 解析命令行 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java index e262be6d7..918220505 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java @@ -15,7 +15,14 @@ */ package com.alibaba.rocketmq.broker.client; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; @@ -24,15 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; - /** * 整个Consumer Group信息 @@ -230,6 +228,11 @@ else if (sub.getSubVersion() > old.getSubVersion()) { } + public Set getSubscribeTopics() { + return subscriptionTable.keySet(); + } + + public SubscriptionData findSubscriptionData(final String topic) { return this.subscriptionTable.get(topic); } diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index 34c6b25fb..dbda809b5 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -15,11 +15,12 @@ */ package com.alibaba.rocketmq.broker.client; -import com.alibaba.rocketmq.broker.BrokerController; import io.netty.channel.Channel; import java.util.List; +import com.alibaba.rocketmq.broker.BrokerController; + /** * ConsumerId列表变化,通知所有Consumer diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java index 42ac39d39..1699526c7 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -1,15 +1,9 @@ package com.alibaba.rocketmq.broker.filtersrv; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - import org.slf4j.Logger; public class FilterServerUtil { - private static String[] splitShellString(final String shellString) { String[] split = shellString.split(" "); return split; @@ -21,27 +15,7 @@ public static void callShell(final String shellString, final Logger log) { try { String[] cmdArray = splitShellString(shellString); process = Runtime.getRuntime().exec(cmdArray); - final InputStream is1 = process.getInputStream(); - new Thread(new Runnable() { - public void run() { - BufferedReader br = new BufferedReader(new InputStreamReader(is1)); - try { - while (br.readLine() != null) - ; - } - catch (IOException e) { - log.error("callShell: readLine IOException, " + shellString, e); - } - } - }).start(); // 启动单独的线程来清空process.getInputStream()的缓冲区 - - InputStream is2 = process.getErrorStream(); - BufferedReader br2 = new BufferedReader(new InputStreamReader(is2)); - StringBuilder buf = new StringBuilder(); // 保存输出结果流 - String line = null; - while ((line = br2.readLine()) != null) - buf.append(line); // 循环等待ffmpeg进程结束 - + process.waitFor(); log.info("callShell: <{}> OK", shellString); } catch (Throwable e) { diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java index 466f47450..7466e4235 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -15,17 +15,9 @@ */ package com.alibaba.rocketmq.broker.processor; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; - import java.io.UnsupportedEncodingException; import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,46 +39,13 @@ import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.protocol.RequestCode; import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.Connection; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.body.LockBatchResponseBody; -import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; -import com.alibaba.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; -import com.alibaba.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.body.TopicList; -import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.CreateTopicRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.DeleteTopicRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SearchOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.header.*; import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.common.stats.StatsItem; +import com.alibaba.rocketmq.common.stats.StatsSnapshot; import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; import com.alibaba.rocketmq.remoting.common.RemotingHelper; import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; @@ -97,6 +56,9 @@ import com.alibaba.rocketmq.store.DefaultMessageStore; import com.alibaba.rocketmq.store.SelectMapedBufferResult; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; + /** * 管理类请求处理 @@ -221,6 +183,9 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand case RequestCode.CLONE_GROUP_OFFSET: return this.cloneGroupOffset(ctx, request); + // 查看Broker统计信息 + case RequestCode.VIEW_BROKER_STATS_DATA: + return ViewBrokerStatsData(ctx, request); default: break; } @@ -229,6 +194,62 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand } + private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final ViewBrokerStatsDataRequestHeader requestHeader = + (ViewBrokerStatsDataRequestHeader) request + .decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + + StatsItem statsItem = + messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), + requestHeader.getStatsKey()); + if (null == statsItem) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), + requestHeader.getStatsKey())); + return response; + } + + BrokerStatsData brokerStatsData = new BrokerStatsData(); + // 分钟 + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInMinute(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsMinute(it); + } + + // 小时 + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInHour(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsHour(it); + } + + // 天 + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInDay(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsDay(it); + } + + response.setBody(brokerStatsData.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand callConsumer(// final int requestCode,// final RemotingCommand request, // @@ -364,9 +385,15 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingComma ConsumeStats consumeStats = new ConsumeStats(); - Set topics = - this.brokerController.getConsumerOffsetManager().whichTopicByConsumer( - requestHeader.getConsumerGroup()); + Set topics = new HashSet(); + if (UtilAll.isBlank(requestHeader.getTopic())) { + topics = + this.brokerController.getConsumerOffsetManager().whichTopicByConsumer( + requestHeader.getConsumerGroup()); + } + else { + topics.add(requestHeader.getTopic()); + } for (String topic : topics) { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); @@ -519,7 +546,7 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, Rem return response; } - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); return response; } diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java index 0baef873e..5be550326 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -60,6 +60,12 @@ private void init() { subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP); this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig); } + + { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP); + this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig); + } } diff --git a/rocketmq-client/pom.xml b/rocketmq-client/pom.xml index 1f19212dd..246fe48b0 100644 --- a/rocketmq-client/pom.xml +++ b/rocketmq-client/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java index 7c7823d85..e8e768e0e 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java @@ -1,5 +1,8 @@ package com.alibaba.rocketmq.client; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.common.MixAll; @@ -7,9 +10,6 @@ import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.protocol.ResponseCode; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - /** * 有效性检查公用类。 diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java index 4d4ab629e..40d7fed00 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java @@ -17,14 +17,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -56,70 +49,10 @@ import com.alibaba.rocketmq.common.namesrv.TopAddressing; import com.alibaba.rocketmq.common.protocol.RequestCode; import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.body.LockBatchResponseBody; -import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; -import com.alibaba.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; -import com.alibaba.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody; -import com.alibaba.rocketmq.common.protocol.body.TopicList; -import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.CreateTopicRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.DeleteTopicRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SearchOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SearchOffsetResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; -import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.UnregisterClientRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.header.*; import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; +import com.alibaba.rocketmq.common.protocol.header.namesrv.*; import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData; import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData; import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData; @@ -130,12 +63,7 @@ import com.alibaba.rocketmq.remoting.RPCHook; import com.alibaba.rocketmq.remoting.RemotingClient; import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import com.alibaba.rocketmq.remoting.exception.*; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; import com.alibaba.rocketmq.remoting.netty.ResponseFuture; @@ -1249,6 +1177,13 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return getConsumeStats(addr, consumerGroup, null, timeoutMillis); + } + + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { // 添加虚拟运行环境相关的projectGroupPrefix String consumerGroupWithProjectGroup = consumerGroup; if (!UtilAll.isBlank(projectGroupPrefix)) { @@ -1258,6 +1193,7 @@ public ConsumeStats getConsumeStats(final String addr, final String consumerGrou GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); requestHeader.setConsumerGroup(consumerGroupWithProjectGroup); + requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); @@ -2404,4 +2340,31 @@ public void cloneGroupOffset(final String addr, final String srcGroup, final Str throw new MQClientException(response.getCode(), response.getRemark()); } + + + public BrokerStatsData ViewBrokerStatsData(String brokerAddr, String statsName, String statsKey, + long timeoutMillis) throws MQClientException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + ViewBrokerStatsDataRequestHeader requestHeader = new ViewBrokerStatsDataRequestHeader(); + requestHeader.setStatsName(statsName); + requestHeader.setStatsKey(statsKey); + + RemotingCommand request = + RemotingCommand.createRequestCommand(RequestCode.VIEW_BROKER_STATS_DATA, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + byte[] body = response.getBody(); + if (body != null) { + return BrokerStatsData.decode(body, BrokerStatsData.class); + } + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } } diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index c25ce5267..0229ded5c 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -15,6 +15,19 @@ */ package com.alibaba.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; + import com.alibaba.rocketmq.client.QueryResult; import com.alibaba.rocketmq.client.Validators; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -43,7 +56,11 @@ import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.filter.FilterAPI; import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo; @@ -58,12 +75,6 @@ import com.alibaba.rocketmq.remoting.common.RemotingHelper; import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - /** * Push方式的Consumer实现 diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/factory/MQClientInstance.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/factory/MQClientInstance.java index 7140a4a8b..49ec73b70 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/factory/MQClientInstance.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/factory/MQClientInstance.java @@ -15,12 +15,42 @@ */ package com.alibaba.rocketmq.client.impl.factory; +import java.io.UnsupportedEncodingException; +import java.net.DatagramSocket; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; + import com.alibaba.rocketmq.client.ClientConfig; import com.alibaba.rocketmq.client.admin.MQAdminExtInner; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.*; -import com.alibaba.rocketmq.client.impl.consumer.*; +import com.alibaba.rocketmq.client.impl.ClientRemotingProcessor; +import com.alibaba.rocketmq.client.impl.FindBrokerResult; +import com.alibaba.rocketmq.client.impl.MQAdminImpl; +import com.alibaba.rocketmq.client.impl.MQClientAPIImpl; +import com.alibaba.rocketmq.client.impl.MQClientManager; +import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner; +import com.alibaba.rocketmq.client.impl.consumer.ProcessQueue; +import com.alibaba.rocketmq.client.impl.consumer.PullMessageService; +import com.alibaba.rocketmq.client.impl.consumer.RebalanceService; import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; import com.alibaba.rocketmq.client.impl.producer.MQProducerInner; import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo; @@ -31,13 +61,18 @@ import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.ServiceState; import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.conflict.PackageConflictDetect; import com.alibaba.rocketmq.common.constant.PermName; import com.alibaba.rocketmq.common.filter.FilterAPI; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.heartbeat.*; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData; +import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData; +import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; import com.alibaba.rocketmq.common.protocol.route.BrokerData; import com.alibaba.rocketmq.common.protocol.route.QueueData; import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; @@ -45,16 +80,6 @@ import com.alibaba.rocketmq.remoting.common.RemotingHelper; import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import org.slf4j.Logger; - -import java.io.UnsupportedEncodingException; -import java.net.DatagramSocket; -import java.net.URL; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** @@ -159,6 +184,8 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli public void start() throws MQClientException { + PackageConflictDetect.detectFastjson(); + synchronized (this) { switch (this.serviceState) { case CREATE_JUST: diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8fbba682c..2c90ef5fd 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -17,8 +17,17 @@ import java.io.IOException; import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -34,12 +43,27 @@ import com.alibaba.rocketmq.client.impl.MQClientManager; import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.producer.*; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; +import com.alibaba.rocketmq.client.producer.LocalTransactionState; +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.client.producer.SendCallback; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.client.producer.SendStatus; +import com.alibaba.rocketmq.client.producer.TransactionCheckListener; +import com.alibaba.rocketmq.client.producer.TransactionMQProducer; +import com.alibaba.rocketmq.client.producer.TransactionSendResult; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.ServiceState; import com.alibaba.rocketmq.common.UtilAll; import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageId; +import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.protocol.ResponseCode; import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader; diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java index 46fbc62e4..503022c9b 100644 --- a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -15,12 +15,12 @@ */ package com.alibaba.rocketmq.client.impl.producer; -import com.alibaba.rocketmq.common.message.MessageQueue; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import com.alibaba.rocketmq.common.message.MessageQueue; + /** * 发布Topic用到的路由信息 diff --git a/rocketmq-common/pom.xml b/rocketmq-common/pom.xml index df7c6aa1b..2e0ebea71 100644 --- a/rocketmq-common/pom.xml +++ b/rocketmq-common/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java index a0ed68fa2..5aad9a481 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java @@ -15,13 +15,13 @@ */ package com.alibaba.rocketmq.common; +import java.net.InetAddress; +import java.net.UnknownHostException; + import com.alibaba.rocketmq.common.annotation.ImportantField; import com.alibaba.rocketmq.common.constant.PermName; import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import java.net.InetAddress; -import java.net.UnknownHostException; - /** * 服务器配置 diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java index ab20ce1b3..6d0f84c2b 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java @@ -22,7 +22,7 @@ */ public class MQVersion { // TODO 每次发布版本都要修改此处版本号 - public static final int CurrentVersion = Version.V3_2_2_SNAPSHOT.ordinal(); + public static final int CurrentVersion = Version.V3_2_4_SNAPSHOT.ordinal(); public static String getVersionDesc(int value) { diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java index 1ee933b53..071e926a8 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java @@ -15,14 +15,31 @@ */ package com.alibaba.rocketmq.common; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.net.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.URL; +import java.net.URLConnection; +import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -56,6 +73,8 @@ public class MixAll { public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER"; public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER"; public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; + public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; + public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/conflict/PackageConflictDetect.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/conflict/PackageConflictDetect.java new file mode 100644 index 000000000..3b4834e7e --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/conflict/PackageConflictDetect.java @@ -0,0 +1,37 @@ +package com.alibaba.rocketmq.common.conflict; + +/** + * 用来检测包冲突问题,如果低于某个版本,则要求用户升级 + */ +public class PackageConflictDetect { + private static boolean detectEnable = Boolean.parseBoolean(System.getProperty( + "com.alibaba.rocketmq.packageConflictDetect.enable", "true")); + + + /** + * fastjson的依赖冲突解决 + */ + public static void detectFastjson() { + if (detectEnable) { + final String fastjsonVersion = "1.2.3"; + boolean conflict = false; + try { + String version = com.alibaba.fastjson.JSON.VERSION; + int code = version.compareTo(fastjsonVersion); + // 说明依赖的版本比要求的版本低 + if (code < 0) { + conflict = true; + } + } + catch (Throwable e) { + conflict = true; + } + + if (conflict) { + throw new RuntimeException(String.format( + "Your fastjson version is too low, or no fastjson, RocketMQ minimum version required: %s",// + fastjsonVersion)); + } + } + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java index 28dbb93f1..bdf05b21a 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java @@ -49,8 +49,13 @@ private static String clearNewLine(final String str) { public final String fetchNSAddr() { + return fetchNSAddr(true, 3000); + } + + + public final String fetchNSAddr(boolean verbose, long timeoutMills) { try { - HttpResult result = HttpTinyClient.httpGet(this.wsAddr, null, null, "UTF-8", 3000); + HttpResult result = HttpTinyClient.httpGet(this.wsAddr, null, null, "UTF-8", timeoutMills); if (200 == result.code) { String responseStr = result.content; if (responseStr != null) { @@ -65,16 +70,19 @@ public final String fetchNSAddr() { } } catch (IOException e) { - log.error("fetchZKAddr exception", e); + if (verbose) { + log.error("fetchZKAddr exception", e); + } } - String errorMsg = - "connect to " + wsAddr + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME - + " not bind in /etc/hosts"; - errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); + if (verbose) { + String errorMsg = + "connect to " + wsAddr + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + + " not bind in /etc/hosts"; + errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); - log.warn(errorMsg); - System.out.println(errorMsg); + log.warn(errorMsg); + } return null; } diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java index f0314e307..88a4f8249 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java @@ -167,4 +167,6 @@ public class RequestCode { // 克隆某一个组的消费进度到新的组 public static final int CLONE_GROUP_OFFSET = 314; + // 查看Broker上的各种统计信息 + public static final int VIEW_BROKER_STATS_DATA = 315; } diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java new file mode 100644 index 000000000..aa3ff87c2 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java @@ -0,0 +1,43 @@ +package com.alibaba.rocketmq.common.protocol.body; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + + +public class BrokerStatsData extends RemotingSerializable { + // 最近一分钟内的统计 + private BrokerStatsItem statsMinute; + // 最近一小时内的统计 + private BrokerStatsItem statsHour; + // 最近一天内的的统计 + private BrokerStatsItem statsDay; + + + public BrokerStatsItem getStatsMinute() { + return statsMinute; + } + + + public void setStatsMinute(BrokerStatsItem statsMinute) { + this.statsMinute = statsMinute; + } + + + public BrokerStatsItem getStatsHour() { + return statsHour; + } + + + public void setStatsHour(BrokerStatsItem statsHour) { + this.statsHour = statsHour; + } + + + public BrokerStatsItem getStatsDay() { + return statsDay; + } + + + public void setStatsDay(BrokerStatsItem statsDay) { + this.statsDay = statsDay; + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java new file mode 100644 index 000000000..324db09dc --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java @@ -0,0 +1,37 @@ +package com.alibaba.rocketmq.common.protocol.body; + +public class BrokerStatsItem { + private long sum; + private double tps; + private double avgpt; + + + public long getSum() { + return sum; + } + + + public void setSum(long sum) { + this.sum = sum; + } + + + public double getTps() { + return tps; + } + + + public void setTps(double tps) { + this.tps = tps; + } + + + public double getAvgpt() { + return avgpt; + } + + + public void setAvgpt(double avgpt) { + this.avgpt = avgpt; + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java index 1b6972f13..1534f23a6 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java @@ -27,6 +27,7 @@ public class GetConsumeStatsRequestHeader implements CommandCustomHeader { @CFNotNull private String consumerGroup; + private String topic; @Override @@ -44,4 +45,14 @@ public String getConsumerGroup() { public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } } diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ViewBrokerStatsDataRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ViewBrokerStatsDataRequestHeader.java new file mode 100644 index 000000000..542ca3887 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ViewBrokerStatsDataRequestHeader.java @@ -0,0 +1,39 @@ +package com.alibaba.rocketmq.common.protocol.header; + +import com.alibaba.rocketmq.remoting.CommandCustomHeader; +import com.alibaba.rocketmq.remoting.annotation.CFNotNull; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; + + +public class ViewBrokerStatsDataRequestHeader implements CommandCustomHeader { + @CFNotNull + private String statsName; + @CFNotNull + private String statsKey; + + + @Override + public void checkFields() throws RemotingCommandException { + + } + + + public String getStatsName() { + return statsName; + } + + + public void setStatsName(String statsName) { + this.statsName = statsName; + } + + + public String getStatsKey() { + return statsKey; + } + + + public void setStatsKey(String statsKey) { + this.statsKey = statsKey; + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java new file mode 100644 index 000000000..232d8ee4b --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java @@ -0,0 +1,68 @@ +package com.alibaba.rocketmq.common.stats; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; + +import com.alibaba.rocketmq.common.UtilAll; + + +public class MomentStatsItem { + // 具体的统计值 + private final AtomicLong value = new AtomicLong(0); + + private final String statsName; + private final String statsKey; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public MomentStatsItem(String statsName, String statsKey, + ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.statsKey = statsKey; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + } + + + public void init() { + // 分钟整点执行 + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } + catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 5, TimeUnit.MILLISECONDS); + } + + + public void printAtMinutes() { + log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d", // + this.statsName,// + this.statsKey,// + this.value.get())); + } + + + public AtomicLong getValue() { + return value; + } + + + public String getStatsKey() { + return statsKey; + } + + + public String getStatsName() { + return statsName; + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java new file mode 100644 index 000000000..577ffc5f1 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java @@ -0,0 +1,77 @@ +package com.alibaba.rocketmq.common.stats; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; + +import com.alibaba.rocketmq.common.UtilAll; + + +public class MomentStatsItemSet { + private final ConcurrentHashMap statsItemTable = + new ConcurrentHashMap(128); + + private final String statsName; + private final ScheduledExecutorService scheduledExecutorService; + private final Logger log; + + + public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { + this.statsName = statsName; + this.scheduledExecutorService = scheduledExecutorService; + this.log = log; + this.init(); + } + + + public MomentStatsItem getAndCreateStatsItem(final String statsKey) { + MomentStatsItem statsItem = this.statsItemTable.get(statsKey); + if (null == statsItem) { + statsItem = + new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem); + // 说明是第一次插入 + if (null == prev) { + // 内部不需要定时,外部统一定时 + // statsItem.init(); + } + } + + return statsItem; + } + + + public void setValue(final String statsKey, final int value) { + MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey); + statsItem.getValue().set(value); + } + + + public void init() { + // 分钟整点执行 + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printAtMinutes(); + } + catch (Throwable e) { + } + } + }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), // + 1000 * 60 * 5, TimeUnit.MILLISECONDS); + } + + + private void printAtMinutes() { + Iterator> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + next.getValue().printAtMinutes(); + } + } +} diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java index c53a1dacf..2de15c748 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java @@ -1,14 +1,15 @@ package com.alibaba.rocketmq.common.stats; -import com.alibaba.rocketmq.common.UtilAll; -import org.slf4j.Logger; - import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; + +import com.alibaba.rocketmq.common.UtilAll; + public class StatsItemSet { private final ConcurrentHashMap statsItemTable = @@ -27,7 +28,7 @@ public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutor } - private StatsItem getAndCreateStatsItem(final String statsKey) { + public StatsItem getAndCreateStatsItem(final String statsKey) { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); @@ -77,6 +78,11 @@ public StatsSnapshot getStatsDataInDay(final String statsKey) { } + public StatsItem getStatsItem(final String statsKey) { + return this.statsItemTable.get(statsKey); + } + + public void init() { // 每隔10s执行一次 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { diff --git a/rocketmq-example/pom.xml b/rocketmq-example/pom.xml index d0c32fcea..eac3989f5 100644 --- a/rocketmq-example/pom.xml +++ b/rocketmq-example/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-filtersrv/pom.xml b/rocketmq-filtersrv/pom.xml index 454fcf12b..4cccecb25 100644 --- a/rocketmq-filtersrv/pom.xml +++ b/rocketmq-filtersrv/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java index c958ef6e8..3c2416f81 100644 --- a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java +++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java @@ -33,6 +33,7 @@ import com.alibaba.rocketmq.common.MQVersion; import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.conflict.PackageConflictDetect; import com.alibaba.rocketmq.common.constant.LoggerName; import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; import com.alibaba.rocketmq.remoting.netty.NettySystemConfig; @@ -100,6 +101,9 @@ public static FiltersrvController createController(String[] args) { } try { + // 检测包冲突 + PackageConflictDetect.detectFastjson(); + // 解析命令行 Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = diff --git a/rocketmq-namesrv/pom.xml b/rocketmq-namesrv/pom.xml index d7a13d0a5..b392adb7e 100644 --- a/rocketmq-namesrv/pom.xml +++ b/rocketmq-namesrv/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java index ce489cc07..88a18edf0 100644 --- a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java @@ -33,6 +33,7 @@ import com.alibaba.rocketmq.common.MQVersion; import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.conflict.PackageConflictDetect; import com.alibaba.rocketmq.common.constant.LoggerName; import com.alibaba.rocketmq.common.namesrv.NamesrvConfig; import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; @@ -84,6 +85,9 @@ public static NamesrvController main0(String[] args) { } try { + // 检测包冲突 + PackageConflictDetect.detectFastjson(); + // 解析命令行 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = diff --git a/rocketmq-remoting/pom.xml b/rocketmq-remoting/pom.xml index ab2fd6178..b30646a44 100644 --- a/rocketmq-remoting/pom.xml +++ b/rocketmq-remoting/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java index 1ed3037b8..d245bba33 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java @@ -7,5 +7,6 @@ public interface RPCHook { public void doBeforeRequest(final String remoteAddr, final RemotingCommand request); - public void doAfterResponse(final RemotingCommand request, final RemotingCommand response); + public void doAfterResponse(final String remoteAddr, final RemotingCommand request, + final RemotingCommand response); } diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java index 341ac4d95..a3832deb2 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -171,7 +171,8 @@ public void run() { final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { - rpcHook.doAfterResponse(cmd, response); + rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + cmd, response); } // Oneway形式忽略应答结果 diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java index 82194e901..ddbf09ab1 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java @@ -624,7 +624,8 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis); if (this.rpcHook != null) { - this.rpcHook.doAfterResponse(request, response); + this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), + request, response); } return response; } diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java index 1ec089841..93c2afd72 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java @@ -9,4 +9,5 @@ public enum LanguageCode { ERLANG, RUBY, OTHER, + HTTP, } diff --git a/rocketmq-srvutil/pom.xml b/rocketmq-srvutil/pom.xml index 2a521df96..4fc00b9cb 100644 --- a/rocketmq-srvutil/pom.xml +++ b/rocketmq-srvutil/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-store/pom.xml b/rocketmq-store/pom.xml index cc2599c47..02967f810 100644 --- a/rocketmq-store/pom.xml +++ b/rocketmq-store/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java index 040ed5a4c..ddb555a2f 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java @@ -15,6 +15,29 @@ */ package com.alibaba.rocketmq.store; +import static com.alibaba.rocketmq.store.config.BrokerRole.SLAVE; + +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.alibaba.rocketmq.common.ServiceThread; import com.alibaba.rocketmq.common.SystemClock; import com.alibaba.rocketmq.common.ThreadFactoryImpl; @@ -34,22 +57,6 @@ import com.alibaba.rocketmq.store.index.QueryOffsetResult; import com.alibaba.rocketmq.store.schedule.ScheduleMessageService; import com.alibaba.rocketmq.store.stats.BrokerStatsManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static com.alibaba.rocketmq.store.config.BrokerRole.SLAVE; /** @@ -465,6 +472,9 @@ public GetMessageResult getMessage(final String group, final String topic, final GetMessageResult getResult = new GetMessageResult(); + // 有个读写锁,所以只访问一次,避免锁开销影响性能 + final long maxOffsetPy = this.commitLog.getMaxOffset(); + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQuque(); @@ -502,6 +512,7 @@ else if (offset > maxOffset) { int i = 0; final int MaxFilterMessageCount = 16000; + boolean diskFallRecorded = false; for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); @@ -517,7 +528,7 @@ else if (offset > maxOffset) { } // 判断是否拉磁盘数据 - boolean isInDisk = checkInDiskByCommitOffset(offsetPy); + boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); // 此批消息达到上限了 if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { @@ -535,12 +546,12 @@ else if (offset > maxOffset) { status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; - // 统计消息数据 - if (isInDisk && brokerStatsManager != null) { - brokerStatsManager.incGroupGetFromDiskNums(group, topic, 1); - brokerStatsManager.incGroupGetFromDiskSize(group, topic, - selectResult.getSize()); - brokerStatsManager.incBrokerGetFromDiskNums(1); + // 统计读取磁盘落后情况 + if (diskFallRecorded) { + diskFallRecorded = true; + long fallBehind = consumeQueue.getMaxPhysicOffset() - offsetPy; + brokerStatsManager.recordDiskFallBehind(group, topic, queueId, + fallBehind); } } else { @@ -1728,6 +1739,7 @@ private void doReput() { if (size > 0) { DefaultMessageStore.this.putDispatchRequest(dispatchRequest); + // FIXED BUG By shijia this.reputFromOffset += size; readSize += size; DefaultMessageStore.this.storeStatsService @@ -1894,26 +1906,28 @@ public Map getMessageIds(final String topic, final int queueId, lo } - private boolean checkInDiskByCommitOffset(long offsetPy) { - long maxOffsetPy = this.commitLog.getMaxOffset(); + private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { long memory = (long) (StoreUtil.TotalPhysicalMemorySize * (this.messageStoreConfig .getAccessMessageInMemoryMaxRatio() / 100.0)); - return maxOffsetPy - offsetPy > memory; + return (maxOffsetPy - offsetPy) > memory; } @Override public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) { + // 有个读写锁,所以只访问一次,避免锁开销影响性能 + final long maxOffsetPy = this.commitLog.getMaxOffset(); + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset); if (bufferConsumeQueue != null) { try { - int i = 0; - for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQStoreUnitSize) { + for (int i = 0; i < bufferConsumeQueue.getSize();) { + i += ConsumeQueue.CQStoreUnitSize; long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); - return checkInDiskByCommitOffset(offsetPy); + return checkInDiskByCommitOffset(offsetPy, maxOffsetPy); } } finally { @@ -1927,4 +1941,9 @@ public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, } return false; } + + + public BrokerStatsManager getBrokerStatsManager() { + return brokerStatsManager; + } } diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java index d9f5246f2..36b210193 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java @@ -15,14 +15,14 @@ */ package com.alibaba.rocketmq.store; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; - import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; import java.util.Set; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + /** * 存储层对外提供的接口 diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java index 036c7b7c1..912668a2e 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java @@ -128,6 +128,11 @@ public boolean putKey(final String key, final long phyOffset, final long storeTi } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); + + // 时间差存储单位由毫秒改为秒 + timeDiff = timeDiff / 1000; + + // 25000天后溢出 if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } @@ -138,9 +143,6 @@ else if (timeDiff < 0) { timeDiff = 0; } - // 时间差存储单位由毫秒改为秒 - timeDiff = timeDiff / 1000; - int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE + this.indexHeader.getIndexCount() * INDEX_SIZE; @@ -274,7 +276,8 @@ public void selectPhyOffset(final List phyOffsets, final String key, final int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); - int timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); + // int转为long,避免下面计算时间差值时溢出 + long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); // 读到了未知数据 @@ -282,8 +285,8 @@ public void selectPhyOffset(final List phyOffsets, final String key, final break; } - // 时间差存储的是秒,再还原为毫秒 - timeDiff *= 1000; + // 时间差存储的是秒,再还原为毫秒, long避免溢出 + timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java index 3490a95e5..a13dd1eb4 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java @@ -1,14 +1,17 @@ package com.alibaba.rocketmq.store.stats; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.alibaba.rocketmq.common.ThreadFactoryImpl; import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.stats.MomentStatsItemSet; import com.alibaba.rocketmq.common.stats.StatsItem; import com.alibaba.rocketmq.common.stats.StatsItemSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; public class BrokerStatsManager { @@ -20,70 +23,42 @@ public class BrokerStatsManager { public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE"; public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS"; public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE"; - public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS"; - public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE"; public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS"; public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS"; public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS"; - public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS"; - - // Topic Put Nums - private final StatsItemSet topicPutNums = new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, - log); - - // Topic Put Size - private final StatsItemSet topicPutSize = new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, - log); - - // Topic@ConsumerGroup Get Nums - private final StatsItemSet groupGetNums = new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, - log); - - // Topic@ConsumerGroup Get Size - private final StatsItemSet groupGetSize = new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, - log); - - // Topic@ConsumerGroup get in disk nums - private final StatsItemSet groupGetFromDiskNums = new StatsItemSet(GROUP_GET_FROM_DISK_NUMS, - this.scheduledExecutorService, log); - // Topic@ConsumerGroup group get in disk size - private final StatsItemSet groupGetFromDiskSize = new StatsItemSet(GROUP_GET_FROM_DISK_SIZE, - this.scheduledExecutorService, log); + private final HashMap statsTable = new HashMap(); + private final String clusterName; - // Broker Put Nums - private final StatsItem brokerPutNums; - - // Broker Get Nums - private final StatsItem brokerGetNums; - - // Broker Get From Disk Nums - private final StatsItem brokerGetFromDiskNums; - - // Topic@ConsumerGroup sendback Nums - private final StatsItemSet sndbckPutNums = new StatsItemSet(SNDBCK_PUT_NUMS, - this.scheduledExecutorService, log); + /** + * 读磁盘落后统计 + */ + public static final String GROUP_GET_FALL = "GROUP_GET_FALL"; + private final MomentStatsItemSet momentStatsItemSet = new MomentStatsItemSet(GROUP_GET_FALL, + scheduledExecutorService, log); public BrokerStatsManager(String clusterName) { - // Broker Put Nums - this.brokerPutNums = new StatsItem(BROKER_PUT_NUMS, // - clusterName, this.scheduledExecutorService, log); - - // Broker Get Nums - this.brokerGetNums = new StatsItem(BROKER_GET_NUMS, // - clusterName, this.scheduledExecutorService, log); - - // Broker Get From Disk Nums - this.brokerGetFromDiskNums = new StatsItem(BROKER_GET_FROM_DISK_NUMS, // - clusterName, this.scheduledExecutorService, log); + this.clusterName = clusterName; + + this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, + log)); + this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, + log)); + this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, + log)); + this.statsTable.put(GROUP_GET_SIZE, new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, + log)); + this.statsTable.put(SNDBCK_PUT_NUMS, new StatsItemSet(SNDBCK_PUT_NUMS, this.scheduledExecutorService, + log)); + this.statsTable.put(BROKER_PUT_NUMS, new StatsItemSet(BROKER_PUT_NUMS, this.scheduledExecutorService, + log)); + this.statsTable.put(BROKER_GET_NUMS, new StatsItemSet(BROKER_GET_NUMS, this.scheduledExecutorService, + log)); } public void start() { - this.brokerPutNums.init(); - this.brokerGetNums.init(); - this.brokerGetFromDiskNums.init(); } @@ -92,57 +67,62 @@ public void shutdown() { } + public StatsItem getStatsItem(final String statsName, final String statsKey) { + try { + return this.statsTable.get(statsName).getStatsItem(statsKey); + } + catch (Exception e) { + } + + return null; + } + + public void incTopicPutNums(final String topic) { - this.topicPutNums.addValue(topic, 1, 1); + this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } public void incTopicPutSize(final String topic, final int size) { - this.topicPutSize.addValue(topic, size, 1); + this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); } public void incGroupGetNums(final String group, final String topic, final int incValue) { - this.groupGetNums.addValue(topic + "@" + group, incValue, 1); + this.statsTable.get(GROUP_GET_NUMS).addValue(topic + "@" + group, incValue, 1); } public void incGroupGetSize(final String group, final String topic, final int incValue) { - this.groupGetSize.addValue(topic + "@" + group, incValue, 1); + this.statsTable.get(GROUP_GET_SIZE).addValue(topic + "@" + group, incValue, 1); } public void incBrokerPutNums() { - this.brokerPutNums.getValue().incrementAndGet(); + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue() + .incrementAndGet(); } public void incBrokerGetNums(final int incValue) { - this.brokerGetNums.getValue().addAndGet(incValue); + this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue() + .addAndGet(incValue); } public void incSendBackNums(final String group, final String topic) { - this.sndbckPutNums.addValue(topic + "@" + group, 1, 1); + this.statsTable.get(SNDBCK_PUT_NUMS).addValue(topic + "@" + group, 1, 1); } public double tpsGroupGetNums(final String group, final String topic) { - return this.groupGetNums.getStatsDataInMinute(topic + "@" + group).getTps(); - } - - - public void incBrokerGetFromDiskNums(final int incValue) { - this.brokerGetFromDiskNums.getValue().addAndGet(incValue); - } - - - public void incGroupGetFromDiskNums(final String group, final String topic, final int incValue) { - this.groupGetFromDiskNums.addValue(topic + "@" + group, incValue, 1); + return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(topic + "@" + group).getTps(); } - public void incGroupGetFromDiskSize(final String group, final String topic, final int incValue) { - this.groupGetFromDiskSize.addValue(topic + "@" + group, incValue, 1); + public void recordDiskFallBehind(final String group, final String topic, final int queueId, + final long fallBehind) { + final String statsKey = String.format("%d@%s@%s", queueId, topic, group); + this.momentStatsItemSet.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); } } diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java index 1a1ac8a5e..63da06747 100644 --- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java @@ -1,17 +1,18 @@ package com.alibaba.rocketmq.store; -import com.alibaba.rocketmq.store.config.FlushDiskType; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertTrue; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertTrue; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.alibaba.rocketmq.store.config.FlushDiskType; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; /** diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java index 62897d790..744a270f7 100644 --- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java @@ -3,12 +3,7 @@ */ package com.alibaba.rocketmq.store; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertTrue; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -17,7 +12,13 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertTrue; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; public class RecoverTest { diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java index 47a52dc83..a0cfdfaa0 100644 --- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java @@ -3,18 +3,23 @@ */ package com.alibaba.rocketmq.store.schedule; -import com.alibaba.rocketmq.store.*; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertTrue; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertTrue; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.alibaba.rocketmq.store.DefaultMessageStore; +import com.alibaba.rocketmq.store.GetMessageResult; +import com.alibaba.rocketmq.store.MessageExtBrokerInner; +import com.alibaba.rocketmq.store.MessageStore; +import com.alibaba.rocketmq.store.PutMessageResult; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; public class ScheduleMessageTest { diff --git a/rocketmq-tools/pom.xml b/rocketmq-tools/pom.xml index 3cbd9fa3f..319ed4203 100644 --- a/rocketmq-tools/pom.xml +++ b/rocketmq-tools/pom.xml @@ -3,7 +3,7 @@ com.alibaba.rocketmq rocketmq-all - 3.2.2 + 3.2.4-SNAPSHOT 4.0.0 diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java index 7e3f238f4..5719baaa6 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -32,23 +32,11 @@ import com.alibaba.rocketmq.common.admin.TopicStatsTable; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.body.*; import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.*; import com.alibaba.rocketmq.tools.admin.api.MessageTrack; @@ -179,7 +167,14 @@ public TopicStatsTable examineTopicStats(String topic) throws RemotingException, @Override public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup); + return examineConsumeStats(consumerGroup, null); + } + + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic); } @@ -433,4 +428,11 @@ public void cloneGroupOffset(String srcGroup, String destGroup, String topic, bo throws RemotingException, MQClientException, InterruptedException, MQBrokerException { this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline); } + + + @Override + public BrokerStatsData ViewBrokerStatsData(String brokerAddr, String statsName, String statsKey) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.ViewBrokerStatsData(brokerAddr, statsName, statsKey); + } } diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 176b4766e..a53a125c5 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -16,17 +16,8 @@ package com.alibaba.rocketmq.tools.admin; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; import org.slf4j.Logger; @@ -41,37 +32,23 @@ import com.alibaba.rocketmq.common.ServiceState; import com.alibaba.rocketmq.common.TopicConfig; import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.admin.ConsumeStats; -import com.alibaba.rocketmq.common.admin.OffsetWrapper; -import com.alibaba.rocketmq.common.admin.RollbackStats; -import com.alibaba.rocketmq.common.admin.TopicStatsTable; +import com.alibaba.rocketmq.common.admin.*; import com.alibaba.rocketmq.common.help.FAQUrl; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.namesrv.NamesrvUtil; import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.body.*; import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.QueueData; import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; import com.alibaba.rocketmq.remoting.RPCHook; import com.alibaba.rocketmq.remoting.common.RemotingHelper; import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.*; import com.alibaba.rocketmq.tools.admin.api.MessageTrack; import com.alibaba.rocketmq.tools.admin.api.TrackType; @@ -213,7 +190,7 @@ public TopicStatsTable examineTopicStats(String topic) throws RemotingException, @Override - public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String retryTopic = MixAll.getRetryTopic(consumerGroup); TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic); @@ -224,8 +201,8 @@ public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingExc if (addr != null) { // 由于查询时间戳会产生IO操作,可能会耗时较长,所以超时时间设置为15s ConsumeStats consumeStats = - this.mqClientInstance.getMQClientAPIImpl() - .getConsumeStats(addr, consumerGroup, 15000); + this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, + topic, 15000); result.getOffsetTable().putAll(consumeStats.getOffsetTable()); long value = result.getConsumeTps() + consumeStats.getConsumeTps(); result.setConsumeTps(value); @@ -242,6 +219,13 @@ public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingExc } + @Override + public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + return examineConsumeStats(consumerGroup, null); + } + + @Override public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { @@ -337,7 +321,8 @@ public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) } if (result.getConnectionSet().isEmpty()) { - throw new MQClientException("Not found the consumer group connection", null); + throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, + "Not found the consumer group connection"); } return result; @@ -466,6 +451,12 @@ public List resetOffsetByTimestampOld(String consumerGroup, Strin MQClientException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); List rollbackStatsList = new ArrayList(); + Map topicRouteMap = new HashMap(); + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + for (QueueData queueData : topicRouteData.getQueueDatas()) { + topicRouteMap.put(bd.selectBrokerAddr(), queueData.getReadQueueNums()); + } + } for (BrokerData bd : topicRouteData.getBrokerDatas()) { String addr = bd.selectBrokerAddr(); if (addr != null) { @@ -474,34 +465,32 @@ public List resetOffsetByTimestampOld(String consumerGroup, Strin this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, 3000); // 根据 topic 过滤不需要的 mq + boolean hasConsumed = false; for (Map.Entry entry : consumeStats.getOffsetTable().entrySet()) { MessageQueue queue = entry.getKey(); OffsetWrapper offsetWrapper = entry.getValue(); if (topic.equals(queue.getTopic())) { - // 根据 timestamp 查找对应的offset - long offset = - this.mqClientInstance.getMQClientAPIImpl().searchOffset(addr, topic, - queue.getQueueId(), timestamp, 3000); - // 构建按时间回溯消费进度 - RollbackStats rollbackStats = new RollbackStats(); - rollbackStats.setBrokerName(bd.getBrokerName()); - rollbackStats.setQueueId(queue.getQueueId()); - rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset()); - rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset()); - rollbackStats.setTimestampOffset(offset); - rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset()); - // 更新 offset - if (force || offset <= offsetWrapper.getConsumerOffset()) { - rollbackStats.setRollbackOffset(offset); - UpdateConsumerOffsetRequestHeader requestHeader = - new UpdateConsumerOffsetRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queue.getQueueId()); - requestHeader.setCommitOffset(offset); - this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(addr, - requestHeader, 3000); - } + hasConsumed = true; + RollbackStats rollbackStats = + resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, + timestamp, force); + rollbackStatsList.add(rollbackStats); + } + } + + if (!hasConsumed) { + HashMap topicStatus = + this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, 3000) + .getOffsetTable(); + for (int i = 0; i < topicRouteMap.get(addr); i++) { + MessageQueue queue = new MessageQueue(topic, bd.getBrokerName(), i); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset()); + offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset()); + + RollbackStats rollbackStats = + resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, + timestamp, force); rollbackStatsList.add(rollbackStats); } } @@ -511,6 +500,36 @@ public List resetOffsetByTimestampOld(String consumerGroup, Strin } + private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, + MessageQueue queue, OffsetWrapper offsetWrapper, long timestamp, boolean force) + throws RemotingException, InterruptedException, MQBrokerException { + // 根据 timestamp 查找对应的offset + long resetOffset = + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), + queue.getQueueId(), timestamp, 3000); + // 构建按时间回溯消费进度 + RollbackStats rollbackStats = new RollbackStats(); + rollbackStats.setBrokerName(queue.getBrokerName()); + rollbackStats.setQueueId(queue.getQueueId()); + rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset()); + rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset()); + rollbackStats.setTimestampOffset(resetOffset); + rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset()); + + // 更新 offset + if (force || resetOffset <= offsetWrapper.getConsumerOffset()) { + rollbackStats.setRollbackOffset(resetOffset); + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(consumeGroup); + requestHeader.setTopic(queue.getTopic()); + requestHeader.setQueueId(queue.getQueueId()); + requestHeader.setCommitOffset(resetOffset); + this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, 3000); + } + return rollbackStats; + } + + @Override public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException { @@ -837,4 +856,12 @@ public void cloneGroupOffset(String srcGroup, String destGroup, String topic, bo } } } + + + @Override + public BrokerStatsData ViewBrokerStatsData(String brokerAddr, String statsName, String statsKey) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().ViewBrokerStatsData(brokerAddr, statsName, + statsKey, 3000); + } } diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java index 3b4635e5e..a2e1f91b7 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java @@ -30,22 +30,10 @@ import com.alibaba.rocketmq.common.admin.TopicStatsTable; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.body.ClusterInfo; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.body.GroupList; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.ProducerConnection; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.body.*; import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingConnectException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; +import com.alibaba.rocketmq.remoting.exception.*; import com.alibaba.rocketmq.tools.admin.api.MessageTrack; @@ -176,6 +164,10 @@ public ConsumeStats examineConsumeStats(final String consumerGroup) throws Remot MQClientException, InterruptedException, MQBrokerException; + public ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + + /** * 查看集群信息 * @@ -608,4 +600,20 @@ public List messageTrackDetail(MessageExt msg) throws RemotingExce */ public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + + + /** + * 服务器统计数据输出 + * + * @param statsName + * @param statsKey + * @return + * @throws InterruptedException + * @throws MQClientException + * @throws RemotingTimeoutException + * @throws RemotingSendRequestException + * @throws RemotingConnectException + */ + public BrokerStatsData ViewBrokerStatsData(final String brokerAddr, final String statsName, + final String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException; } diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java index 6e60720d2..e5eacbcc4 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java @@ -29,6 +29,7 @@ import com.alibaba.rocketmq.common.MQVersion; import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.conflict.PackageConflictDetect; import com.alibaba.rocketmq.remoting.RPCHook; import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; import com.alibaba.rocketmq.srvutil.ServerUtil; @@ -49,13 +50,11 @@ import com.alibaba.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import com.alibaba.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import com.alibaba.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; -import com.alibaba.rocketmq.tools.command.namesrv.DeleteProjectGroupCommand; -import com.alibaba.rocketmq.tools.command.namesrv.GetProjectGroupCommand; import com.alibaba.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; -import com.alibaba.rocketmq.tools.command.namesrv.UpdateProjectGroupCommand; import com.alibaba.rocketmq.tools.command.namesrv.WipeWritePermSubCommand; import com.alibaba.rocketmq.tools.command.offset.CloneGroupOffsetCommand; import com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; +import com.alibaba.rocketmq.tools.command.stats.StatsAllSubCommand; import com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand; import com.alibaba.rocketmq.tools.command.topic.TopicListSubCommand; import com.alibaba.rocketmq.tools.command.topic.TopicRouteSubCommand; @@ -72,45 +71,50 @@ */ public class MQAdminStartup { protected static List subCommandList = new ArrayList(); - static { - subCommandList.add(new UpdateTopicSubCommand()); - subCommandList.add(new DeleteTopicSubCommand()); - subCommandList.add(new UpdateSubGroupSubCommand()); - subCommandList.add(new DeleteSubscriptionGroupCommand()); - subCommandList.add(new UpdateBrokerConfigSubCommand()); - - subCommandList.add(new TopicRouteSubCommand()); - subCommandList.add(new TopicStatusSubCommand()); - - subCommandList.add(new BrokerStatusSubCommand()); - subCommandList.add(new QueryMsgByIdSubCommand()); - subCommandList.add(new QueryMsgByKeySubCommand()); - subCommandList.add(new QueryMsgByOffsetSubCommand()); - subCommandList.add(new PrintMessageSubCommand()); - - subCommandList.add(new ProducerConnectionSubCommand()); - subCommandList.add(new ConsumerConnectionSubCommand()); - subCommandList.add(new ConsumerProgressSubCommand()); - subCommandList.add(new ConsumerStatusSubCommand()); - subCommandList.add(new CloneGroupOffsetCommand()); - - subCommandList.add(new ClusterListSubCommand()); - subCommandList.add(new TopicListSubCommand()); - - subCommandList.add(new UpdateKvConfigCommand()); - subCommandList.add(new DeleteKvConfigCommand()); - - subCommandList.add(new UpdateProjectGroupCommand()); - subCommandList.add(new DeleteProjectGroupCommand()); - subCommandList.add(new GetProjectGroupCommand()); - subCommandList.add(new WipeWritePermSubCommand()); - subCommandList.add(new ResetOffsetByTimeCommand()); - - subCommandList.add(new UpdateOrderConfCommand()); - subCommandList.add(new CleanExpiredCQSubCommand()); - - subCommandList.add(new StartMonitoringSubCommand()); - subCommandList.add(new CheckMsgSubCommand()); + + + public static void initCommand() { + initCommand(new UpdateTopicSubCommand()); + initCommand(new DeleteTopicSubCommand()); + initCommand(new UpdateSubGroupSubCommand()); + initCommand(new DeleteSubscriptionGroupCommand()); + initCommand(new UpdateBrokerConfigSubCommand()); + + initCommand(new TopicRouteSubCommand()); + initCommand(new TopicStatusSubCommand()); + + initCommand(new BrokerStatusSubCommand()); + initCommand(new QueryMsgByIdSubCommand()); + initCommand(new QueryMsgByKeySubCommand()); + initCommand(new QueryMsgByOffsetSubCommand()); + initCommand(new PrintMessageSubCommand()); + + initCommand(new ProducerConnectionSubCommand()); + initCommand(new ConsumerConnectionSubCommand()); + initCommand(new ConsumerProgressSubCommand()); + initCommand(new ConsumerStatusSubCommand()); + initCommand(new CloneGroupOffsetCommand()); + + initCommand(new ClusterListSubCommand()); + initCommand(new TopicListSubCommand()); + + initCommand(new UpdateKvConfigCommand()); + initCommand(new DeleteKvConfigCommand()); + + initCommand(new WipeWritePermSubCommand()); + initCommand(new ResetOffsetByTimeCommand()); + + initCommand(new UpdateOrderConfCommand()); + initCommand(new CleanExpiredCQSubCommand()); + + initCommand(new StartMonitoringSubCommand()); + initCommand(new CheckMsgSubCommand()); + initCommand(new StatsAllSubCommand()); + } + + + public static void initCommand(SubCommand command) { + subCommandList.add(command); } @@ -122,6 +126,11 @@ public static void main(String[] args) { public static void main0(String[] args, RPCHook rpcHook) { System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion)); + // 检测包冲突 + PackageConflictDetect.detectFastjson(); + + initCommand(); + try { initLogback(); switch (args.length) { diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index 04e43f1cc..000df5b61 100644 --- a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -132,9 +132,6 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { "#TPS",// "#Diff Total"// ); - - List groupConsumeInfoList = new LinkedList(); - TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { @@ -172,7 +169,15 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { groupConsumeInfo.setVersion(cc.computeMinVersion()); } - groupConsumeInfoList.add(groupConsumeInfo); + System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d\n",// + UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32),// + groupConsumeInfo.getCount(),// + groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",// + groupConsumeInfo.consumeTypeDesc(),// + groupConsumeInfo.messageModelDesc(),// + groupConsumeInfo.getConsumeTps(),// + groupConsumeInfo.getDiffTotal()// + ); } catch (Exception e) { log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " @@ -180,20 +185,6 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { } } } - - Collections.sort(groupConsumeInfoList); - - for (GroupConsumeInfo info : groupConsumeInfoList) { - System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d\n",// - UtilAll.frontStringAtLeast(info.getGroup(), 32),// - info.getCount(),// - info.versionDesc(),// - info.consumeTypeDesc(),// - info.messageModelDesc(),// - info.getConsumeTps(),// - info.getDiffTotal()// - ); - } } } catch (Exception e) { diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java new file mode 100644 index 000000000..b23a87c77 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java @@ -0,0 +1,191 @@ +package com.alibaba.rocketmq.tools.command.stats; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.protocol.body.BrokerStatsData; +import com.alibaba.rocketmq.common.protocol.body.GroupList; +import com.alibaba.rocketmq.common.protocol.body.TopicList; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.store.stats.BrokerStatsManager; +import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt; +import com.alibaba.rocketmq.tools.command.MQAdminStartup; +import com.alibaba.rocketmq.tools.command.SubCommand; + + +public class StatsAllSubCommand implements SubCommand { + + @Override + public String commandName() { + return "statsAll"; + } + + + @Override + public String commandDesc() { + return "Topic and Consumer tps stats"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("a", "activeTopic", false, "print active topic only"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + public static long compute24HourSum(BrokerStatsData bsd) { + if (bsd.getStatsDay().getSum() != 0) { + return bsd.getStatsDay().getSum(); + } + + if (bsd.getStatsHour().getSum() != 0) { + return bsd.getStatsHour().getSum(); + } + + if (bsd.getStatsMinute().getSum() != 0) { + return bsd.getStatsMinute().getSum(); + } + + return 0; + } + + + public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, + final boolean activeTopic) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); + + GroupList groupList = admin.queryTopicConsumeByWho(topic); + + double inTPS = 0; + + long inMsgCntToday = 0; + + // 统计Topic写入 + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + BrokerStatsData bsd = + admin.ViewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic); + inTPS += bsd.getStatsMinute().getTps(); + inMsgCntToday += compute24HourSum(bsd); + } + catch (Exception e) { + } + } + } + + if (groupList != null && !groupList.getGroupList().isEmpty()) { + // 统计订阅 + for (String group : groupList.getGroupList()) { + double outTPS = 0; + long outMsgCntToday = 0; + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + try { + String statsKey = String.format("%s@%s", topic, group); + BrokerStatsData bsd = + admin.ViewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, + statsKey); + outTPS += bsd.getStatsMinute().getTps(); + outMsgCntToday += compute24HourSum(bsd); + } + catch (Exception e) { + } + } + } + + if (!activeTopic || (inMsgCntToday > 0) || // + (outMsgCntToday > 0)) { + // 打印 + System.out.printf("%-32s %-32s %11.2f %11.2f %14d %14d\n",// + UtilAll.frontStringAtLeast(topic, 32),// + UtilAll.frontStringAtLeast(group, 32),// + inTPS,// + outTPS,// + inMsgCntToday,// + outMsgCntToday// + ); + } + } + } + // 没有订阅者 + else { + if (!activeTopic || (inMsgCntToday > 0)) { + // 打印 + System.out.printf("%-32s %-32s %11.2f %11s %14d %14s\n",// + UtilAll.frontStringAtLeast(topic, 32),// + "",// + inTPS,// + "",// + inMsgCntToday,// + "NO_CONSUMER"// + ); + } + } + } + + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); + + System.out.printf("%-32s %-32s %11s %11s %14s %14s\n",// + "#Topic",// + "#Consumer Group",// + "#InTPS",// + "#OutTPS",// + "#InMsg24Hour",// + "#OutMsg24Hour"// + ); + + boolean activeTopic = commandLine.hasOption('a'); + + for (String topic : topicList.getTopicList()) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + continue; + } + + try { + printTopicDetail(defaultMQAdminExt, topic, activeTopic); + } + catch (Exception e) { + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + defaultMQAdminExt.shutdown(); + } + } + + + public static void main(String[] args) { + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "10.101.87.102:9876"); + MQAdminStartup.main(new String[] { new StatsAllSubCommand().commandName() }); + } +} diff --git a/sbin/github.sh b/sbin/github.sh new file mode 100644 index 000000000..11aee7437 --- /dev/null +++ b/sbin/github.sh @@ -0,0 +1,6 @@ +curl -k https://github.com/alibaba/RocketMQ/issues/1 > z + +grep -o '\b[0-9a-zA-Z_.\-]\+@[0-9a-zA-Z_]\+\.[0-9a-zA-Z_]\+\b' z |uniq > z1 + +echo "RocketMQ github user==========================" +cat z1 diff --git a/wiki/quickstart.md b/wiki/quickstart.md new file mode 100644 index 000000000..1b33dbbaa --- /dev/null +++ b/wiki/quickstart.md @@ -0,0 +1,6 @@ +## 快速开始 + +### 1、搭建RocketMQ环境 + +### 2、运行示例代码 +