From b1d8d306a54321e1d729a095b7b6962776636dae Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Fri, 12 Jan 2024 10:46:15 +0800 Subject: [PATCH] [ISSUE #7699] Add namespace v2 in client (#7700) * Add namespace v2 * Add NamespaceRpcHook * Refector extends header * Use Boolean in request header to remove unnecessary encode * Add unit test * Add NamespaceRpcHookTest * Remove GrpcConverter#wrapResourceWithNamespace * Optimize readability of RpcRequestHeader --- .../rocketmq/broker/out/BrokerOuterAPI.java | 2 +- .../processor/AdminBrokerProcessor.java | 8 +- .../processor/ConsumerManageProcessor.java | 4 +- .../processor/PullMessageProcessor.java | 2 +- .../topic/TopicQueueMappingCleanService.java | 4 +- ...ractTransactionalMessageCheckListener.java | 2 +- .../apache/rocketmq/client/ClientConfig.java | 21 +++ .../consumer/DefaultLitePullConsumer.java | 11 +- .../consumer/DefaultMQPullConsumer.java | 14 +- .../consumer/DefaultMQPushConsumer.java | 100 +++++++------ .../store/RemoteBrokerOffsetStore.java | 4 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 19 +-- .../consumer/DefaultMQPushConsumerImpl.java | 4 +- .../client/impl/consumer/PullAPIWrapper.java | 4 +- .../client/impl/mqclient/MQClientAPIExt.java | 5 +- .../impl/producer/DefaultMQProducerImpl.java | 6 +- .../client/producer/DefaultMQProducer.java | 133 ++++++++---------- .../producer/TransactionMQProducer.java | 26 ++-- .../client/rpchook/NamespaceRpcHook.java | 51 +++++++ .../client/rpchook/NamespaceRpcHookTest.java | 56 ++++++++ .../trace/DefaultMQConsumerWithTraceTest.java | 2 +- .../trace/DefaultMQProducerWithTraceTest.java | 2 +- .../TransactionMQProducerWithTraceTest.java | 2 +- .../benchmark/TransactionProducer.java | 1 - .../namespace/ProducerWithNamespace.java | 3 +- .../namespace/PullConsumerWithNamespace.java | 3 +- .../namespace/PushConsumerWithNamespace.java | 3 +- .../example/tracemessage/TraceProducer.java | 2 +- .../tracemessage/TracePushConsumer.java | 2 +- .../proxy/grpc/v2/client/ClientActivity.java | 14 +- .../v2/common/GrpcClientSettingsManager.java | 8 +- .../proxy/grpc/v2/common/GrpcConverter.java | 4 - .../proxy/grpc/v2/common/GrpcValidator.java | 4 +- .../grpc/v2/consumer/AckMessageActivity.java | 5 +- .../ChangeInvisibleDurationActivity.java | 5 +- .../v2/consumer/ReceiveMessageActivity.java | 4 +- .../ReceiveMessageResponseStreamWriter.java | 4 +- .../producer/ForwardMessageToDLQActivity.java | 7 +- .../grpc/v2/producer/SendMessageActivity.java | 7 +- .../proxy/grpc/v2/route/RouteActivity.java | 7 +- .../transaction/EndTransactionActivity.java | 3 +- .../AbstractSystemMessageSyncer.java | 2 +- .../activity/PullMessageActivityTest.java | 6 +- .../activity/SendMessageActivityTest.java | 2 +- .../header/CloneGroupOffsetRequestHeader.java | 4 +- ...umeMessageDirectlyResultRequestHeader.java | 4 +- .../header/CreateTopicRequestHeader.java | 4 +- .../DeleteSubscriptionGroupRequestHeader.java | 4 +- .../header/DeleteTopicRequestHeader.java | 4 +- .../header/GetConsumeStatsRequestHeader.java | 4 +- ...etConsumerConnectionListRequestHeader.java | 4 +- .../GetConsumerListByGroupRequestHeader.java | 4 +- .../GetConsumerRunningInfoRequestHeader.java | 4 +- .../GetConsumerStatusRequestHeader.java | 4 +- ...etProducerConnectionListRequestHeader.java | 4 +- ...tSubscriptionGroupConfigRequestHeader.java | 4 +- .../header/HeartbeatRequestHeader.java | 29 ++++ .../InitConsumerOffsetRequestHeader.java | 4 +- ...NotifyConsumerIdsChangedRequestHeader.java | 4 +- .../QueryConsumeQueueRequestHeader.java | 8 +- .../QueryConsumeTimeSpanRequestHeader.java | 4 +- .../header/QueryCorrectionOffsetHeader.java | 4 +- .../header/QueryMessageRequestHeader.java | 4 +- ...rySubscriptionByConsumerRequestHeader.java | 4 +- .../QueryTopicConsumeByWhoRequestHeader.java | 4 +- .../QueryTopicsByConsumerRequestHeader.java | 4 +- .../header/ReplyMessageRequestHeader.java | 4 +- .../header/ResetOffsetRequestHeader.java | 8 +- .../header/SendMessageRequestHeaderV2.java | 39 +++-- .../StatisticsMessagesRequestHeader.java | 8 +- .../header/UnregisterClientRequestHeader.java | 4 +- .../UpdateGroupForbiddenRequestHeader.java | 4 +- .../DeleteTopicFromNamesrvRequestHeader.java | 4 +- .../namesrv/GetRouteInfoRequestHeader.java | 4 +- .../namesrv/RegisterTopicRequestHeader.java | 4 +- .../rocketmq/remoting/rpc/RequestBuilder.java | 8 +- .../rocketmq/remoting/rpc/RpcClientImpl.java | 2 +- .../remoting/rpc/RpcRequestHeader.java | 63 +++++++-- .../SendMessageRequestHeaderV2Test.java | 51 +++++++ .../remoting/rpc/RpcRequestHeaderTest.java | 64 +++++++++ .../rocketmq/store/kv/MessageFetcher.java | 3 +- .../tools/admin/DefaultMQAdminExtImpl.java | 4 +- 82 files changed, 651 insertions(+), 318 deletions(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java create mode 100644 client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/HeartbeatRequestHeader.java create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2Test.java create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeaderTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 6fde48dd995..3827beb5b65 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -1377,7 +1377,7 @@ public CompletableFuture pullMessageFromSpecificBrokerAsync(String b requestHeader.setSubVersion(System.currentTimeMillis()); requestHeader.setMaxMsgBytes(Integer.MAX_VALUE); requestHeader.setExpressionType(ExpressionType.TAG); - requestHeader.setBname(brokerName); + requestHeader.setBrokerName(brokerName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); CompletableFuture pullResultFuture = new CompletableFuture<>(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4cc6d53b155..67a4d45447c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1019,7 +1019,7 @@ private RemotingCommand rewriteRequestForStaticTopic(SearchOffsetRequestHeader r requestHeader.setLo(false); requestHeader.setTimestamp(timestamp); requestHeader.setQueueId(item.getQueueId()); - requestHeader.setBname(item.getBname()); + requestHeader.setBrokerName(item.getBname()); RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { @@ -1086,7 +1086,7 @@ private RemotingCommand rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader r LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true); assert maxItem != null; assert maxItem.getLogicOffset() >= 0; - requestHeader.setBname(maxItem.getBname()); + requestHeader.setBrokerName(maxItem.getBname()); requestHeader.setLo(false); requestHeader.setQueueId(mappingItem.getQueueId()); @@ -1152,7 +1152,7 @@ private CompletableFuture handleGetMinOffsetForStaticTopic(RpcReque LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); assert mappingItem != null; try { - requestHeader.setBname(mappingItem.getBname()); + requestHeader.setBrokerName(mappingItem.getBname()); requestHeader.setLo(false); requestHeader.setQueueId(mappingItem.getQueueId()); long physicalOffset; @@ -1219,7 +1219,7 @@ private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequ LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); assert mappingItem != null; try { - requestHeader.setBname(mappingItem.getBname()); + requestHeader.setBrokerName(mappingItem.getBname()); requestHeader.setLo(false); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); //TO DO check if it is in current broker diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 9c6d28d3dcf..e16a1e9090f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -124,7 +124,7 @@ public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRe LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true); requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setLo(false); - requestHeader.setBname(mappingItem.getBname()); + requestHeader.setBrokerName(mappingItem.getBname()); requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset)); //leader, let it go, do not need to rewrite the response if (mappingDetail.getBname().equals(mappingItem.getBname())) { @@ -237,7 +237,7 @@ public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHe } } else { //maybe we need to reconstruct an object - requestHeader.setBname(mappingItem.getBname()); + requestHeader.setBrokerName(mappingItem.getBname()); requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setLo(false); requestHeader.setSetZeroIfNotFound(false); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index b2794b1289b..ea9c327e98a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -129,7 +129,7 @@ private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader re int sysFlag = requestHeader.getSysFlag(); requestHeader.setLo(false); - requestHeader.setBname(bname); + requestHeader.setBrokerName(bname); sysFlag = PullSysFlag.clearSuspendFlag(sysFlag); sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); requestHeader.setSysFlag(sysFlag); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java index 7047ef8b47a..c4fd4c62082 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java @@ -137,7 +137,7 @@ public void cleanItemExpired() { for (String broker: brokers) { GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); header.setTopic(topic); - header.setBname(broker); + header.setBrokerName(broker); header.setLo(false); try { RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); @@ -265,7 +265,7 @@ public void cleanItemListMoreThanSecondGen() { String broker = entry.getValue(); GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); - header.setBname(broker); + header.setBrokerName(broker); header.setLo(true); try { RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 982355d783f..b5a86d3d083 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -55,7 +55,7 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception { checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); - checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName()); + checkTransactionStateRequestHeader.setBrokerName(brokerController.getBrokerConfig().getBrokerName()); msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index f9843cc0231..8a7beffc704 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -45,8 +45,10 @@ public class ClientConfig { private String clientIP = NetworkUtil.getLocalAddress(); private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); + @Deprecated protected String namespace; private boolean namespaceInitialized = false; + protected String namespaceV2; protected AccessChannel accessChannel = AccessChannel.LOCAL; /** @@ -137,10 +139,12 @@ public void changeInstanceNameToPID() { } } + @Deprecated public String withNamespace(String resource) { return NamespaceUtil.wrapNamespace(this.getNamespace(), resource); } + @Deprecated public Set withNamespace(Set resourceSet) { Set resourceWithNamespace = new HashSet<>(); for (String resource : resourceSet) { @@ -149,10 +153,12 @@ public Set withNamespace(Set resourceSet) { return resourceWithNamespace; } + @Deprecated public String withoutNamespace(String resource) { return NamespaceUtil.withoutNamespace(resource, this.getNamespace()); } + @Deprecated public Set withoutNamespace(Set resourceSet) { Set resourceWithoutNamespace = new HashSet<>(); for (String resource : resourceSet) { @@ -161,6 +167,7 @@ public Set withoutNamespace(Set resourceSet) { return resourceWithoutNamespace; } + @Deprecated public MessageQueue queueWithNamespace(MessageQueue queue) { if (StringUtils.isEmpty(this.getNamespace())) { return queue; @@ -168,6 +175,7 @@ public MessageQueue queueWithNamespace(MessageQueue queue) { return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId()); } + @Deprecated public Collection queuesWithNamespace(Collection queues) { if (StringUtils.isEmpty(this.getNamespace())) { return queues; @@ -206,6 +214,7 @@ public void resetClientConfig(final ClientConfig cc) { this.enableHeartbeatChannelEventListener = cc.enableHeartbeatChannelEventListener; this.detectInterval = cc.detectInterval; this.detectTimeout = cc.detectTimeout; + this.namespaceV2 = cc.namespaceV2; } public ClientConfig cloneClientConfig() { @@ -235,6 +244,7 @@ public ClientConfig cloneClientConfig() { cc.sendLatencyEnable = sendLatencyEnable; cc.detectInterval = detectInterval; cc.detectTimeout = detectTimeout; + cc.namespaceV2 = namespaceV2; return cc; } @@ -359,6 +369,7 @@ public void setDecodeDecompressBody(boolean decodeDecompressBody) { this.decodeDecompressBody = decodeDecompressBody; } + @Deprecated public String getNamespace() { if (namespaceInitialized) { return namespace; @@ -377,11 +388,20 @@ public String getNamespace() { return namespace; } + @Deprecated public void setNamespace(String namespace) { this.namespace = namespace; this.namespaceInitialized = true; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public AccessChannel getAccessChannel() { return this.accessChannel; } @@ -463,6 +483,7 @@ public String toString() { ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", namespace='" + namespace + '\'' + ", namespaceInitialized=" + namespaceInitialized + + ", namespaceV2='" + namespaceV2 + '\'' + ", accessChannel=" + accessChannel + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 5e5bd4daaaa..c193c6a42e4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -183,7 +183,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon * Default constructor. */ public DefaultLitePullConsumer() { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); + this(MixAll.DEFAULT_CONSUMER_GROUP, null); } /** @@ -192,7 +192,7 @@ public DefaultLitePullConsumer() { * @param consumerGroup Consumer group. */ public DefaultLitePullConsumer(final String consumerGroup) { - this(null, consumerGroup, null); + this(consumerGroup, null); } /** @@ -201,7 +201,7 @@ public DefaultLitePullConsumer(final String consumerGroup) { * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultLitePullConsumer(RPCHook rpcHook) { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); } /** @@ -211,7 +211,9 @@ public DefaultLitePullConsumer(RPCHook rpcHook) { * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { - this(null, consumerGroup, rpcHook); + this.consumerGroup = consumerGroup; + this.enableStreamRequestType = true; + defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } /** @@ -220,6 +222,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { * @param consumerGroup Consumer group. * @param rpcHook RPC hook to execute before each remoting command. */ + @Deprecated public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index e5cd5415534..499f7731915 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -89,23 +89,21 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume private int maxReconsumeTimes = 16; public DefaultMQPullConsumer() { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); + this(MixAll.DEFAULT_CONSUMER_GROUP, null); } public DefaultMQPullConsumer(final String consumerGroup) { - this(null, consumerGroup, null); + this(consumerGroup, null); } public DefaultMQPullConsumer(RPCHook rpcHook) { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); } public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { - this(null, consumerGroup, rpcHook); - } - - public DefaultMQPullConsumer(final String namespace, final String consumerGroup) { - this(namespace, consumerGroup, null); + this.consumerGroup = consumerGroup; + this.enableStreamRequestType = true; + defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index e593a17c98e..224ea67d5fb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -297,7 +297,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Default constructor. */ public DefaultMQPushConsumer() { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } /** @@ -306,38 +306,41 @@ public DefaultMQPushConsumer() { * @param consumerGroup Consumer group. */ public DefaultMQPushConsumer(final String consumerGroup) { - this(null, consumerGroup, null, new AllocateMessageQueueAveragely()); + this(consumerGroup, null, new AllocateMessageQueueAveragely()); } + /** - * Constructor specifying namespace and consumer group. + * Constructor specifying RPC hook. * - * @param namespace Namespace for this MQ Producer instance. - * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. */ - public DefaultMQPushConsumer(final String namespace, final String consumerGroup) { - this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely()); + public DefaultMQPushConsumer(RPCHook rpcHook) { + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); } - /** - * Constructor specifying RPC hook. + * Constructor specifying consumer group, RPC hook. * + * @param consumerGroup Consumer group. * @param rpcHook RPC hook to execute before each remoting command. */ - public DefaultMQPushConsumer(RPCHook rpcHook) { - this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) { + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } + /** - * Constructor specifying namespace, consumer group and RPC hook . + * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. * - * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consumer group. - * @param rpcHook RPC hook to execute before each remoting command. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { - this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely()); + public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); } /** @@ -349,59 +352,75 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup, */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy); + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } /** - * Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm. + * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. * - * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. - * @param allocateMessageQueueStrategy Message queue allocating algorithm. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { this.consumerGroup = consumerGroup; - this.namespace = namespace; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); + dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl); + traceDispatcher = dispatcher; + this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } } /** - * Constructor specifying consumer group and enabled msg trace flag. + * Constructor specifying namespace and consumer group. * + * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. */ - public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) { - this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null); + @Deprecated + public DefaultMQPushConsumer(final String namespace, final String consumerGroup) { + this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely()); } /** - * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. + * Constructor specifying namespace, consumer group and RPC hook . * + * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param rpcHook RPC hook to execute before each remoting command. */ - public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { - this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); + @Deprecated + public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { + this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely()); } - /** - * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. + * Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm. * + * @param namespace Namespace for this MQ Producer instance. * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. - * @param allocateMessageQueueStrategy message queue allocating algorithm. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. + * @param allocateMessageQueueStrategy Message queue allocating algorithm. */ - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { - this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic); + @Deprecated + public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.consumerGroup = consumerGroup; + this.namespace = namespace; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } /** @@ -414,6 +433,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. */ + @Deprecated public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { this.consumerGroup = consumerGroup; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 83d5061adb3..1a2ffe5470f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -218,7 +218,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( @@ -245,7 +245,7 @@ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingExcept requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 6074081c10e..f46dbe31247 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.rpchook.NamespaceRpcHook; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -171,6 +172,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; @@ -257,6 +259,7 @@ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener); this.clientRemotingProcessor = clientRemotingProcessor; + this.remotingClient.registerRPCHook(new NamespaceRpcHook(clientConfig)); // Inject stream rpc hook first to make reserve field signature if (clientConfig.isEnableStreamRequestType()) { this.remotingClient.registerRPCHook(new StreamTypeRPCHook()); @@ -1287,7 +1290,7 @@ public long searchOffset(final String addr, final MessageQueue messageQueue, fin SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(messageQueue.getTopic()); requestHeader.setQueueId(messageQueue.getQueueId()); - requestHeader.setBname(messageQueue.getBrokerName()); + requestHeader.setBrokerName(messageQueue.getBrokerName()); requestHeader.setTimestamp(timestamp); requestHeader.setBoundaryType(boundaryType); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); @@ -1312,7 +1315,7 @@ public long getMaxOffset(final String addr, final MessageQueue messageQueue, fin GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); requestHeader.setTopic(messageQueue.getTopic()); requestHeader.setQueueId(messageQueue.getQueueId()); - requestHeader.setBname(messageQueue.getBrokerName()); + requestHeader.setBrokerName(messageQueue.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1364,7 +1367,7 @@ public long getMinOffset(final String addr, final MessageQueue messageQueue, fin GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); requestHeader.setTopic(messageQueue.getTopic()); requestHeader.setQueueId(messageQueue.getQueueId()); - requestHeader.setBname(messageQueue.getBrokerName()); + requestHeader.setBrokerName(messageQueue.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1389,7 +1392,7 @@ public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, fi GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1472,7 +1475,7 @@ public int sendHeartbeat( final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); @@ -1493,7 +1496,7 @@ public HeartbeatV2Result sendHeartbeatV2( final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); @@ -1565,7 +1568,7 @@ public void queryMessage( public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis) throws RemotingException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setBody(heartbeat.encode()); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); @@ -1590,7 +1593,7 @@ public void consumerSendMessageBack( requestHeader.setDelayLevel(delayLevel); requestHeader.setOriginMsgId(msg.getMsgId()); requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); - requestHeader.setBname(brokerName); + requestHeader.setBrokerName(brokerName); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index d2faed37831..6666c4335eb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -824,7 +824,7 @@ void ackAsync(MessageExt message, String consumerGroup) { requestHeader.setOffset(queueOffset); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setExtraInfo(extraInfo); - requestHeader.setBname(brokerName); + requestHeader.setBrokerName(brokerName); this.mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), ASYNC_TIMEOUT, new AckCallback() { @Override public void onSuccess(AckResult ackResult) { @@ -868,7 +868,7 @@ void changePopInvisibleTimeAsync(String topic, String consumerGroup, String extr requestHeader.setConsumerGroup(consumerGroup); requestHeader.setExtraInfo(extraInfo); requestHeader.setInvisibleTime(invisibleTime); - requestHeader.setBname(brokerName); + requestHeader.setBrokerName(brokerName); //here the broker should be polished this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback); return; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 5180d376ea4..2bd0d9994e5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -231,7 +231,7 @@ public PullResult pullKernelImpl( requestHeader.setSubVersion(subVersion); requestHeader.setMaxMsgBytes(maxSizeInBytes); requestHeader.setExpressionType(expressionType); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { @@ -378,7 +378,7 @@ public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String co requestHeader.setExpType(expressionType); requestHeader.setExp(expression); requestHeader.setOrder(order); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); //give 1000 ms for server response if (poll) { requestHeader.setPollTime(timeout); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index f3102e17597..3d8625937cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -66,6 +66,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; @@ -113,7 +114,7 @@ public CompletableFuture sendHeartbeatOneway( ) { CompletableFuture future = new CompletableFuture<>(); try { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis); @@ -129,7 +130,7 @@ public CompletableFuture sendHeartbeatAsync( HeartbeatData heartbeatData, long timeoutMillis ) { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 088bff0891c..daab475fc1b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -403,7 +403,7 @@ private void processTransactionState( thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); - thisHeader.setBname(checkRequestHeader.getBname()); + thisHeader.setBrokerName(checkRequestHeader.getBrokerName()); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { @@ -952,7 +952,7 @@ private SendResult sendKernelImpl(final Message msg, requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); - requestHeader.setBname(brokerName); + requestHeader.setBrokerName(brokerName); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -1486,7 +1486,7 @@ public void endTransaction( EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); - requestHeader.setBname(destBrokerName); + requestHeader.setBrokerName(destBrokerName); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 73a1251539a..5d785a063c2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Default constructor. */ public DefaultMQProducer() { - this(null, MixAll.DEFAULT_PRODUCER_GROUP, null); + this(MixAll.DEFAULT_PRODUCER_GROUP); } /** @@ -180,7 +180,7 @@ public DefaultMQProducer() { * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(RPCHook rpcHook) { - this(null, MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); } /** @@ -189,31 +189,9 @@ public DefaultMQProducer(RPCHook rpcHook) { * @param producerGroup Producer group, see the name-sake field. */ public DefaultMQProducer(final String producerGroup) { - this(null, producerGroup, null); - } - - /** - * Constructor specifying producer group. - * - * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. - */ - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, - final String customizedTraceTopic) { - this(null, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic); - } - - /** - * Constructor specifying producer group. - * - * @param namespace Namespace for this MQ Producer instance. - * @param producerGroup Producer group, see the name-sake field. - */ - public DefaultMQProducer(final String namespace, final String producerGroup) { - this(namespace, producerGroup, null); + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, null); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } /** @@ -223,18 +201,6 @@ public DefaultMQProducer(final String namespace, final String producerGroup) { * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { - this(null, producerGroup, rpcHook); - } - - /** - * Constructor specifying namespace, producer group and RPC hook. - * - * @param namespace Namespace for this MQ Producer instance. - * @param producerGroup Producer group, see the name-sake field. - * @param rpcHook RPC hook to execute per each remoting command execution. - */ - public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { - this.namespace = namespace; this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); @@ -243,27 +209,14 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC /** * Constructor specifying namespace, producer group, topics and RPC hook. * - * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. - * @param topics Topic that needs to be initialized for routing * @param rpcHook RPC hook to execute per each remoting command execution. + * @param topics Topic that needs to be initialized for routing */ - public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { - this.namespace = namespace; - this.producerGroup = producerGroup; + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, + final List topics) { + this(producerGroup, rpcHook); this.topics = topics; - defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); - } - - /** - * Constructor specifying producer group and enabled msg trace flag. - * - * @param producerGroup Producer group, see the name-sake field. - * @param enableMsgTrace Switch flag instance for message trace. - */ - public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { - this(null, producerGroup, null, enableMsgTrace, null); } /** @@ -275,26 +228,21 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { * trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { - this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic); + this(producerGroup, null, enableMsgTrace, customizedTraceTopic); } /** - * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic - * name. + * Constructor specifying producer group. * - * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default * trace topic name. */ - public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, - boolean enableMsgTrace, final String customizedTraceTopic) { - this.namespace = namespace; - this.producerGroup = producerGroup; - defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, + final String customizedTraceTopic) { + this(producerGroup, rpcHook); //if client open the message trace feature if (enableMsgTrace) { try { @@ -315,21 +263,60 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic * name. * - * @param namespace Namespace for this MQ Producer instance. * @param producerGroup Producer group, see the name-sake field. - * @param topics Topic that needs to be initialized for routing * @param rpcHook RPC hook to execute per each remoting command execution. + * @param topics Topic that needs to be initialized for routing * @param enableMsgTrace Switch flag instance for message trace. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default * trace topic name. */ - public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, - RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List topics, + boolean enableMsgTrace, final String customizedTraceTopic) { + this(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic); + this.topics = topics; + } + + /** + * Constructor specifying producer group. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + */ + @Deprecated + public DefaultMQProducer(final String namespace, final String producerGroup) { + this(namespace, producerGroup, null); + } + + /** + * Constructor specifying namespace, producer group and RPC hook. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ + @Deprecated + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; - this.topics = topics; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + } + + /** + * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic + * name. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + @Deprecated + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, + boolean enableMsgTrace, final String customizedTraceTopic) { + this(namespace, producerGroup, rpcHook); //if client open the message trace feature if (enableMsgTrace) { try { @@ -337,9 +324,9 @@ public DefaultMQProducer(final String namespace, final String producerGroup, fin dispatcher.setHostProducer(this.defaultMQProducerImpl); traceDispatcher = dispatcher; this.defaultMQProducerImpl.registerSendMessageHook( - new SendMessageTraceHookImpl(traceDispatcher)); + new SendMessageTraceHookImpl(traceDispatcher)); this.defaultMQProducerImpl.registerEndTransactionHook( - new EndTransactionTraceHookImpl(traceDispatcher)); + new EndTransactionTraceHookImpl(traceDispatcher)); } catch (Throwable e) { logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 2c3b479f77b..5c7b437809a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -37,33 +37,31 @@ public TransactionMQProducer() { } public TransactionMQProducer(final String producerGroup) { - this(null, producerGroup, null, null); + super(producerGroup); } public TransactionMQProducer(final String producerGroup, final List topics) { - this(null, producerGroup, topics, null); + super(producerGroup, null, topics); } - public TransactionMQProducer(final String namespace, final String producerGroup) { - this(namespace, producerGroup, null, null); - } - - public TransactionMQProducer(final String namespace, final String producerGroup, final List topics) { - this(namespace, producerGroup, topics, null); + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { + super(producerGroup, rpcHook, null); } - public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - this(null, producerGroup, null, rpcHook); + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook, final List topics) { + super(producerGroup, rpcHook, topics); } - public TransactionMQProducer(final String producerGroup, final List topics, RPCHook rpcHook) { - this(null, producerGroup, topics, rpcHook); + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { + super(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic); } - public TransactionMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { - super(namespace, producerGroup, topics, rpcHook); + @Deprecated + public TransactionMQProducer(final String namespace, final String producerGroup) { + super(namespace, producerGroup); } + @Deprecated public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic); } diff --git a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java new file mode 100644 index 00000000000..7deee0a9f30 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.rpchook; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +public class NamespaceRpcHook implements RPCHook { + private final ClientConfig clientConfig; + + public NamespaceRpcHook(ClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + CommandCustomHeader customHeader = request.readCustomHeader(); + if (customHeader instanceof RpcRequestHeader) { + RpcRequestHeader requestHeader = (RpcRequestHeader) customHeader; + if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) { + requestHeader.setNamespaced(true); + requestHeader.setNamespace(clientConfig.getNamespaceV2()); + } + } + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, + RemotingCommand response) { + + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java new file mode 100644 index 00000000000..1551ce09352 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.rpchook; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class NamespaceRpcHookTest { + private NamespaceRpcHook namespaceRpcHook; + private ClientConfig clientConfig; + private String namespace = "namespace"; + + + @Test + public void testDoBeforeRequestWithNamespace() { + clientConfig = new ClientConfig(); + clientConfig.setNamespaceV2(namespace); + namespaceRpcHook = new NamespaceRpcHook(clientConfig); + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + namespaceRpcHook.doBeforeRequest("", request); + assertThat(pullMessageRequestHeader.getNamespaced()).isTrue(); + assertThat(pullMessageRequestHeader.getNamespace()).isEqualTo(namespace); + } + + @Test + public void testDoBeforeRequestWithoutNamespace() { + clientConfig = new ClientConfig(); + namespaceRpcHook = new NamespaceRpcHook(clientConfig); + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + namespaceRpcHook.doBeforeRequest("", request); + assertThat(pullMessageRequestHeader.getNamespaced()).isNull(); + assertThat(pullMessageRequestHeader.getNamespace()).isNull(); + } +} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 6283abd6b2f..60aa446bbe9 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -242,7 +242,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, @Test public void testPushConsumerWithTraceTLS() { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true, null); consumer.setUseTLS(true); AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index ff1fdfc544b..ee173351852 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -150,7 +150,7 @@ public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws Remotin @Test public void testProducerWithTraceTLS() { - DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true); + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true, null); producer.setUseTLS(true); AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index 55d073a1abe..8cf87444c0c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -106,7 +106,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { return LocalTransactionState.COMMIT_MESSAGE; } }; - producer = new TransactionMQProducer(null, producerGroupTemp, null, true, null); + producer = new TransactionMQProducer(producerGroupTemp, null, true, null); producer.setTransactionListener(transactionListener); producer.setNamesrvAddr("127.0.0.1:9876"); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 34cdeb49dbb..7b6350e3bd1 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -141,7 +141,6 @@ public void run() { } final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionMQProducer producer = new TransactionMQProducer( - null, "benchmark_transaction_producer", rpcHook, config.msgTraceEnable, diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java index a8a920b1d71..2a7af58a6ee 100644 --- a/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/ProducerWithNamespace.java @@ -33,7 +33,8 @@ public class ProducerWithNamespace { public static void main(String[] args) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(NAMESPACE, PRODUCER_GROUP); + DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); + producer.setNamespaceV2(NAMESPACE); producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); producer.start(); diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java index 9ca1b35b95b..b5509d31ecc 100644 --- a/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/PullConsumerWithNamespace.java @@ -34,7 +34,8 @@ public class PullConsumerWithNamespace { private static final Map OFFSET_TABLE = new HashMap<>(); public static void main(String[] args) throws Exception { - DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(NAMESPACE, CONSUMER_GROUP); + DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(CONSUMER_GROUP); + pullConsumer.setNamespaceV2(NAMESPACE); pullConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); pullConsumer.start(); diff --git a/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java b/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java index 181720ea217..f12383a7a32 100644 --- a/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java +++ b/example/src/main/java/org/apache/rocketmq/example/namespace/PushConsumerWithNamespace.java @@ -27,7 +27,8 @@ public class PushConsumerWithNamespace { public static final String TOPIC = "NAMESPACE_TOPIC"; public static void main(String[] args) throws Exception { - DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(NAMESPACE, CONSUMER_GROUP); + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP); + defaultMQPushConsumer.setNamespaceV2(NAMESPACE); defaultMQPushConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); defaultMQPushConsumer.subscribe(TOPIC, "*"); defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java index add6c432334..72dde674c01 100644 --- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java @@ -34,7 +34,7 @@ public class TraceProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true); + DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true, null); // Uncomment the following line while debugging, namesrvAddr should be set to your local address // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java index a833ee1e454..81c5e31ca58 100644 --- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java @@ -31,7 +31,7 @@ public class TracePushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Here,we use the default message track trace topic name - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true, null); // Uncomment the following line while debugging, namesrvAddr should be set to your local address // consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 8553289498b..59d8432ae88 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -101,7 +101,7 @@ public CompletableFuture heartbeat(ProxyContext ctx, Heartbea switch (clientSettings.getClientType()) { case PRODUCER: { for (Resource topic : clientSettings.getPublishing().getTopicsList()) { - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(topic); + String topicName = topic.getName(); this.registerProducer(ctx, topicName); } break; @@ -109,7 +109,7 @@ public CompletableFuture heartbeat(ProxyContext ctx, Heartbea case PUSH_CONSUMER: case SIMPLE_CONSUMER: { validateConsumerGroup(request.getGroup()); - String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String consumerGroup = request.getGroup().getName(); this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList(), false); break; } @@ -142,7 +142,7 @@ public CompletableFuture notifyClientTerminatio switch (clientSettings.getClientType()) { case PRODUCER: for (Resource topic : clientSettings.getPublishing().getTopicsList()) { - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(topic); + String topicName = topic.getName(); GrpcClientChannel channel = this.grpcChannelManager.removeChannel(clientId); if (channel != null) { ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, languageCode, MQVersion.Version.V5_0_0.ordinal()); @@ -153,7 +153,7 @@ public CompletableFuture notifyClientTerminatio case PUSH_CONSUMER: case SIMPLE_CONSUMER: validateConsumerGroup(request.getGroup()); - String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String consumerGroup = request.getGroup().getName(); GrpcClientChannel channel = this.grpcChannelManager.removeChannel(clientId); if (channel != null) { ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, languageCode, MQVersion.Version.V5_0_0.ordinal()); @@ -241,14 +241,14 @@ protected void processAndWriteClientSettings(ProxyContext ctx, TelemetryCommand case PUBLISHING: for (Resource topic : settings.getPublishing().getTopicsList()) { validateTopic(topic); - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(topic); + String topicName = topic.getName(); grpcClientChannel = registerProducer(ctx, topicName); grpcClientChannel.setClientObserver(responseObserver); } break; case SUBSCRIPTION: validateConsumerGroup(settings.getSubscription().getGroup()); - String groupName = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); + String groupName = settings.getSubscription().getGroup().getName(); grpcClientChannel = registerConsumer(ctx, groupName, settings.getClientType(), settings.getSubscription().getSubscriptionsList(), true); grpcClientChannel.setClientObserver(responseObserver); break; @@ -396,7 +396,7 @@ protected ConsumeType buildConsumeType(ClientType clientType) { protected Set buildSubscriptionDataSet(List subscriptionEntryList) { Set subscriptionDataSet = new HashSet<>(); for (SubscriptionEntry sub : subscriptionEntryList) { - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(sub.getTopic()); + String topicName = sub.getTopic().getName(); FilterExpression filterExpression = sub.getExpression(); subscriptionDataSet.add(buildSubscriptionData(topicName, filterExpression)); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index 1eff659392e..e741bd389d7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -74,8 +74,7 @@ public Settings getClientSettings(ProxyContext ctx) { if (settings.hasPublishing()) { settings = mergeProducerData(settings); } else if (settings.hasSubscription()) { - settings = mergeSubscriptionData(ctx, settings, - GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup())); + settings = mergeSubscriptionData(ctx, settings, settings.getSubscription().getGroup().getName()); } return mergeMetric(settings); } @@ -204,8 +203,7 @@ public Settings removeAndGetClientSettings(ProxyContext ctx) { return null; } if (settings.hasSubscription()) { - settings = mergeSubscriptionData(ctx, settings, - GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup())); + settings = mergeSubscriptionData(ctx, settings, settings.getSubscription().getGroup().getName()); } return mergeMetric(settings); } @@ -231,7 +229,7 @@ protected void onWaitEnd() { if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER)) { return settings; } - String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); + String consumerGroup = settings.getSubscription().getGroup().getName(); ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo( ProxyContext.createForInner(this.getClass()), consumerGroup diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 4daf8351196..33a4e1312f8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -64,10 +64,6 @@ public static GrpcConverter getInstance() { return instance; } - public String wrapResourceWithNamespace(Resource resource) { - return NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName()); - } - public MessageQueue buildMessageQueue(MessageExt messageExt, String brokerName) { Broker broker = Broker.getDefaultInstance(); if (!StringUtils.isEmpty(brokerName)) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java index cfcd2a26938..a556bfe2710 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcValidator.java @@ -48,7 +48,7 @@ public static GrpcValidator getInstance() { } public void validateTopic(Resource topic) { - validateTopic(GrpcConverter.getInstance().wrapResourceWithNamespace(topic)); + validateTopic(topic.getName()); } public void validateTopic(String topicName) { @@ -63,7 +63,7 @@ public void validateTopic(String topicName) { } public void validateConsumerGroup(Resource consumerGroup) { - validateConsumerGroup(GrpcConverter.getInstance().wrapResourceWithNamespace(consumerGroup)); + validateConsumerGroup(consumerGroup.getName()); } public void validateConsumerGroup(String consumerGroupName) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java index 97c716c8ff3..4a5b9cfcd62 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java @@ -35,7 +35,6 @@ import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; -import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.BatchAckResult; import org.apache.rocketmq.proxy.processor.MessagingProcessor; @@ -53,8 +52,8 @@ public CompletableFuture ackMessage(ProxyContext ctx, AckMes try { validateTopicAndConsumerGroup(request.getTopic(), request.getGroup()); - String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()); + String group = request.getGroup().getName(); + String topic = request.getTopic().getName(); if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) { future = ackMessageInBatch(ctx, group, topic, request); } else { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java index 02356c4977f..b7d63a33c43 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java @@ -29,7 +29,6 @@ import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; -import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; @@ -49,7 +48,7 @@ public CompletableFuture changeInvisibleDuratio validateInvisibleTime(Durations.toMillis(request.getInvisibleDuration())); ReceiptHandle receiptHandle = ReceiptHandle.decode(request.getReceiptHandle()); - String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String group = request.getGroup().getName(); MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); if (messageReceiptHandle != null) { @@ -60,7 +59,7 @@ public CompletableFuture changeInvisibleDuratio receiptHandle, request.getMessageId(), group, - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()), + request.getTopic().getName(), Durations.toMillis(request.getInvisibleDuration()) ).thenApply(ackResult -> convertToChangeInvisibleDurationResponse(ctx, request, ackResult)); } catch (Throwable t) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index cf58bb87a8b..b3550eb4f37 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -94,8 +94,8 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, } validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), request.getGroup()); - String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()); - String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String topic = request.getMessageQueue().getTopic().getName(); + String group = request.getGroup().getName(); long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration()); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java index 7b8e70cf1dc..d0f94e8613a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java @@ -121,8 +121,8 @@ protected void processThrowableWhenWriteMessage(Throwable throwable, ctx, ReceiptHandle.decode(handle), messageExt.getMsgId(), - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()), - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getMessageQueue().getTopic()), + request.getGroup().getName(), + request.getMessageQueue().getTopic().getName(), NACK_INVISIBLE_TIME ); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java index f1fc5a143ad..d0cfc14ce00 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java @@ -25,7 +25,6 @@ import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; -import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -43,7 +42,7 @@ public CompletableFuture forwardMessage try { validateTopicAndConsumerGroup(request.getTopic(), request.getGroup()); - String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String group = request.getGroup().getName(); String handleString = request.getReceiptHandle(); MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); if (messageReceiptHandle != null) { @@ -55,8 +54,8 @@ public CompletableFuture forwardMessage ctx, receiptHandle, request.getMessageId(), - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()), - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()) + request.getGroup().getName(), + request.getTopic().getName() ).thenApply(result -> convertToForwardMessageToDeadLetterQueueResponse(ctx, result)); } catch (Throwable t) { future.completeExceptionally(t); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java index f670df2050e..8679bfbe388 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java @@ -48,7 +48,6 @@ import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; -import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcValidator; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; @@ -80,7 +79,7 @@ public CompletableFuture sendMessage(ProxyContext ctx, Send future = this.messagingProcessor.sendMessage( ctx, new SendMessageQueueSelector(request), - GrpcConverter.getInstance().wrapResourceWithNamespace(topic), + topic.getName(), buildSysFlag(message), buildMessage(ctx, request.getMessagesList(), topic) ).thenApply(result -> convertToSendMessageResponse(ctx, request, result)); @@ -92,7 +91,7 @@ public CompletableFuture sendMessage(ProxyContext ctx, Send protected List buildMessage(ProxyContext context, List protoMessageList, Resource topic) { - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(topic); + String topicName = topic.getName(); List messageExtList = new ArrayList<>(); for (apache.rocketmq.v2.Message protoMessage : protoMessageList) { if (!protoMessage.getTopic().equals(topic)) { @@ -105,7 +104,7 @@ protected List buildMessage(ProxyContext context, List queryRoute(ProxyContext ctx, QueryR validateTopic(request.getTopic()); List addressList = this.convertToAddressList(request.getEndpoints()); - String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()); + String topicName = request.getTopic().getName(); ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy( ctx, addressList, topicName); @@ -107,11 +106,11 @@ public CompletableFuture queryAssignment(ProxyContext c ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy( ctx, addressList, - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic())); + request.getTopic().getName()); boolean fifo = false; SubscriptionGroupConfig config = this.messagingProcessor.getSubscriptionGroupConfig(ctx, - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup())); + request.getGroup().getName()); if (config != null && config.isConsumeMessageOrderly()) { fifo = true; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java index e65cf2eb4f8..1e114865823 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java @@ -27,7 +27,6 @@ import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; -import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; import org.apache.rocketmq.proxy.processor.MessagingProcessor; @@ -64,7 +63,7 @@ public CompletableFuture endTransaction(ProxyContext ctx ctx, request.getTransactionId(), request.getMessageId(), - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()), + request.getTopic().getName(), transactionStatus, request.getSource().equals(TransactionSource.SOURCE_SERVER_CHECK)) .thenApply(r -> EndTransactionResponse.newBuilder() diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java index fcdc25cacda..734fbeba1b8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java @@ -142,7 +142,7 @@ protected SendMessageRequestHeader buildSendMessageRequestHeader(Message message public void start() throws Exception { this.createSysTopic(); RPCHook rpcHook = this.getRpcHook(); - this.defaultMQPushConsumer = new DefaultMQPushConsumer(null, this.getSystemMessageConsumerId(), rpcHook); + this.defaultMQPushConsumer = new DefaultMQPushConsumer(this.getSystemMessageConsumerId(), rpcHook); this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); this.defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java index a2f1f4cc89f..1dfc7ce3431 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java @@ -95,7 +95,7 @@ public void testPullMessageWithoutSub() throws Exception { header.setCommitOffset(0L); header.setSuspendTimeoutMillis(1000L); header.setSubVersion(0L); - header.setBname(brokerName); + header.setBrokerName(brokerName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header); request.makeCustomHeaderToNet(); @@ -110,7 +110,7 @@ public void testPullMessageWithoutSub() throws Exception { newHeader.setCommitOffset(0L); newHeader.setSuspendTimeoutMillis(1000L); newHeader.setSubVersion(0L); - newHeader.setBname(brokerName); + newHeader.setBrokerName(brokerName); newHeader.setSubscription(subString); newHeader.setExpressionType(type); RemotingCommand matchRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, newHeader); @@ -146,7 +146,7 @@ public void testPullMessageWithSub() throws Exception { header.setCommitOffset(0L); header.setSuspendTimeoutMillis(1000L); header.setSubVersion(0L); - header.setBname(brokerName); + header.setBrokerName(brokerName); header.setSubscription(subString); header.setExpressionType(type); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java index 9d897642fdb..4b7589c3410 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java @@ -86,7 +86,7 @@ public void testSendMessage() throws Exception { sendMessageRequestHeader.setDefaultTopicQueueNums(0); sendMessageRequestHeader.setQueueId(0); sendMessageRequestHeader.setSysFlag(0); - sendMessageRequestHeader.setBname(brokerName); + sendMessageRequestHeader.setBrokerName(brokerName); sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties())); RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, sendMessageRequestHeader); remotingCommand.setBody(message.getBody()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CloneGroupOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CloneGroupOffsetRequestHeader.java index a9e9982af10..0589c0fa828 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CloneGroupOffsetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CloneGroupOffsetRequestHeader.java @@ -21,11 +21,11 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class CloneGroupOffsetRequestHeader implements CommandCustomHeader { +public class CloneGroupOffsetRequestHeader extends RpcRequestHeader { @CFNotNull private String srcGroup; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java index de9a4a50128..f56ad5b59da 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader { +public class ConsumeMessageDirectlyResultRequestHeader extends TopicRequestHeader { @CFNotNull private String consumerGroup; @CFNullable diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicRequestHeader.java index b68c5197ee8..faddd9f461e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicRequestHeader.java @@ -22,12 +22,12 @@ import com.google.common.base.MoreObjects; import org.apache.rocketmq.common.TopicFilterType; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class CreateTopicRequestHeader implements CommandCustomHeader { +public class CreateTopicRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteSubscriptionGroupRequestHeader.java index 26126f77432..4548454853f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteSubscriptionGroupRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteSubscriptionGroupRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader { +public class DeleteSubscriptionGroupRequestHeader extends RpcRequestHeader { @CFNotNull private String groupName; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java index 1305a70cc15..d24c1da0786 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class DeleteTopicRequestHeader implements CommandCustomHeader { +public class DeleteTopicRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index 901de85b100..474811b3bd5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class GetConsumeStatsRequestHeader implements CommandCustomHeader { +public class GetConsumeStatsRequestHeader extends TopicRequestHeader { @CFNotNull private String consumerGroup; private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerConnectionListRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerConnectionListRequestHeader.java index 7ade0c16738..b572c82f35b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerConnectionListRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerConnectionListRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class GetConsumerConnectionListRequestHeader implements CommandCustomHeader { +public class GetConsumerConnectionListRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerListByGroupRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerListByGroupRequestHeader.java index e16331a3f1e..43161ef3627 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerListByGroupRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerListByGroupRequestHeader.java @@ -18,11 +18,11 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class GetConsumerListByGroupRequestHeader implements CommandCustomHeader { +public class GetConsumerListByGroupRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerRunningInfoRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerRunningInfoRequestHeader.java index 2adad968e5e..c67e9a15eb3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerRunningInfoRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerRunningInfoRequestHeader.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class GetConsumerRunningInfoRequestHeader implements CommandCustomHeader { +public class GetConsumerRunningInfoRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerStatusRequestHeader.java index 9aee3d4ae88..de9115a0142 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerStatusRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumerStatusRequestHeader.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class GetConsumerStatusRequestHeader implements CommandCustomHeader { +public class GetConsumerStatusRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetProducerConnectionListRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetProducerConnectionListRequestHeader.java index 2b919e02451..701335191d8 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetProducerConnectionListRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetProducerConnectionListRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class GetProducerConnectionListRequestHeader implements CommandCustomHeader { +public class GetProducerConnectionListRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetSubscriptionGroupConfigRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetSubscriptionGroupConfigRequestHeader.java index 885ab256da2..2ad06b4966e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetSubscriptionGroupConfigRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetSubscriptionGroupConfigRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class GetSubscriptionGroupConfigRequestHeader implements CommandCustomHeader { +public class GetSubscriptionGroupConfigRequestHeader extends RpcRequestHeader { @Override public void checkFields() throws RemotingCommandException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/HeartbeatRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/HeartbeatRequestHeader.java new file mode 100644 index 00000000000..a37383f3a18 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/HeartbeatRequestHeader.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +public class HeartbeatRequestHeader extends RpcRequestHeader { + // for namespace + @Override + public void checkFields() throws RemotingCommandException { + + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/InitConsumerOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/InitConsumerOffsetRequestHeader.java index e6d319bca60..ac63b974b93 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/InitConsumerOffsetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/InitConsumerOffsetRequestHeader.java @@ -16,10 +16,10 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class InitConsumerOffsetRequestHeader implements CommandCustomHeader { +public class InitConsumerOffsetRequestHeader extends TopicRequestHeader { private String topic; // @see ConsumeInitMode diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyConsumerIdsChangedRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyConsumerIdsChangedRequestHeader.java index 40ee9417f18..38271f97549 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyConsumerIdsChangedRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyConsumerIdsChangedRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class NotifyConsumerIdsChangedRequestHeader implements CommandCustomHeader { +public class NotifyConsumerIdsChangedRequestHeader extends RpcRequestHeader { @CFNotNull private String consumerGroup; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeQueueRequestHeader.java index 53cc2a1f55f..a1b45e9b26c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeQueueRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeQueueRequestHeader.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; -public class QueryConsumeQueueRequestHeader implements CommandCustomHeader { +public class QueryConsumeQueueRequestHeader extends TopicQueueRequestHeader { private String topic; private int queueId; @@ -36,11 +36,11 @@ public void setTopic(String topic) { this.topic = topic; } - public int getQueueId() { + public Integer getQueueId() { return queueId; } - public void setQueueId(int queueId) { + public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeTimeSpanRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeTimeSpanRequestHeader.java index 370f0160535..9c0aa34b5fa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeTimeSpanRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumeTimeSpanRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class QueryConsumeTimeSpanRequestHeader implements CommandCustomHeader { +public class QueryConsumeTimeSpanRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryCorrectionOffsetHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryCorrectionOffsetHeader.java index 51099cb5751..8e03ce5c6aa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryCorrectionOffsetHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryCorrectionOffsetHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class QueryCorrectionOffsetHeader implements CommandCustomHeader { +public class QueryCorrectionOffsetHeader extends TopicRequestHeader { private String filterGroups; @CFNotNull private String compareGroup; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryMessageRequestHeader.java index d89bafbcf88..1a78b9e4fab 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryMessageRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryMessageRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class QueryMessageRequestHeader implements CommandCustomHeader { +public class QueryMessageRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QuerySubscriptionByConsumerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QuerySubscriptionByConsumerRequestHeader.java index 29d9234cd49..1e5a48c5887 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QuerySubscriptionByConsumerRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QuerySubscriptionByConsumerRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class QuerySubscriptionByConsumerRequestHeader implements CommandCustomHeader { +public class QuerySubscriptionByConsumerRequestHeader extends TopicRequestHeader { @CFNotNull private String group; private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicConsumeByWhoRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicConsumeByWhoRequestHeader.java index 186cdefd90a..5cd9b931c2a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicConsumeByWhoRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicConsumeByWhoRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class QueryTopicConsumeByWhoRequestHeader implements CommandCustomHeader { +public class QueryTopicConsumeByWhoRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicsByConsumerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicsByConsumerRequestHeader.java index c172f612794..e90afdd8ba6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicsByConsumerRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryTopicsByConsumerRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class QueryTopicsByConsumerRequestHeader implements CommandCustomHeader { +public class QueryTopicsByConsumerRequestHeader extends RpcRequestHeader { @CFNotNull private String group; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ReplyMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ReplyMessageRequestHeader.java index 72e02ec9357..eeb022d6d69 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ReplyMessageRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ReplyMessageRequestHeader.java @@ -17,12 +17,12 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; -public class ReplyMessageRequestHeader implements CommandCustomHeader { +public class ReplyMessageRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java index 31723f8b829..dd5c043ef62 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ResetOffsetRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; -public class ResetOffsetRequestHeader implements CommandCustomHeader { +public class ResetOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String topic; @@ -71,11 +71,11 @@ public void setForce(boolean isForce) { this.isForce = isForce; } - public int getQueueId() { + public Integer getQueueId() { return queueId; } - public void setQueueId(int queueId) { + public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2.java index 0fd0889bbdb..4b0d795bcd5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2.java @@ -25,11 +25,12 @@ import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.FastCodesHeader; +import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; /** * Use short variable name to speed up FastJson deserialization process. */ -public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader { +public class SendMessageRequestHeaderV2 extends TopicQueueRequestHeader implements CommandCustomHeader, FastCodesHeader { @CFNotNull private String a; // producerGroup; @CFNotNull @@ -51,12 +52,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode @CFNullable private Integer j; // reconsumeTimes; @CFNullable - private boolean k; // unitMode = false; + private Boolean k; // unitMode = false; private Integer l; // consumeRetryTimes @CFNullable - private boolean m; //batch + private Boolean m; //batch @CFNullable private String n; // brokerName @@ -75,7 +76,7 @@ public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final Se v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); v1.setBatch(v2.m); - v1.setBname(v2.n); + v1.setBrokerName(v2.n); return v1; } @@ -94,7 +95,7 @@ public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); - v2.n = v1.getBname(); + v2.n = v1.getBrokerName(); return v2; } @@ -274,11 +275,11 @@ public void setJ(Integer j) { this.j = j; } - public boolean isK() { + public Boolean isK() { return k; } - public void setK(boolean k) { + public void setK(Boolean k) { this.k = k; } @@ -290,11 +291,11 @@ public void setL(final Integer l) { this.l = l; } - public boolean isM() { + public Boolean isM() { return m; } - public void setM(boolean m) { + public void setM(Boolean m) { this.m = m; } @@ -317,4 +318,24 @@ public String toString() { .add("n", n) .toString(); } + + @Override + public Integer getQueueId() { + return e; + } + + @Override + public void setQueueId(Integer queueId) { + this.e = queueId; + } + + @Override + public String getTopic() { + return b; + } + + @Override + public void setTopic(String topic) { + this.b = topic; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/StatisticsMessagesRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/StatisticsMessagesRequestHeader.java index 16b7ecb5ced..d8c22e37651 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/StatisticsMessagesRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/StatisticsMessagesRequestHeader.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; -public class StatisticsMessagesRequestHeader implements CommandCustomHeader { +public class StatisticsMessagesRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String consumerGroup; @CFNotNull @@ -52,14 +52,14 @@ public void setTopic(String topic) { this.topic = topic; } - public int getQueueId() { + public Integer getQueueId() { if (queueId < 0) { return -1; } return queueId; } - public void setQueueId(int queueId) { + public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnregisterClientRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnregisterClientRequestHeader.java index 371a5479833..79072195b5e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnregisterClientRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnregisterClientRequestHeader.java @@ -17,12 +17,12 @@ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; -public class UnregisterClientRequestHeader implements CommandCustomHeader { +public class UnregisterClientRequestHeader extends RpcRequestHeader { @CFNotNull private String clientID; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UpdateGroupForbiddenRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UpdateGroupForbiddenRequestHeader.java index de2f9d1fbdf..cfb28df218d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UpdateGroupForbiddenRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UpdateGroupForbiddenRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.remoting.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class UpdateGroupForbiddenRequestHeader implements CommandCustomHeader { +public class UpdateGroupForbiddenRequestHeader extends TopicRequestHeader { @CFNotNull private String group; @CFNotNull diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/DeleteTopicFromNamesrvRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/DeleteTopicFromNamesrvRequestHeader.java index ec0101e7f93..5e5bd8b172e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/DeleteTopicFromNamesrvRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/DeleteTopicFromNamesrvRequestHeader.java @@ -16,11 +16,11 @@ */ package org.apache.rocketmq.remoting.protocol.header.namesrv; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class DeleteTopicFromNamesrvRequestHeader implements CommandCustomHeader { +public class DeleteTopicFromNamesrvRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/GetRouteInfoRequestHeader.java index 0993f81fde0..8de15f3fd6d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/GetRouteInfoRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/GetRouteInfoRequestHeader.java @@ -20,12 +20,12 @@ */ package org.apache.rocketmq.remoting.protocol.header.namesrv; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class GetRouteInfoRequestHeader implements CommandCustomHeader { +public class GetRouteInfoRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/RegisterTopicRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/RegisterTopicRequestHeader.java index ce36ab0f212..d93a05824e5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/RegisterTopicRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/RegisterTopicRequestHeader.java @@ -16,11 +16,11 @@ */ package org.apache.rocketmq.remoting.protocol.header.namesrv; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -public class RegisterTopicRequestHeader implements CommandCustomHeader { +public class RegisterTopicRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RequestBuilder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RequestBuilder.java index 79167ec2668..e2e2c52fd32 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RequestBuilder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RequestBuilder.java @@ -40,8 +40,8 @@ public static RpcRequestHeader buildCommonRpcHeader(int requestCode, Boolean one } try { RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setOway(oneway); - requestHeader.setBname(destBrokerName); + requestHeader.setOneway(oneway); + requestHeader.setBrokerName(destBrokerName); return requestHeader; } catch (Throwable t) { throw new RuntimeException(t); @@ -67,8 +67,8 @@ public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCo } try { TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setOway(oneway); - requestHeader.setBname(destBrokerName); + requestHeader.setOneway(oneway); + requestHeader.setBrokerName(destBrokerName); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); requestHeader.setLo(logic); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java index 5328e8845d8..bca2d79d995 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java @@ -59,7 +59,7 @@ public void registerHook(RpcClientHook hook) { @Override public Future invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException { String bname = clientMetadata.getBrokerNameFromMessageQueue(mq); - request.getHeader().setBname(bname); + request.getHeader().setBrokerName(bname); return invoke(request, timeoutMs); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeader.java index ef7e53b4e6b..810d87648a4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeader.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.remoting.rpc; +import com.google.common.base.MoreObjects; +import java.util.Objects; import org.apache.rocketmq.remoting.CommandCustomHeader; public abstract class RpcRequestHeader implements CommandCustomHeader { @@ -28,35 +30,72 @@ public abstract class RpcRequestHeader implements CommandCustomHeader { //oneway protected Boolean oway; + @Deprecated public String getBname() { return bname; } - public void setBname(String bname) { - this.bname = bname; + @Deprecated + public void setBname(String brokerName) { + this.bname = brokerName; } - public Boolean getOway() { - return oway; + public String getBrokerName() { + return bname; } - public void setOway(Boolean oway) { - this.oway = oway; + public void setBrokerName(String brokerName) { + this.bname = brokerName; } - public String getNs() { + public String getNamespace() { return ns; } - public void setNs(String ns) { - this.ns = ns; + public void setNamespace(String namespace) { + this.ns = namespace; } - public Boolean getNsd() { + public Boolean getNamespaced() { return nsd; } - public void setNsd(Boolean nsd) { - this.nsd = nsd; + public void setNamespaced(Boolean namespaced) { + this.nsd = namespaced; + } + + public Boolean getOneway() { + return oway; + } + + public void setOneway(Boolean oneway) { + this.oway = oneway; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RpcRequestHeader header = (RpcRequestHeader) o; + return Objects.equals(ns, header.ns) && Objects.equals(nsd, header.nsd) && Objects.equals(bname, header.bname) && Objects.equals(oway, header.oway); + } + + @Override + public int hashCode() { + return Objects.hash(ns, nsd, bname, oway); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("namespace", ns) + .add("namespaced", nsd) + .add("brokerName", bname) + .add("oneway", oway) + .toString(); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2Test.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2Test.java new file mode 100644 index 00000000000..c817d8cabe5 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeaderV2Test.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SendMessageRequestHeaderV2Test { + SendMessageRequestHeaderV2 header = new SendMessageRequestHeaderV2(); + String topic = "test"; + int queueId = 5; + + @Test + public void testEncodeDecode() throws RemotingCommandException { + header.setQueueId(queueId); + header.setTopic(topic); + + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, header); + ByteBuffer buffer = remotingCommand.encode(); + + //Simulate buffer being read in NettyDecoder + buffer.getInt(); + byte[] bytes = new byte[buffer.limit() - 4]; + buffer.get(bytes, 0, buffer.limit() - 4); + buffer = ByteBuffer.wrap(bytes); + + RemotingCommand decodeRequest = RemotingCommand.decode(buffer); + assertThat(decodeRequest.getExtFields().get("e")).isEqualTo(String.valueOf(queueId)); + assertThat(decodeRequest.getExtFields().get("b")).isEqualTo(topic); + } +} \ No newline at end of file diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeaderTest.java new file mode 100644 index 00000000000..78047845910 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcRequestHeaderTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.rpc; + +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RpcRequestHeaderTest { + String brokerName = "brokerName1"; + String namespace = "namespace1"; + boolean namespaced = true; + boolean oneway = false; + static class TestRequestHeader extends RpcRequestHeader { + + @Override + public void checkFields() throws RemotingCommandException { + + } + } + + @Test + public void testEncodeDecode() throws RemotingCommandException { + TestRequestHeader requestHeader = new TestRequestHeader(); + requestHeader.setBrokerName(brokerName); + requestHeader.setNamespace(namespace); + requestHeader.setNamespaced(namespaced); + requestHeader.setOneway(oneway); + + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); + ByteBuffer buffer = remotingCommand.encode(); + + //Simulate buffer being read in NettyDecoder + buffer.getInt(); + byte[] bytes = new byte[buffer.limit() - 4]; + buffer.get(bytes, 0, buffer.limit() - 4); + buffer = ByteBuffer.wrap(bytes); + + RemotingCommand decodeRequest = RemotingCommand.decode(buffer); + assertThat(decodeRequest.getExtFields().get("bname")).isEqualTo(brokerName); + assertThat(decodeRequest.getExtFields().get("nsd")).isEqualTo(String.valueOf(namespaced)); + assertThat(decodeRequest.getExtFields().get("ns")).isEqualTo(namespace); + assertThat(decodeRequest.getExtFields().get("oway")).isEqualTo(String.valueOf(oneway)); + } +} \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java index 0ce0a3d8da2..183f0667370 100644 --- a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java +++ b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; @@ -113,7 +114,7 @@ private boolean prepare(String masterAddr, String topic, String groupName, long heartbeatData.getConsumerDataSet().add(consumerData); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, new HeartbeatRequestHeader()); request.setLanguage(LanguageCode.JAVA); request.setBody(heartbeatData.encode()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 331b24d6068..9447d95bb57 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -981,7 +981,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume requestHeader.setTopic(queue.getTopic()); requestHeader.setQueueId(queue.getQueueId()); requestHeader.setCommitOffset(resetOffset); - requestHeader.setBname(queue.getBrokerName()); + requestHeader.setBrokerName(queue.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); } return rollbackStats; @@ -1735,7 +1735,7 @@ public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQ requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); - requestHeader.setBname(mq.getBrokerName()); + requestHeader.setBrokerName(mq.getBrokerName()); this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis); }