Skip to content

Commit

Permalink
bugfix Class WillMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
DongyuanPan committed Jan 5, 2023
1 parent 74a8f5e commit 00ab034
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public class WillMessage {

private byte[] body;

private boolean isRetain;
private boolean retain;

private int qos;

public WillMessage(String willTopic, byte[] body, boolean isRetain, int qos) {
public WillMessage(String willTopic, byte[] body, boolean retain, int qos) {
this.willTopic = willTopic;
this.body = body;
this.isRetain = isRetain;
this.retain = retain;
this.qos = qos;
}

Expand All @@ -53,11 +53,11 @@ public void setBody(byte[] body) {
}

public boolean isRetain() {
return isRetain;
return retain;
}

public void setRetain(boolean retain) {
isRetain = retain;
this.retain = retain;
}

public int getQos() {
Expand All @@ -73,7 +73,7 @@ public String toString() {
return "WillMessage{" +
"willTopic='" + willTopic + '\'' +
", body=" + Arrays.toString(body) +
", isRetain=" + isRetain +
", retain=" + retain +
", qos=" + qos +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private void masterLoop() {
String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
if (rs == null || throwable != null) {
if (rs == null || tb != null) {
logger.error("{} master fail to scan cs", ip, tb);
return;
}
Expand Down Expand Up @@ -325,7 +325,7 @@ public void run() {
logger.error("fail to delete will message key:{}", willKey);
return;
}
logger.debug("delete will message key {} successfully", willKey);
logger.info("delete will message key {} successfully", willKey);
});
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.junit.Test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -92,4 +94,25 @@ private boolean masterHasDown(String masterValue) {
return System.currentTimeMillis() - Long.parseLong(ipTime[1]) > 10 * checkAliveIntervalMillis;
}

@Test
public void scan() throws ExecutionException, InterruptedException, TimeoutException {
String ip = "172.17.0.1";
String startClientKey = ip + Constants.CTRL_0;
String endClientKey = ip + Constants.CTRL_2;
willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
if (willMap == null || throwable != null) {
return;
}

if (willMap.size() == 0) {
return;
}
});
Thread.sleep(10000);


}



}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void main(String[] args) throws MqttException, NoSuchAlgorithmExce
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(recvClientId + " connect success to " + serverURI);
try {
final String topicFilter[] = {firstTopic + "/r1", firstTopic + "/willTopic"};
final String topicFilter[] = {firstTopic + "/r1", "dongyuan-f2/willTopic"};
final int[] qos = {1, 1};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
Expand All @@ -62,9 +62,8 @@ public void connectionLost(Throwable throwable) {
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
try {
String payload = new String(mqttMessage.getPayload());
String[] ss = payload.split("_");
System.out.println(now() + "receive:" + topic + "," + payload
+ " ---- rt:" + (System.currentTimeMillis() - Long.parseLong(ss[1])));
+ " ---- rt:");
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void main(String[] args) throws InterruptedException, MqttExceptio
String sendClientId = "send01";
String recvClientId = "recv01";
MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
mqttConnectOptions.setWill(firstTopic + "/willTopic", "will message: hello".getBytes(), 1, true);
mqttConnectOptions.setWill("dongyuan-f2/willTopic", "will message: hello".getBytes(), 1, true);

MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
mqttClient.setTimeToWait(5000L);
Expand Down

0 comments on commit 00ab034

Please sign in to comment.