Skip to content

Commit

Permalink
[ISSUE apache#44] Support shared subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrirW committed Jun 9, 2023
1 parent 501ade3 commit 1d0b869
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
Expand Down Expand Up @@ -48,6 +50,19 @@ public interface LmqQueueStore {
*/
CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count);

/**
* pop messages
*
* @param consumerGroup
* @param firstTopic
* @param queue
* @param count
* @return
*/
CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count);

void popAck(String lmqTopic, String consumerGroup, Message message);

/**
* pull last messages
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class Constants {

public static final String PLUS_SIGN = "+";
public static final String NUMBER_SIGN = "#";
public static final String DOLLAR_SIGN = "$";
public static final String COLON = ":";

public static final String P2P = "/p2p/";
Expand Down Expand Up @@ -55,5 +56,7 @@ public class Constants {
public static final byte CTRL_2 = '\u0002';

public static final String NOT_FOUND = "NOT_FOUND";
public static final String SHARED_PREFIX = DOLLAR_SIGN + "share";
public static final String EMPTY_SHARE_NAME = "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class PullResult {
public static final int PULL_SUCCESS = 301;
public static final int PULL_OFFSET_MOVED = 302;
public static final int NO_NEW_MSG = 303;
private int code = PULL_SUCCESS;
private String remark;
private List<Message> messageList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public boolean isP2p() {
return TopicUtils.isP2pTopic(queueName);
}

public boolean isShare() {
return TopicUtils.isSharedSubscription(queueName);
}

public long getQueueId() {
return queueId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ public int hashCode() {
return topicFilter != null ? topicFilter.hashCode() : 0;
}

public boolean isShare() {
return TopicUtils.isSharedSubscription(topicFilter);
}

public String getSharedName() {
if (!isShare()) {
return null;
}
return TopicUtils.getSharedName(topicFilter);
}

public String getTopicFilter() {
return topicFilter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.mqtt.common.util;

import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.MqttTopic;
Expand Down Expand Up @@ -116,6 +118,9 @@ public static MqttTopic decode(String topics) {
if (topics.startsWith(Constants.MQTT_TOPIC_DELIMITER)) {
topics = topics.substring(1);
}
if (topics.startsWith(Constants.SHARED_PREFIX)) {
topics = TopicUtils.getSharedTopicFilter(topics);
}
String topic;
String secondTopic = null;
int index = topics.indexOf(Constants.MQTT_TOPIC_DELIMITER, 1);
Expand Down Expand Up @@ -185,4 +190,24 @@ public static String wrapLmq(String firstTopic, String secondTopic) {
public static String wrapP2pLmq(String clientId) {
return normalizeTopic(Constants.P2P + clientId);
}

// shared subscription topic filter format: $share/{ShareName}/{filter}
public static boolean isSharedSubscription(String topicFilter) {
if (StringUtils.isEmpty(topicFilter)) {
return false;
}
if (!topicFilter.startsWith(Constants.SHARED_PREFIX)) {
return false;
}
String[] arr = topicFilter.split(Constants.MQTT_TOPIC_DELIMITER);
return arr.length > 2;
}

public static String getSharedName(String topicFilter) {
return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER)[1];
}

public static String getSharedTopicFilter(String topicFilter) {
return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER, 3)[2];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ public boolean addSendingMessages(Subscription subscription, Queue queue, List<M
if (!subscriptions.containsKey(subscription.getTopicFilter())) {
return false;
}
if (subscription.isShare()) {
return true;
}
if (!sendingMessages.containsKey(subscription)) {
sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
Expand Down Expand Up @@ -56,6 +58,8 @@ public class PushAction {
@Resource
private ConnectConf connectConf;

@Resource
private LmqQueueStore lmqQueueStore;

public void messageArrive(Session session, Subscription subscription, Queue queue) {
if (session == null) {
Expand Down Expand Up @@ -163,6 +167,9 @@ public void write(Session session, Message message, int mqttId, int qos, Subscri
if (subscription.isRetry()) {
message.setRetry(message.getRetry() + 1);
logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
} else if (subscription.isShare()) {
String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void refreshCache(Pair<Queue, Session> pair) {
if (queue == null) {
return;
}
if (queue.isP2p() || queue.isRetry()) {
if (queue.isP2p() || queue.isRetry() || queue.isShare()) {
return;
}
addLoadEvent(queue, pair.getRight());
Expand Down Expand Up @@ -188,6 +188,13 @@ public PullResultStatus pullMessage(Session session, Subscription subscription,
callbackResult(pullResult, callBackResult);
return DONE;
}

if (subscription.isShare()) {
CompletableFuture<PullResult> pullResult = lmqQueueStore.popMessage(subscription.getSharedName(), toFirstTopic(subscription), queue, count);
callbackResult(pullResult, callBackResult);
return DONE;
}

CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
StatUtil.addPv("NoPullCache", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
Expand Down Expand Up @@ -393,6 +394,7 @@ public void notifyPullMessage(Session session, Subscription subscription, Queue
throw new RuntimeException(
"invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
}
logger.info("session loop impl doing notifyPullMessage queueFresh.freshQueue({}, {}})", session, subscription);
queueFresh.freshQueue(session, subscription);
pullMessage(session, subscription, queue);
return;
Expand Down Expand Up @@ -539,7 +541,6 @@ private void doPull(PullEvent pullEvent) {
pushAction.messageArrive(session, subscription, queue);
}
} else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
session.markPersistOffsetFlag(true);
pullMessage(session, subscription, queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public void init() throws MQClientException {

private void refresh() throws MQClientException {
Set<String> tmp = metaPersistManager.getAllFirstTopics();
logger.info("Notify Manager is refreshing, all first topic is " + tmp);

if (tmp == null || tmp.isEmpty()) {
return;
}
Expand Down
Loading

0 comments on commit 1d0b869

Please sign in to comment.