diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index bacd25ccf5e..1416ebf638e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -421,7 +421,7 @@ public void registerProcessor() { this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); /** * ConsumerManageProcessor diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java index 9a3b8131588..52c2474103e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java @@ -31,7 +31,6 @@ public class SqlConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); - try { consumer.subscribe("TopicTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java new file mode 100644 index 00000000000..cb0210f44ec --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.rmq; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class RMQSqlConsumer extends RMQNormalConsumer { + private static Logger logger = Logger.getLogger(RMQSqlConsumer.class); + private MessageSelector selector; + public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector, + String consumerGroup, AbstractListener listener) { + super(nsAddr, topic, "*", consumerGroup, listener); + this.selector = selector; + } + + @Override + public void create() { + super.create(); + try { + consumer.subscribe(topic, selector); + } catch (Exception e) { + logger.error("Subscribe Sql Errored", e); + } + } +} diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java index b5b3fdd1f9c..7dd747f8ab7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java @@ -17,8 +17,10 @@ package org.apache.rocketmq.test.factory; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.listener.AbstractListener; public class ConsumerFactory { @@ -42,4 +44,14 @@ public static RMQBroadCastConsumer getRMQBroadCastConsumer(String nsAddr, String consumer.start(); return consumer; } + + public static RMQSqlConsumer getRMQSqlConsumer(String nsAddr, String consumerGroup, + String topic, MessageSelector selector, + AbstractListener listner) { + RMQSqlConsumer consumer = new RMQSqlConsumer(nsAddr, topic, selector, + consumerGroup, listner); + consumer.create(); + consumer.start(); + return consumer; + } } diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java index 974434a4056..14da397e0ec 100644 --- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java +++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java @@ -95,6 +95,28 @@ public Collection waitForMessageConsume(Collection allSendMsgs, return sendMsgs; } + public long waitForMessageConsume(int size, + int timeoutMills) { + + long curTime = System.currentTimeMillis(); + while (true) { + if (msgBodys.getDataSize() >= size) { + break; + } + if (System.currentTimeMillis() - curTime >= timeoutMills) { + logger.error(String.format("timeout but [%s] not recv all send messages!", + listnerName)); + break; + } else { + logger.info(String.format("[%s] still [%s] msg not recv!", listnerName, + size - msgBodys.getDataSize())); + TestUtil.waitForMonment(500); + } + } + + return msgBodys.getDataSize(); + } + public void waitForMessageConsume(Map sendMsgIndex, int timeoutMills) { Collection notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills); for (Object object : notRecvMsgs) { diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 9805eba7c42..07af7aa6e3d 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -127,6 +127,7 @@ public static BrokerController createAndStartBroker(String nsAddr) { brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); brokerConfig.setBrokerIP1("127.0.0.1"); brokerConfig.setNamesrvAddr(nsAddr); + brokerConfig.setEnablePropertyFilter(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setHaListenPort(8000 + random.nextInt(1000)); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java new file mode 100644 index 00000000000..7eef2ab0c48 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.filter; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class SqlFilterIT extends BaseConf { + private static Logger logger = Logger.getLogger(SqlFilterIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testFilterConsumer() throws Exception { + int msgSize = 16; + + String group = initConsumerGroup(); + MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"); + RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1")); + Thread.sleep(3000); + producer.send("TagA", msgSize); + producer.send("TagB", msgSize); + producer.send("TagC", msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(msgSize * 2, consumeTime); + assertThat(producer.getAllMsgBody()) + .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())); + + assertThat(consumer.getListner().getAllMsgBody().size()).isEqualTo(msgSize * 2); + } +}