forked from bingwong2010/RocketMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
filter_server_guide
Xiaorui Wang edited this page Mar 29, 2015
·
3 revisions
filterServerNums=1
package com.alibaba.rocketmq.example.filter;
import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
 * Message filter that accepts a message only when its "SequenceId" user
 * property parses as an integer that is greater than 10 and divisible by 3.
 */
public class MessageFilterImpl implements MessageFilter {
    /**
     * Decides whether the given message passes the filter.
     *
     * @param msg the message whose "SequenceId" user property is inspected
     * @return {@code true} if the property is present, is a valid integer,
     *         is greater than 10 and is a multiple of 3; {@code false} if the
     *         property is missing, is not a valid integer, or fails the test
     */
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property == null) {
            return false;
        }

        final int id;
        try {
            id = Integer.parseInt(property);
        } catch (NumberFormatException ignored) {
            // A non-numeric SequenceId cannot satisfy the numeric condition,
            // so reject the message instead of letting the exception escape.
            return false;
        }

        return (id % 3) == 0 && (id > 10);
    }
}
/**
 * Example entry point: creates a push consumer, subscribes it to a topic
 * together with the source code of a filter class, registers a listener that
 * prints every received batch, and starts the consumer.
 *
 * @param args command-line arguments (not used)
 * @throws InterruptedException declared on the signature
 * @throws MQClientException declared on the signature
 */
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");

    // Use Java code to filter messages on the server side: the filter class
    // source is read from disk and passed along with the subscription.
    String filterClassSource = MixAll.file2String("/home/admin/MessageFilterImpl.java");
    pushConsumer.subscribe(
            "TopicFilter7",
            "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
            filterClassSource);

    // Listener that prints each delivered batch and reports it as consumed.
    MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    };
    pushConsumer.registerMessageListener(printingListener);

    // The consumer must be initialized by calling start() before it is used;
    // initializing it once is sufficient.
    pushConsumer.start();
    System.out.println("Consumer Started.");
}