Skip to content

Latest commit

 

History

History
70 lines (43 loc) · 1.87 KB

filter_server_guide.md

File metadata and controls

70 lines (43 loc) · 1.87 KB

Filter网络架构,以CPU资源换取宝贵的网卡流量资源

screenshot

启动Broker时,增加以下配置,可以自动加载Filter Server进程

filterServerNums=1

Filter样本(Consumer仅负责将代码上传到Filter Server,由Filter Server编译后执行)

package com.alibaba.rocketmq.example.filter;

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;


public class MessageFilterImpl implements MessageFilter {

    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }

        return false;
    }
}

Consumer例子

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        
        String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
            filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        <br>
        consumer.start();

        System.out.println("Consumer Started.");
    }