diff --git a/conf/meta.conf b/conf/meta.conf index bdf1d2a59..680f60329 100644 --- a/conf/meta.conf +++ b/conf/meta.conf @@ -15,4 +15,5 @@ selfAddress= -membersAddress= \ No newline at end of file +membersAddress= +limitRetainedMessageCount= \ No newline at end of file diff --git a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java index dc073579d..74ba0ee33 100644 --- a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java +++ b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java @@ -30,7 +30,6 @@ public class MetaConf { private static final String CONF_FILE_NAME = "meta.conf"; private File confFile; - private String clusterName = "defaultCluster"; private String allNodeAddress; private String dbPath = System.getProperty("user.home") + "/mqtt_meta/db"; @@ -39,7 +38,7 @@ public class MetaConf { private String selfAddress; private String membersAddress; - + private int limitRetainedMessageCount; private int electionTimeoutMs = 1000; private int snapshotIntervalSecs = 1000; @@ -53,6 +52,7 @@ public MetaConf() throws IOException { in.close(); MixAll.properties2Object(properties, this); this.confFile = new File(classPathResource.getURL().getFile()); + System.out.println(getLimitRetainedMessageCount()); } public File getConfFile() { @@ -138,4 +138,12 @@ public int getRaftGroupNum() { public void setRaftGroupNum(int raftGroupNum) { this.raftGroupNum = raftGroupNum; } + + public int getLimitRetainedMessageCount() { + return limitRetainedMessageCount; + } + + public void setLimitRetainedMessageCount(int limitRetainedMessageCount) { + this.limitRetainedMessageCount = limitRetainedMessageCount; + } } diff --git a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java index eb7e18ae8..c91423b83 100644 --- a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java +++ b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/Constants.java @@ -20,7 +20,7 @@ public class Constants { public static final String COUNTER = "counter"; - public static final String RETAINEDMSG = "retainedmsg"; + public static final String RETAINEDMSG = "retainedMsg"; public static final String READ_INDEX_TYPE = "readIndexType"; } diff --git a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcess.java b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcess.java index fb0b45814..31a1e5822 100644 --- a/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcess.java +++ b/meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcess.java @@ -21,32 +21,34 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.google.protobuf.ByteString; -import org.apache.rocketmq.mqtt.common.model.Message; import org.apache.rocketmq.mqtt.common.model.Trie; import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest; import org.apache.rocketmq.mqtt.common.model.consistency.Response; import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest; import org.apache.rocketmq.mqtt.common.util.TopicUtils; +import org.apache.rocketmq.mqtt.meta.config.MetaConf; import org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation; -import org.apache.rocketmq.mqtt.meta.raft.snapshot.impl.CounterSnapshotOperation; - -import java.util.HashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; public class RetainedMsgStateProcess extends StateProcessor { + @Resource + private MetaConf metaConf; + private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcess.class); private final AtomicLong value = new AtomicLong(0); - - private final HashMap retainedMsgMap = new HashMap<>(); //key:topic value:retained msg - - private final HashMap> retainedMsgTopicTrie = new HashMap<>(); //key:firstTopic value:retained topic Trie + private final ConcurrentHashMap retainedMsgMap = new ConcurrentHashMap<>(); //key:topic value:retained msg + private final ConcurrentHashMap> retainedMsgTopicTrie = new ConcurrentHashMap<>(); //key:firstTopic value:retained topic Trie protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - private static final int LIMIT_RETAINED_MESSAGE_COUNT = 100; private SnapshotOperation snapshotOperation; @@ -54,24 +56,38 @@ public class RetainedMsgStateProcess extends StateProcessor { public Response onReadRequest(ReadRequest request) { try { String topic = request.getExtDataMap().get("topic"); - String flag = request.getExtDataMap().get("flag"); + String firstTopic = request.getExtDataMap().get("firstTopic"); + String operation = request.getOperation(); - if (flag.equals("topic")) { //return retained msg + logger.info("FirstTopic:{} Topic:{} Operation:{}",firstTopic,topic,operation); + + if (operation.equals("topic")) { //return retained msg String msg = retainedMsgMap.get(topic); return Response.newBuilder() .setSuccess(true) .setData(ByteString.copyFrom(JSON.toJSONBytes(msg))) .build(); - } else { - if (!retainedMsgTopicTrie.containsKey(topic)) { + } else { //return retain msgs of matched Topic + if (!retainedMsgTopicTrie.containsKey(firstTopic)) { Trie newTrie = new Trie<>(); - retainedMsgTopicTrie.put(topic, newTrie); + retainedMsgTopicTrie.put(firstTopic, newTrie); + } + Trie tmpTrie = retainedMsgTopicTrie.get(firstTopic); + + Set matchTopics = tmpTrie.getAllPath(topic); + + ArrayList msgResults = new ArrayList<>(); + + for (String tmpTopic:matchTopics) { + String msg = retainedMsgMap.get(tmpTopic); + if (msg != null) { + msgResults.add(msg); + } } - Trie tmpTrie = retainedMsgTopicTrie.get(topic); //return firstTopic trie return Response.newBuilder() .setSuccess(true) - .setData(ByteString.copyFrom(JSON.toJSONBytes(tmpTrie))) + .setData(ByteString.copyFrom(JSON.toJSONBytes(msgResults))) //return retained msgs of matched Topic .build(); } } catch (Exception e) { @@ -82,26 +98,24 @@ public Response onReadRequest(ReadRequest request) { } } - boolean setRetainedMsg(String topic, String msg) { + boolean setRetainedMsg(String firstTopic,String topic, boolean isEmpty,String msg) { - - // if message is empty - Message message = JSON.parseObject(msg, Message.class); - - if (!retainedMsgTopicTrie.containsKey(message.getFirstTopic())) { - retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(message.getFirstTopic()), new Trie()); + // if the trie of firstTopic doesn't exist + if (!retainedMsgTopicTrie.containsKey(firstTopic)) { + retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie()); } - if (message.isEmpty()) { + if (isEmpty) { //delete from trie - retainedMsgMap.put(TopicUtils.normalizeTopic(topic), msg); - retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic())).deleteTrieNode(message.getOriginTopic(), ""); + logger.info("Delete the topic {} retained message", topic); + retainedMsgMap.remove(TopicUtils.normalizeTopic(topic)); + retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).deleteTrieNode(topic, ""); } else { //Add to trie - Trie trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic())); - if (trie.getNodePath().size() < LIMIT_RETAINED_MESSAGE_COUNT) { + Trie trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)); + if (trie.getNodePath().size() < metaConf.getLimitRetainedMessageCount()) { retainedMsgMap.put(TopicUtils.normalizeTopic(topic), msg); - retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic())).addNode(message.getOriginTopic(), "", ""); + retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).addNode(topic, "", ""); return true; } else { return false; @@ -115,22 +129,27 @@ boolean setRetainedMsg(String topic, String msg) { public Response onWriteRequest(WriteRequest writeRequest) { try { + String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic")); //retained msg firstTopic String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic")); //retained msg topic + boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty")); //retained msg is empty String message = writeRequest.getExtDataMap().get("message"); //retained msg - boolean res = setRetainedMsg(topic, message); + boolean res = setRetainedMsg(firstTopic,topic, isEmpty,message); if (!res) { + logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.",topic); return Response.newBuilder() .setSuccess(false) .setErrMsg("Exceeded maximum number of reserved topics limit.") .build(); } + logger.info("Put the topic {} retained message success!", topic); return Response.newBuilder() .setSuccess(true) .setData(ByteString.copyFrom(JSON.toJSONBytes(topic))) .build(); } catch (Exception e) { + logger.error("Put the retained message error! {}",e.getMessage()); return Response.newBuilder() .setSuccess(false) .setErrMsg(e.getMessage()) @@ -142,19 +161,16 @@ public Response onWriteRequest(WriteRequest writeRequest) { @Override public SnapshotOperation loadSnapshotOperate() { - snapshotOperation = new CounterSnapshotOperation(lock); return snapshotOperation; } @Override public void onSnapshotSave(SnapshotWriter writer, BiConsumer callFinally) { - snapshotOperation.onSnapshotSave(writer, callFinally, value.toString()); + } @Override public boolean onSnapshotLoad(SnapshotReader reader) { - String load = snapshotOperation.onSnapshotLoad(reader); - value.set(Long.parseLong(load)); return true; } diff --git a/meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java b/meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java index 8b0c1d10b..f39db4300 100644 --- a/meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java +++ b/meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java @@ -1,6 +1,8 @@ package org.apache.rocketmq.mqtt.meta.raft; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alipay.sofa.jraft.RouteTable; import com.alipay.sofa.jraft.conf.Configuration; @@ -26,7 +28,9 @@ import org.mockito.Mockito; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -38,7 +42,9 @@ public class RetainedMsgClientTest { @Mock private Message testMsg=new Message(); String firstTopic="test-f1"; - final String groupId = Constants.RETAINEDMSG + "%" + 0; + + String originTopic="test-f1/f2/"; + final String groupId = Constants.RETAINEDMSG + "-" + 0; final String confStr = "127.0.0.1:25001"; CliClientServiceImpl cliClientService = new CliClientServiceImpl(); Configuration conf = new Configuration(); @@ -77,7 +83,7 @@ public void init() throws InterruptedException, TimeoutException { testMsg.setPayload("hello world".getBytes()); testMsg.setMsgId("12345678"); testMsg.setFirstTopic(firstTopic); - testMsg.setOriginTopic(firstTopic+"/t1/"); + testMsg.setOriginTopic(originTopic); testMsg.setEmpty(false); testMsg.setRetained(true); @@ -98,14 +104,15 @@ public static void initRpcServer() { public void TestSetRetainedMsg(){ //test set retain msg - CompletableFuture future = new CompletableFuture(); - HashMap option = new HashMap<>(); + option.put("firstTopic",testMsg.getFirstTopic()); option.put("message", JSON.toJSONString(testMsg, SerializerFeature.WriteClassName)); option.put("topic", testMsg.getOriginTopic()); + option.put("isEmpty", String.valueOf(testMsg.isEmpty())); + CompletableFuturefuture=new CompletableFuture<>(); - final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedmsg%0").putAllExtData(option).build(); + final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedMsg-0").putAllExtData(option).build(); try { cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { @@ -178,13 +185,14 @@ public Executor executor() { @Test public void TestGetRetainedTopicTrie(){ //test get RetainedTopicTrie - CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); HashMap option = new HashMap<>(); - option.put("flag", "trie"); - option.put("topic", TopicUtils.normalizeTopic(firstTopic)); - final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg%0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); + option.put("firstTopic", TopicUtils.normalizeTopic(firstTopic)); + option.put("topic", TopicUtils.normalizeTopic(originTopic)); + + final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); try { cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { @@ -193,11 +201,12 @@ public void complete(Object result, Throwable err) { if (err == null) { Response rsp = (Response) result; if (!rsp.getSuccess()) { - System.out.println("error"); + future.complete(null); return; } - Trie tmpTrie = JSON.parseObject(rsp.getData().toStringUtf8(), Trie.class); - future.complete(tmpTrie); + JSONArray tmpResult = JSON.parseArray(rsp.getData().toStringUtf8()); + List list = JSONObject.parseArray(tmpResult.toJSONString(), String.class); + future.complete((ArrayList) list); } else { future.complete(null); diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java index aa13215f8..f9b2575c8 100644 --- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java +++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java @@ -19,7 +19,7 @@ import org.apache.rocketmq.mqtt.common.model.Message; import org.apache.rocketmq.mqtt.common.model.Subscription; -import java.util.Set; +import java.util.ArrayList; import java.util.concurrent.CompletableFuture; public interface RetainedPersistManager { @@ -29,5 +29,5 @@ public interface RetainedPersistManager { CompletableFuture getRetainedMessage(String preciseTopic); - Set getTopicsFromTrie(Subscription topicFilter); + CompletableFuture> getMsgsFromTrie(Subscription topicFilter); } diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java index cd40cc7b0..079d57bc1 100644 --- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java +++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java @@ -43,7 +43,7 @@ public class MessageUtil { - public static final String EMPTYSTRING = "%@!@%"; + public static final String EMPTYSTRING = "★\r\n\t☀"; public static MqttPublishMessage toMqttMessage(String topicName, byte[] body, int qos, int mqttId, boolean retained) { ByteBuf payload = ALLOCATOR.buffer(); diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java index e33527632..2ebd75968 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java @@ -19,6 +19,7 @@ +import com.alibaba.fastjson.JSON; import com.alipay.sofa.jraft.error.RemotingException; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -50,6 +51,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -159,6 +161,7 @@ private MqttSubAckMessage getResponse(MqttSubscribeMessage mqttSubscribeMessage) } + @SuppressWarnings("checkstyle:Indentation") private void sendRetainMessage(ChannelHandlerContext ctx, Set subscriptions) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException { String clientId = ChannelInfo.getClientId(ctx.channel()); @@ -185,16 +188,19 @@ private void sendRetainMessage(ChannelHandlerContext ctx, Set subs } for (Subscription subscription : wildcardTopics) { - Set topics = retainedPersistManager.getTopicsFromTrie(subscription); - for (String topic : topics) { - CompletableFuture retainedMessage = retainedPersistManager.getRetainedMessage(topic); - retainedMessage.whenComplete((msg, throwable) -> { - if (msg == null) { - return; - } + + CompletableFuture> future = retainedPersistManager.getMsgsFromTrie(subscription); + future.whenComplete((msgsList,throwable) -> { + ArrayList results = new ArrayList<>(); + for (String strMsg:msgsList) { + results.add(JSON.parseObject(strMsg,Message.class)); + } + logger.info("result:" + results); + for (Message msg : results) { _sendMessage(session, clientId, subscription, msg); - }); - } + } + }); + } } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java index 3e5b3ac84..3449b756e 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java @@ -20,97 +20,39 @@ import com.alipay.sofa.jraft.error.RemotingException; -import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager; import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager; import org.apache.rocketmq.mqtt.common.model.Message; import org.apache.rocketmq.mqtt.common.model.Subscription; -import org.apache.rocketmq.mqtt.common.model.Trie; -import org.apache.rocketmq.mqtt.common.util.TopicUtils; -import org.apache.rocketmq.mqtt.common.util.TrieUtil; + import org.apache.rocketmq.mqtt.ds.retain.RetainedMsgClient; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.annotation.Resource; +import java.util.ArrayList; -import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class RetainedPersistManagerImpl implements RetainedPersistManager { private static Logger logger = LoggerFactory.getLogger(RetainedPersistManagerImpl.class); - private volatile Map> localRetainedTopicTrieCache = new ConcurrentHashMap<>(); - - private ScheduledThreadPoolExecutor refreshScheduler; //ThreadPool to refresh local Trie @Resource private MetaPersistManager metaPersistManager; public void init() { - refreshScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("refreshKVStore")); - refreshScheduler.scheduleWithFixedDelay(() -> { - try { - refreshKVStore(); - } catch (Throwable t) { - logger.error("", t); - } - }, 3, 3, TimeUnit.SECONDS); - - } - - - - - private void refreshKVStore() { //Refresh all firstTopic - long start = System.currentTimeMillis(); - logger.info(" Start refresh the tries..."); - Set allFirstTopics = metaPersistManager.getAllFirstTopics(); - for (String firstTopic : allFirstTopics) { - //If local Trie is null - if (localRetainedTopicTrieCache.get(TopicUtils.normalizeTopic(firstTopic)) == null) { - localRetainedTopicTrieCache.put(TopicUtils.normalizeTopic(firstTopic), new Trie()); - } - try { - refreshSingleTopicTrieStore(TopicUtils.normalizeTopic(firstTopic)); - logger.info("firstTopic:" + TopicUtils.normalizeTopic(firstTopic)); - logger.info("firstTopic Trie: " + localRetainedTopicTrieCache.get(TopicUtils.normalizeTopic(firstTopic)).toString()); - } catch (RemotingException | InterruptedException e) { - logger.error(String.valueOf(e)); - } - } - logger.info("Refresh the tries cost rt {}", System.currentTimeMillis() - start); } - @SuppressWarnings("checkstyle:WhitespaceAround") - private void refreshSingleTopicTrieStore(String firstTopic) throws RemotingException, InterruptedException { //Refresh single firstTopic - - CompletableFuture> completableFuture = new CompletableFuture<>(); - - RetainedMsgClient.GetRetainedTopicTrie(TopicUtils.normalizeTopic(firstTopic), completableFuture); - completableFuture.whenComplete((tmpTrie, throwable) -> { - Trie kvTrie = null; - kvTrie = tmpTrie; - logger.info("The firstTopic {} kvTrie: {}", firstTopic, kvTrie.toString()); - Trie localTrie = TrieUtil.rebuildLocalTrie(kvTrie);//local<-kvTrie - localRetainedTopicTrieCache.put(firstTopic, localTrie); - - }); - - } public CompletableFuture storeRetainedMessage(String topic, Message message) { CompletableFuture result = new CompletableFuture<>(); @@ -125,7 +67,8 @@ public CompletableFuture storeRetainedMessage(String topic, Message mes try { RetainedMsgClient.SetRetainedMsg(topic, message, result); } catch (RemotingException | InterruptedException e) { - logger.error(String.valueOf(e)); + logger.error("",e); + result.completeExceptionally(e); } return result; @@ -133,32 +76,30 @@ public CompletableFuture storeRetainedMessage(String topic, Message mes public CompletableFuture getRetainedMessage(String preciseTopic) { //precise preciseTopic CompletableFuture future = new CompletableFuture<>(); - + logger.info("topic:" + preciseTopic); try { RetainedMsgClient.GetRetainedMsg(preciseTopic, future); } catch (RemotingException | InterruptedException e) { - logger.error(String.valueOf(e)); + logger.error("",e); + future.completeExceptionally(e); } return future; } - public Set getTopicsFromTrie(Subscription subscription) { - Set results; + public CompletableFuture> getMsgsFromTrie(Subscription subscription) { String firstTopic = subscription.toFirstTopic(); String originTopicFilter = subscription.getTopicFilter(); logger.info("firstTopic={} originTopicFilter={}", firstTopic, originTopicFilter); - results = localRetainedTopicTrieCache.get(firstTopic).getAllPath(originTopicFilter); - if (results.isEmpty()) { //Refresh the trie about single firstTopic - logger.info("Local trie does not exist. Try to find..."); - try { - refreshSingleTopicTrieStore(firstTopic); - results = localRetainedTopicTrieCache.get(firstTopic).getAllPath(originTopicFilter); - } catch (RemotingException | InterruptedException e) { - logger.error(String.valueOf(e)); - } + + CompletableFuture> future = new CompletableFuture<>(); + try { + RetainedMsgClient.GetRetainedMsgsFromTrie(firstTopic, originTopicFilter,future); + } catch (RemotingException | InterruptedException e) { + logger.error("",e); + future.completeExceptionally(e); } - return results; + return future; } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java index 33f5e664a..5ed2372cf 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/retain/RetainedMsgClient.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.mqtt.ds.retain; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alipay.sofa.jraft.RouteTable; import com.alipay.sofa.jraft.conf.Configuration; @@ -30,7 +32,6 @@ import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; import com.alipay.sofa.jraft.util.RpcFactoryHelper; import org.apache.rocketmq.mqtt.common.model.Message; -import org.apache.rocketmq.mqtt.common.model.Trie; import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest; import org.apache.rocketmq.mqtt.common.model.consistency.Response; import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest; @@ -42,7 +43,9 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; @@ -58,7 +61,7 @@ public class RetainedMsgClient { final Configuration conf = new Configuration(); static final CliClientServiceImpl CLICLIENTSERVICE = new CliClientServiceImpl(); - static final String GROUPNAME = "retainedmsg"; + static final String GROUPNAME = "retainedMsg"; static PeerId leader; @Resource @@ -93,13 +96,16 @@ public static void initRpcServer() { registry.registerResponseInstance(ReadRequest.class.getName(), Response.getDefaultInstance()); } - public static void SetRetainedMsg(String topic, Message msg, CompletableFuture future) throws RemotingException, - InterruptedException { + public static void SetRetainedMsg(String topic, Message msg, CompletableFuture future) throws RemotingException, InterruptedException { HashMap option = new HashMap<>(); option.put("message", JSON.toJSONString(msg, SerializerFeature.WriteClassName)); option.put("topic", topic); - logger.info("SetRetainedMsg option:" + option.toString()); + option.put("firstTopic",msg.getFirstTopic()); + option.put("isEmpty",String.valueOf(msg.isEmpty())); + + + logger.info("SetRetainedMsg option:" + option); final WriteRequest request = WriteRequest.newBuilder().setGroup(GROUPNAME + "-0").putAllExtData(option).build(); @@ -126,17 +132,18 @@ public Executor executor() { return null; } }, 5000); + } - public static void GetRetainedTopicTrie(String firstTopic, CompletableFuture> future) throws RemotingException, - InterruptedException { + public static void GetRetainedMsgsFromTrie(String firstTopic, String topic, CompletableFuture> future) throws RemotingException, InterruptedException { HashMap option = new HashMap<>(); - option.put("flag", "trie"); //request for trie - option.put("topic", firstTopic); + option.put("firstTopic", firstTopic); + option.put("topic", topic); + logger.info("This is trie" + option); - final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + "-0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); + final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + GROUP_SEQ_NUM_SPLIT + "0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); CLICLIENTSERVICE.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { @Override @@ -147,9 +154,9 @@ public void complete(Object result, Throwable err) { logger.info("GetRetainedTopicTrie failed. {}", rsp.getErrMsg()); return; } - Trie tmpTrie = JSON.parseObject(rsp.getData().toStringUtf8(), Trie.class); - logger.info("GetRetainedTopicTrie success.Trie :" + tmpTrie); - future.complete(tmpTrie); + JSONArray tmpResult = JSON.parseArray(rsp.getData().toStringUtf8()); + List list = JSONObject.parseArray(tmpResult.toJSONString(), String.class); + future.complete((ArrayList) list); logger.info("-------------------------------GetRetainedTopicTrie success.----------------------------------"); } else { logger.info("-------------------------------GetRetainedTopicTrie fail.-------------------------------------"); @@ -157,7 +164,6 @@ public void complete(Object result, Throwable err) { err.printStackTrace(); } } - @Override public Executor executor() { return null; @@ -165,15 +171,14 @@ public Executor executor() { }, 5000); } - public static void GetRetainedMsg(String topic, CompletableFuture future) throws RemotingException, - InterruptedException { + + public static void GetRetainedMsg(String topic, CompletableFuture future) throws RemotingException, InterruptedException { HashMap option = new HashMap<>(); - option.put("flag", "topic"); //request for retain msg of topic option.put("topic", topic); + final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + GROUP_SEQ_NUM_SPLIT + "0").setOperation("topic").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); - final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + "-0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build(); CLICLIENTSERVICE.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { @@ -200,6 +205,4 @@ public Executor executor() { } }, 5000); } - - } diff --git a/pom.xml b/pom.xml index 2e5f29088..bfa3ac477 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,12 @@ 2.28.2 test - + + org.mockito + mockito-inline + 4.3.1 + test + io.grpc