Skip to content

Commit

Permalink
[ISSUE apache#7699] Add namespace v2 in client (apache#7700)
Browse files Browse the repository at this point in the history
* Add namespace v2

* Add NamespaceRpcHook

* Refector extends header

* Use Boolean in request header to remove unnecessary encode

* Add unit test

* Add NamespaceRpcHookTest

* Remove GrpcConverter#wrapResourceWithNamespace

* Optimize readability of RpcRequestHeader
drpmma authored Jan 12, 2024
1 parent d2b818d commit b1d8d30
Showing 82 changed files with 651 additions and 318 deletions.
Original file line number Diff line number Diff line change
@@ -1377,7 +1377,7 @@ public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String b
requestHeader.setSubVersion(System.currentTimeMillis());
requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
requestHeader.setExpressionType(ExpressionType.TAG);
requestHeader.setBname(brokerName);
requestHeader.setBrokerName(brokerName);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>();
Original file line number Diff line number Diff line change
@@ -1019,7 +1019,7 @@ private RemotingCommand rewriteRequestForStaticTopic(SearchOffsetRequestHeader r
requestHeader.setLo(false);
requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId());
requestHeader.setBname(item.getBname());
requestHeader.setBrokerName(item.getBname());
RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
@@ -1086,7 +1086,7 @@ private RemotingCommand rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader r
LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true);
assert maxItem != null;
assert maxItem.getLogicOffset() >= 0;
requestHeader.setBname(maxItem.getBname());
requestHeader.setBrokerName(maxItem.getBname());
requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId());

@@ -1152,7 +1152,7 @@ private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcReque
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setBrokerName(mappingItem.getBname());
requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId());
long physicalOffset;
@@ -1219,7 +1219,7 @@ private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequ
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setBrokerName(mappingItem.getBname());
requestHeader.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TO DO check if it is in current broker
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRe
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setLo(false);
requestHeader.setBname(mappingItem.getBname());
requestHeader.setBrokerName(mappingItem.getBname());
requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset));
//leader, let it go, do not need to rewrite the response
if (mappingDetail.getBname().equals(mappingItem.getBname())) {
@@ -237,7 +237,7 @@ public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHe
}
} else {
//maybe we need to reconstruct an object
requestHeader.setBname(mappingItem.getBname());
requestHeader.setBrokerName(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setLo(false);
requestHeader.setSetZeroIfNotFound(false);
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader re

int sysFlag = requestHeader.getSysFlag();
requestHeader.setLo(false);
requestHeader.setBname(bname);
requestHeader.setBrokerName(bname);
sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
requestHeader.setSysFlag(sysFlag);
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ public void cleanItemExpired() {
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setBrokerName(broker);
header.setLo(false);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
@@ -265,7 +265,7 @@ public void cleanItemListMoreThanSecondGen() {
String broker = entry.getValue();
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setBrokerName(broker);
header.setLo(true);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception {
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
checkTransactionStateRequestHeader.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
21 changes: 21 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
@@ -45,8 +45,10 @@ public class ClientConfig {
private String clientIP = NetworkUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
@Deprecated
protected String namespace;
private boolean namespaceInitialized = false;
protected String namespaceV2;
protected AccessChannel accessChannel = AccessChannel.LOCAL;

/**
@@ -137,10 +139,12 @@ public void changeInstanceNameToPID() {
}
}

@Deprecated
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}

@Deprecated
public Set<String> withNamespace(Set<String> resourceSet) {
Set<String> resourceWithNamespace = new HashSet<>();
for (String resource : resourceSet) {
@@ -149,10 +153,12 @@ public Set<String> withNamespace(Set<String> resourceSet) {
return resourceWithNamespace;
}

@Deprecated
public String withoutNamespace(String resource) {
return NamespaceUtil.withoutNamespace(resource, this.getNamespace());
}

@Deprecated
public Set<String> withoutNamespace(Set<String> resourceSet) {
Set<String> resourceWithoutNamespace = new HashSet<>();
for (String resource : resourceSet) {
@@ -161,13 +167,15 @@ public Set<String> withoutNamespace(Set<String> resourceSet) {
return resourceWithoutNamespace;
}

@Deprecated
public MessageQueue queueWithNamespace(MessageQueue queue) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}

@Deprecated
public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queues;
@@ -206,6 +214,7 @@ public void resetClientConfig(final ClientConfig cc) {
this.enableHeartbeatChannelEventListener = cc.enableHeartbeatChannelEventListener;
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
this.namespaceV2 = cc.namespaceV2;
}

public ClientConfig cloneClientConfig() {
@@ -235,6 +244,7 @@ public ClientConfig cloneClientConfig() {
cc.sendLatencyEnable = sendLatencyEnable;
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
cc.namespaceV2 = namespaceV2;
return cc;
}

@@ -359,6 +369,7 @@ public void setDecodeDecompressBody(boolean decodeDecompressBody) {
this.decodeDecompressBody = decodeDecompressBody;
}

@Deprecated
public String getNamespace() {
if (namespaceInitialized) {
return namespace;
@@ -377,11 +388,20 @@ public String getNamespace() {
return namespace;
}

@Deprecated
public void setNamespace(String namespace) {
this.namespace = namespace;
this.namespaceInitialized = true;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public AccessChannel getAccessChannel() {
return this.accessChannel;
}
@@ -463,6 +483,7 @@ public String toString() {
", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads +
", namespace='" + namespace + '\'' +
", namespaceInitialized=" + namespaceInitialized +
", namespaceV2='" + namespaceV2 + '\'' +
", accessChannel=" + accessChannel +
", pollNameServerInterval=" + pollNameServerInterval +
", heartbeatBrokerInterval=" + heartbeatBrokerInterval +
Original file line number Diff line number Diff line change
@@ -183,7 +183,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Default constructor.
*/
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
}

/**
@@ -192,7 +192,7 @@ public DefaultLitePullConsumer() {
* @param consumerGroup Consumer group.
*/
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
this(consumerGroup, null);
}

/**
@@ -201,7 +201,7 @@ public DefaultLitePullConsumer(final String consumerGroup) {
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}

/**
@@ -211,7 +211,9 @@ public DefaultLitePullConsumer(RPCHook rpcHook) {
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
this.consumerGroup = consumerGroup;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}

/**
@@ -220,6 +222,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
@Deprecated
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
Original file line number Diff line number Diff line change
@@ -89,23 +89,21 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
private int maxReconsumeTimes = 16;

public DefaultMQPullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
}

public DefaultMQPullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
this(consumerGroup, null);
}

public DefaultMQPullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}

public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
}

public DefaultMQPullConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null);
this.consumerGroup = consumerGroup;
this.enableStreamRequestType = true;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}

/**
Loading
Oops, something went wrong.

0 comments on commit b1d8d30

Please sign in to comment.