Skip to content

Commit

Permalink
Format the code and add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
YYYYWD committed Aug 17, 2022
1 parent 0a6885d commit 1844bf1
Show file tree
Hide file tree
Showing 20 changed files with 139 additions and 212 deletions.
13 changes: 7 additions & 6 deletions conf/service.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
# limitations under the License.


username=
secretKey=
username=yutao
secretKey=yutao

NAMESRV_ADDR=
eventNotifyRetryTopic=
clientRetryTopic=
NAMESRV_ADDR=127.0.0.1:9876
eventNotifyRetryTopic=yutao-notify
clientRetryTopic=yutao-retry

metaAddr=127.0.0.1:25001

metaAddr=
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ public Response onReadRequest(ReadRequest request) {
Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(firstTopic);
Set<String> matchTopics = tmpTrie.getAllPath(topic);

ArrayList<byte[]> msgResults = new ArrayList<>();
ArrayList<ByteString> msgResults = new ArrayList<>();

for (String tmpTopic : matchTopics) {
byte[] msgBytes = retainedMsgMap.get(tmpTopic);
if (msgBytes != null) {
msgResults.add(msgBytes);
msgResults.add(ByteString.copyFrom(msgBytes));
}
}
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(msgResults))) //return retained msgs of matched Topic
.addAllDatalist(msgResults)//return retained msgs of matched Topic
.build();
}
} catch (Exception e) {
Expand All @@ -108,7 +108,6 @@ boolean setRetainedMsg(String firstTopic, String topic, boolean isEmpty, byte[]
retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie<String, String>());
}


if (isEmpty) {
//delete from trie
logger.info("Delete the topic {} retained message", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
Expand All @@ -57,6 +57,8 @@ public class RetainedMsgClientTest {
String firstTopic = "test-f1";

String originTopic = "test-f1/f2/";

String topicFilter = "test-f1/+/";
final String groupId = Constants.RETAINEDMSG + "-" + 0;
final String confStr = "127.0.0.1:25001";
CliClientServiceImpl cliClientService = new CliClientServiceImpl();
Expand All @@ -73,6 +75,20 @@ public PeerId selectLeader(String groupId) {
}
}

class RetainedMsgStateProcessWarp {
public Response setRetainedMsgRsp() {
return null;
}

public Response getRetainedMsgRsp() {
return null;
}

public Response getRetainedMsgFromTrieRsp() {
return null;
}
}

@Before
public void init() throws InterruptedException, TimeoutException {
initRpcServer();
Expand Down Expand Up @@ -126,17 +142,20 @@ public void TestSetRetainedMsg() {

CompletableFuture<Boolean> future = new CompletableFuture<>();

final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedMsg-0").setData(ByteString.copyFrom(JSON.toJSONBytes(testMsg, SerializerFeature.WriteClassName))).putAllExtData(option).build();
final WriteRequest request = WriteRequest.newBuilder().setGroup("retainedMsg-0").setOperation("topic").setData(ByteString.copyFrom(JSON.toJSONBytes(testMsg, SerializerFeature.WriteClassName))).putAllExtData(option).build();

RetainedMsgStateProcessWarp stateProcess = Mockito.mock(RetainedMsgStateProcessWarp.class);
Mockito.when(stateProcess.setRetainedMsgRsp()).thenReturn(Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(testMsg.getOriginTopic())))
.build());

try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err == null) {
future.complete(true);
} else {
future.complete(false);
}
Assert.assertEquals(stateProcess.setRetainedMsgRsp().getData().toStringUtf8(), request.getExtDataMap().get("topic"));
future.complete(stateProcess.setRetainedMsgRsp().getSuccess());
}

@Override
Expand All @@ -149,7 +168,7 @@ public Executor executor() {
}

future.whenComplete(((result, throwable) -> {
Assert.assertEquals(result, false);
Assert.assertEquals(result, true);
}));

}
Expand All @@ -165,21 +184,18 @@ public void TestGetRetainedMsg() {

CompletableFuture<Message> future = new CompletableFuture<>();

RetainedMsgStateProcessWarp stateProcess = Mockito.mock(RetainedMsgStateProcessWarp.class);
Mockito.when(stateProcess.getRetainedMsgRsp()).thenReturn(Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(testMsg)))
.build());

try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err == null) {
Response rsp = (Response) result;
if (rsp.getData().toStringUtf8().equals("null")) {
return;
}
String strMsg = (String) JSON.parse(rsp.getData().toStringUtf8());
Message message = JSON.parseObject(strMsg, Message.class);
future.complete(message);
} else {
future.complete(null);
}
Message msg = JSON.parseObject(stateProcess.getRetainedMsgRsp().getData().toStringUtf8(), Message.class);
future.complete(msg);
}

@Override
Expand All @@ -192,45 +208,45 @@ public Executor executor() {
}

future.whenComplete(((message, throwable) -> {
Mockito.verify(message, null);
Assert.assertEquals(message, testMsg);
}));


}

@Test
public void TestGetRetainedFromTopicTrie() {
public void TestGetRetainedMsgsFromTrie() {
//test get RetainedTopicTrie
CompletableFuture<ArrayList<String>> future = new CompletableFuture<>();
CompletableFuture<ArrayList<Message>> future = new CompletableFuture<>();

HashMap<String, String> option = new HashMap<>();

option.put("firstTopic", TopicUtils.normalizeTopic(firstTopic));
option.put("topic", TopicUtils.normalizeTopic(originTopic));
option.put("topic", TopicUtils.normalizeTopic(topicFilter));


ArrayList<ByteString> msgResults = new ArrayList<>();
msgResults.add(ByteString.copyFrom(JSON.toJSONBytes(testMsg)));
msgResults.add(ByteString.copyFrom(JSON.toJSONBytes(testMsg)));

RetainedMsgStateProcessWarp stateProcess = Mockito.mock(RetainedMsgStateProcessWarp.class);
Mockito.when(stateProcess.getRetainedMsgFromTrieRsp()).thenReturn(Response.newBuilder()
.setSuccess(true)
.addAllDatalist(msgResults)
.build());

final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();

try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err == null) {
Response rsp = (Response) result;
if (!rsp.getSuccess()) {
future.complete(null);
return;
}
byte[] bytes = rsp.getData().toByteArray();
ArrayList<String> resultList = JSON.parseObject(new String(bytes), ArrayList.class);
for (int i = 0; i < resultList.size(); i++) {
resultList.set(i, new String(Base64.getDecoder().decode(resultList.get(i))));
}

future.complete(resultList);

} else {
future.complete(null);

List<ByteString> datalistList = stateProcess.getRetainedMsgFromTrieRsp().getDatalistList();
ArrayList<Message> resultList = new ArrayList<>();
for (ByteString tmp : datalistList) {
resultList.add(JSON.parseObject(tmp.toStringUtf8(), Message.class));
}
future.complete(resultList);
}

@Override
Expand All @@ -242,8 +258,12 @@ public Executor executor() {
throw new RuntimeException(e);
}

future.whenComplete(((trie, throwable) -> {
Mockito.verify(trie, null);
ArrayList<Message> targetList = new ArrayList<>();
targetList.add(testMsg);
targetList.add(testMsg);

future.whenComplete(((msgList, throwable) -> {
Assert.assertEquals(msgList, targetList);
}));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ public interface RetainedPersistManager {

CompletableFuture<Message> getRetainedMessage(String preciseTopic);

CompletableFuture<ArrayList<String>> getMsgsFromTrie(Subscription topicFilter);
CompletableFuture<ArrayList<Message>> getMsgsFromTrie(Subscription topicFilter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;


public abstract class AbstractUpstreamHook implements UpstreamHook {
public static Logger logger = LoggerFactory.getLogger(AbstractUpstreamHook.class);
Expand Down Expand Up @@ -65,6 +63,6 @@ public CompletableFuture<HookResult> doHook(MqttMessageUpContext context, MqttMe

public abstract void register();

public abstract CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) throws RemotingException, com.alipay.sofa.jraft.error.RemotingException, ExecutionException, InterruptedException;
public abstract CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) ;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


public class Message implements Serializable {
public class Message {
private String msgId;
private String firstTopic;
private String originTopic;
Expand Down Expand Up @@ -54,6 +53,7 @@ public class Message implements Serializable {
public static String extPropertyMqttRealTopic = "mqttRealTopic";
public static String extPropertyQoS = "qosLevel";
public static String extPropertyCleanSessionFlag = "cleanSessionFlag";

public static String extPropertyNamespaceId = "namespace";
public static String extPropertyClientId = "clientId";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ public boolean isP2p() {

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }

Subscription that = (Subscription)o;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Subscription that = (Subscription) o;

return topicFilter != null ? topicFilter.equals(that.topicFilter) : that.topicFilter == null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
public class MessageUtil {
public static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);



public static final String EMPTYSTRING = "★\r\n\t☀";

public static MqttPublishMessage toMqttMessage(String topicName, byte[] body, int qos, int mqttId, boolean retained) {
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions mqtt-common/src/main/proto/request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ message Response {
bytes data = 1;
string errMsg = 2;
bool success = 3;
repeated bytes datalist = 4;
}

Loading

0 comments on commit 1844bf1

Please sign in to comment.