Skip to content

Commit

Permalink
Fix some problems
Browse files Browse the repository at this point in the history
  • Loading branch information
YYYYWD committed Aug 8, 2022
1 parent 7025ae2 commit 5ce4c74
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 161 deletions.
3 changes: 2 additions & 1 deletion conf/meta.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@


selfAddress=
membersAddress=
membersAddress=
limitRetainedMessageCount=
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
public class MetaConf {
private static final String CONF_FILE_NAME = "meta.conf";
private File confFile;

private String clusterName = "defaultCluster";
private String allNodeAddress;
private String dbPath = System.getProperty("user.home") + "/mqtt_meta/db";
Expand All @@ -39,7 +38,7 @@ public class MetaConf {

private String selfAddress;
private String membersAddress;

private int limitRetainedMessageCount;
private int electionTimeoutMs = 1000;

private int snapshotIntervalSecs = 1000;
Expand All @@ -53,6 +52,7 @@ public MetaConf() throws IOException {
in.close();
MixAll.properties2Object(properties, this);
this.confFile = new File(classPathResource.getURL().getFile());
System.out.println(getLimitRetainedMessageCount());
}

public File getConfFile() {
Expand Down Expand Up @@ -138,4 +138,12 @@ public int getRaftGroupNum() {
public void setRaftGroupNum(int raftGroupNum) {
this.raftGroupNum = raftGroupNum;
}

public int getLimitRetainedMessageCount() {
return limitRetainedMessageCount;
}

public void setLimitRetainedMessageCount(int limitRetainedMessageCount) {
this.limitRetainedMessageCount = limitRetainedMessageCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class Constants {
public static final String COUNTER = "counter";

public static final String RETAINEDMSG = "retainedmsg";
public static final String RETAINEDMSG = "retainedMsg";

public static final String READ_INDEX_TYPE = "readIndexType";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,73 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.google.protobuf.ByteString;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Trie;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.meta.config.MetaConf;
import org.apache.rocketmq.mqtt.meta.raft.snapshot.SnapshotOperation;
import org.apache.rocketmq.mqtt.meta.raft.snapshot.impl.CounterSnapshotOperation;

import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

public class RetainedMsgStateProcess extends StateProcessor {
@Resource
private MetaConf metaConf;
private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcess.class);

private final AtomicLong value = new AtomicLong(0);

private final HashMap<String, String> retainedMsgMap = new HashMap<>(); //key:topic value:retained msg

private final HashMap<String, Trie<String, String>> retainedMsgTopicTrie = new HashMap<>(); //key:firstTopic value:retained topic Trie
private final ConcurrentHashMap<String, String> retainedMsgMap = new ConcurrentHashMap<>(); //key:topic value:retained msg
private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>(); //key:firstTopic value:retained topic Trie

protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private static final int LIMIT_RETAINED_MESSAGE_COUNT = 100;
private SnapshotOperation snapshotOperation;


@Override
public Response onReadRequest(ReadRequest request) {
try {
String topic = request.getExtDataMap().get("topic");
String flag = request.getExtDataMap().get("flag");
String firstTopic = request.getExtDataMap().get("firstTopic");
String operation = request.getOperation();

if (flag.equals("topic")) { //return retained msg
logger.info("FirstTopic:{} Topic:{} Operation:{}",firstTopic,topic,operation);

if (operation.equals("topic")) { //return retained msg
String msg = retainedMsgMap.get(topic);
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(msg)))
.build();
} else {
if (!retainedMsgTopicTrie.containsKey(topic)) {
} else { //return retain msgs of matched Topic
if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
Trie<String, String> newTrie = new Trie<>();
retainedMsgTopicTrie.put(topic, newTrie);
retainedMsgTopicTrie.put(firstTopic, newTrie);
}
Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(firstTopic);

Set<String> matchTopics = tmpTrie.getAllPath(topic);

ArrayList<String> msgResults = new ArrayList<>();

for (String tmpTopic:matchTopics) {
String msg = retainedMsgMap.get(tmpTopic);
if (msg != null) {
msgResults.add(msg);
}
}
Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(topic); //return firstTopic trie

return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(tmpTrie)))
.setData(ByteString.copyFrom(JSON.toJSONBytes(msgResults))) //return retained msgs of matched Topic
.build();
}
} catch (Exception e) {
Expand All @@ -82,26 +98,24 @@ public Response onReadRequest(ReadRequest request) {
}
}

boolean setRetainedMsg(String topic, String msg) {
boolean setRetainedMsg(String firstTopic,String topic, boolean isEmpty,String msg) {


// if message is empty
Message message = JSON.parseObject(msg, Message.class);

if (!retainedMsgTopicTrie.containsKey(message.getFirstTopic())) {
retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(message.getFirstTopic()), new Trie<String, String>());
// if the trie of firstTopic doesn't exist
if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie<String, String>());
}

if (message.isEmpty()) {
if (isEmpty) {
//delete from trie
retainedMsgMap.put(TopicUtils.normalizeTopic(topic), msg);
retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic())).deleteTrieNode(message.getOriginTopic(), "");
logger.info("Delete the topic {} retained message", topic);
retainedMsgMap.remove(TopicUtils.normalizeTopic(topic));
retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).deleteTrieNode(topic, "");
} else {
//Add to trie
Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic()));
if (trie.getNodePath().size() < LIMIT_RETAINED_MESSAGE_COUNT) {
Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic));
if (trie.getNodePath().size() < metaConf.getLimitRetainedMessageCount()) {
retainedMsgMap.put(TopicUtils.normalizeTopic(topic), msg);
retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(message.getFirstTopic())).addNode(message.getOriginTopic(), "", "");
retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic)).addNode(topic, "", "");
return true;
} else {
return false;
Expand All @@ -115,22 +129,27 @@ boolean setRetainedMsg(String topic, String msg) {
public Response onWriteRequest(WriteRequest writeRequest) {

try {
String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic")); //retained msg firstTopic
String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic")); //retained msg topic
boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty")); //retained msg is empty
String message = writeRequest.getExtDataMap().get("message"); //retained msg

boolean res = setRetainedMsg(topic, message);
boolean res = setRetainedMsg(firstTopic,topic, isEmpty,message);
if (!res) {
logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.",topic);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg("Exceeded maximum number of reserved topics limit.")
.build();
}
logger.info("Put the topic {} retained message success!", topic);

return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
.build();
} catch (Exception e) {
logger.error("Put the retained message error! {}",e.getMessage());
return Response.newBuilder()
.setSuccess(false)
.setErrMsg(e.getMessage())
Expand All @@ -142,19 +161,16 @@ public Response onWriteRequest(WriteRequest writeRequest) {

@Override
public SnapshotOperation loadSnapshotOperate() {
snapshotOperation = new CounterSnapshotOperation(lock);
return snapshotOperation;
}

@Override
public void onSnapshotSave(SnapshotWriter writer, BiConsumer<Boolean, Throwable> callFinally) {
snapshotOperation.onSnapshotSave(writer, callFinally, value.toString());

}

@Override
public boolean onSnapshotLoad(SnapshotReader reader) {
String load = snapshotOperation.onSnapshotLoad(reader);
value.set(Long.parseLong(load));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.apache.rocketmq.mqtt.meta.raft;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.conf.Configuration;
Expand All @@ -26,7 +28,9 @@
import org.mockito.Mockito;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand All @@ -38,7 +42,9 @@ public class RetainedMsgClientTest {
@Mock
private Message testMsg=new Message();
String firstTopic="test-f1";
final String groupId = Constants.RETAINEDMSG + "%" + 0;

String originTopic="test-f1/f2/";
final String groupId = Constants.RETAINEDMSG + "-" + 0;
final String confStr = "127.0.0.1:25001";
CliClientServiceImpl cliClientService = new CliClientServiceImpl();
Configuration conf = new Configuration();
Expand Down Expand Up @@ -77,7 +83,7 @@ public void init() throws InterruptedException, TimeoutException {
testMsg.setPayload("hello world".getBytes());
testMsg.setMsgId("12345678");
testMsg.setFirstTopic(firstTopic);
testMsg.setOriginTopic(firstTopic+"/t1/");
testMsg.setOriginTopic(originTopic);
testMsg.setEmpty(false);
testMsg.setRetained(true);

Expand All @@ -98,14 +104,15 @@ public static void initRpcServer() {
public void TestSetRetainedMsg(){
//test set retain msg

CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();

HashMap<String, String> option = new HashMap<>();
option.put("firstTopic",testMsg.getFirstTopic());
option.put("message", JSON.toJSONString(testMsg, SerializerFeature.WriteClassName));
option.put("topic", testMsg.getOriginTopic());
option.put("isEmpty", String.valueOf(testMsg.isEmpty()));

CompletableFuture<Boolean>future=new CompletableFuture<>();

final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedmsg%0").putAllExtData(option).build();
final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedMsg-0").putAllExtData(option).build();

try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
Expand Down Expand Up @@ -178,13 +185,14 @@ public Executor executor() {
@Test
public void TestGetRetainedTopicTrie(){
//test get RetainedTopicTrie
CompletableFuture<Trie<String, String>> future = new CompletableFuture<>();
CompletableFuture<ArrayList<String>> future = new CompletableFuture<>();

HashMap<String, String> option = new HashMap<>();
option.put("flag", "trie");
option.put("topic", TopicUtils.normalizeTopic(firstTopic));

final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg%0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
option.put("firstTopic", TopicUtils.normalizeTopic(firstTopic));
option.put("topic", TopicUtils.normalizeTopic(originTopic));

final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();

try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
Expand All @@ -193,11 +201,12 @@ public void complete(Object result, Throwable err) {
if (err == null) {
Response rsp = (Response) result;
if (!rsp.getSuccess()) {
System.out.println("error");
future.complete(null);
return;
}
Trie<String, String> tmpTrie = JSON.parseObject(rsp.getData().toStringUtf8(), Trie.class);
future.complete(tmpTrie);
JSONArray tmpResult = JSON.parseArray(rsp.getData().toStringUtf8());
List<String> list = JSONObject.parseArray(tmpResult.toJSONString(), String.class);
future.complete((ArrayList<String>) list);

} else {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;

public interface RetainedPersistManager {
Expand All @@ -29,5 +29,5 @@ public interface RetainedPersistManager {

CompletableFuture<Message> getRetainedMessage(String preciseTopic);

Set<String> getTopicsFromTrie(Subscription topicFilter);
CompletableFuture<ArrayList<String>> getMsgsFromTrie(Subscription topicFilter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class MessageUtil {



public static final String EMPTYSTRING = "%@!@%";
public static final String EMPTYSTRING = "\r\n\t";

public static MqttPublishMessage toMqttMessage(String topicName, byte[] body, int qos, int mqttId, boolean retained) {
ByteBuf payload = ALLOCATOR.buffer();
Expand Down
Loading

0 comments on commit 5ce4c74

Please sign in to comment.