Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pingww committed Jan 28, 2023
1 parent d348e95 commit b5b89a4
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ private void csLoop() {
willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
if (result == null || throwable != null) {
logger.error("{} fail to put csKey", csKey, throwable);
return;
}
});

Expand Down Expand Up @@ -244,11 +243,9 @@ private void masterLoop() {
willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
if (!rs || tb != null) {
logger.error("{} fail to update master", ip, tb);
return;
}
});


// master to check all cs state
String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_SEQ_NUM_SPLIT;
import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_RETAINED_MSG;


@Service
public class RetainedMsgClient {

private static Logger logger = LoggerFactory.getLogger(RetainedMsgClient.class);
private static final String GROUP_SEQ_NUM_SPLIT = "-";
final String groupId = Constants.RETAINEDMSG + GROUP_SEQ_NUM_SPLIT + 0;
private static final String groupId = GROUP_RETAINED_MSG + GROUP_SEQ_NUM_SPLIT + 0;
final Configuration conf = new Configuration();
static final CliClientServiceImpl CLICLIENTSERVICE = new CliClientServiceImpl();

static final String GROUPNAME = "retainedMsg";
static PeerId leader;

@Resource
Expand Down Expand Up @@ -103,7 +103,7 @@ public static void setRetainedMsg(String topic, Message msg, CompletableFuture<B

logger.debug("SetRetainedMsg option:" + option);

final WriteRequest request = WriteRequest.newBuilder().setGroup(GROUPNAME + GROUP_SEQ_NUM_SPLIT + "0").setData(ByteString.copyFrom(msg.getEncodeBytes())).putAllExtData(option).build();
final WriteRequest request = WriteRequest.newBuilder().setGroup(groupId).setData(ByteString.copyFrom(msg.getEncodeBytes())).putAllExtData(option).build();

CLICLIENTSERVICE.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
@Override
Expand Down Expand Up @@ -140,7 +140,7 @@ public static void GetRetainedMsgsFromTrie(String firstTopic, String topic, Comp

logger.debug("GetRetainedMsgsFromTrie option:" + option);

final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + GROUP_SEQ_NUM_SPLIT + "0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
final ReadRequest request = ReadRequest.newBuilder().setGroup(groupId).setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();

CLICLIENTSERVICE.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
@Override
Expand Down Expand Up @@ -184,7 +184,7 @@ public static void GetRetainedMsg(String topic, CompletableFuture<Message> futur
HashMap<String, String> option = new HashMap<>();
option.put("topic", topic);

final ReadRequest request = ReadRequest.newBuilder().setGroup(GROUPNAME + GROUP_SEQ_NUM_SPLIT + "0").setOperation("topic").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
final ReadRequest request = ReadRequest.newBuilder().setGroup(groupId).setOperation("topic").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();

CLICLIENTSERVICE.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_SEQ_NUM_SPLIT;
import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_WILL_MSG;


@Service
public class WillMsgClient {

private static Logger logger = LoggerFactory.getLogger(WillMsgClient.class);
private final static String GROUP_SEQ_NUM_SPLIT = "-";
private final static String RAFT_GROUP_ID = Constants.WILL_MSG + GROUP_SEQ_NUM_SPLIT + 0;
private final static String RAFT_GROUP_ID = GROUP_WILL_MSG + GROUP_SEQ_NUM_SPLIT + 0;
private MetaRpcClient metaRpcClient;

@Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_SEQ_NUM_SPLIT;

@Service
public class MqttRaftServer {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttRaftServer.class);

private static final String GROUP_SEQ_NUM_SPLIT = "-";
@Resource
private MetaConf metaConf;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public boolean onSnapshotLoad(SnapshotReader reader) {

@Override
public String groupCategory() {
return Constants.RETAINEDMSG;
return Constants.GROUP_RETAINED_MSG;
}

public int getMaxRetainedMessageNum() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

import static org.apache.rocketmq.mqtt.meta.raft.rpc.Constants.GROUP_WILL_MSG;

public class WillMsgStateProcessor extends StateProcessor {
private static Logger logger = LoggerFactory.getLogger(WillMsgStateProcessor.class);

Expand Down Expand Up @@ -164,7 +166,7 @@ public boolean onSnapshotLoad(SnapshotReader reader) {

@Override
public String groupCategory() {
return Constants.WILL_MSG;
return GROUP_WILL_MSG;
}

public Response put(byte[] key, byte[] value) throws RocksDBException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
public class Constants {
public static final String COUNTER = "counter";

public static final String RETAINEDMSG = "retainedMsg";
public static final String GROUP_RETAINED_MSG = "retainedMsg";
public static final String GROUP_WILL_MSG = "willMsg";
public static final String GROUP_SEQ_NUM_SPLIT = "-";

public static final String WILL_MSG = "willMsg";
public static final String NOT_FOUND = "NOT_FOUND";

public static final String READ_INDEX_TYPE = "readIndexType";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
import java.util.Set;

public class IpUtil {
private static String candidatesHost;

public static String getLocalAddressCompatible() {
try {
String candidatesHost = getLocalAddress();
if (candidatesHost != null) {
return candidatesHost;
}

return getLocalAddress();
} catch (Exception e) {
throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException", e);
}
return null;
}

private static String getLocalAddress() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class RetainedMsgClientTest {
String originTopic = "test-f1/f2/";

String topicFilter = "test-f1/+/";
final String groupId = Constants.RETAINEDMSG + "-" + 0;
final String groupId = Constants.GROUP_RETAINED_MSG + "-" + 0;
final String confStr = "127.0.0.1:25001";
CliClientServiceImpl cliClientService = new CliClientServiceImpl();
Configuration conf = new Configuration();
Expand Down

0 comments on commit b5b89a4

Please sign in to comment.