Skip to content

Commit

Permalink
[ROCKETMQ-121]Support message filtering based on SQL92 closes apache#82
Browse files Browse the repository at this point in the history
  • Loading branch information
vsair authored and dongeforever committed Apr 21, 2017
1 parent 42f78c2 commit 58f1574
Show file tree
Hide file tree
Showing 116 changed files with 12,552 additions and 128 deletions.
4 changes: 4 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class BrokerController {
private final MessageStoreConfig messageStoreConfig;
private final ConsumerOffsetManager consumerOffsetManager;
private final ConsumerManager consumerManager;
private final ConsumerFilterManager consumerFilterManager;
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
Expand Down Expand Up @@ -149,6 +152,7 @@ public BrokerController(//
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
Expand Down Expand Up @@ -192,6 +196,7 @@ public boolean initialize() throws CloneNotSupportedException {

result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();

if (result) {
try {
Expand All @@ -202,6 +207,7 @@ public boolean initialize() throws CloneNotSupportedException {
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
e.printStackTrace();
Expand Down Expand Up @@ -274,6 +280,17 @@ public void run() {
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -400,9 +417,11 @@ public void registerProcessor() {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

/**
* ConsumerManageProcessor
Expand Down Expand Up @@ -504,6 +523,10 @@ public ConsumerManager getConsumerManager() {
return consumerManager;
}

public ConsumerFilterManager getConsumerFilterManager() {
return consumerFilterManager;
}

public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
Expand Down Expand Up @@ -590,6 +613,10 @@ public void shutdown() {
if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}

if (this.consumerFilterManager != null) {
this.consumerFilterManager.persist();
}
}

private void unregisterBrokerAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}

public static String getConsumerFilterPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.client;

public enum ConsumerGroupEvent {

/**
* Some consumers in the group are changed.
*/
CHANGE,
/**
* The group of consumer is unregistered.
*/
UNREGISTER,
/**
* The group of consumer is registered.
*/
REGISTER
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;
import java.util.List;

public interface ConsumerIdsChangeListener {
void consumerIdsChanged(final String group, final List<Channel> channels);

void handle(ConsumerGroupEvent event, String group, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ public void doChannelCloseEvent(final String remoteAddr, final Channel channel)
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}

this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
Expand All @@ -111,10 +112,12 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}

this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

return r1 || r2;
}

Expand All @@ -126,10 +129,12 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);

this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;

import java.util.Collection;
import java.util.List;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final BrokerController brokerController;
Expand All @@ -28,11 +32,34 @@ public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
}

@Override
public void consumerIdsChanged(String group, List<Channel> channels) {
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:
if (args == null || args.length < 1) {
return;
}
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
case UNREGISTER:
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER:
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.filter;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.DispatchRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Iterator;

/**
* Calculate bit map of filter.
*/
public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {

private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

protected final BrokerConfig brokerConfig;
protected final ConsumerFilterManager consumerFilterManager;

public CommitLogDispatcherCalcBitMap(BrokerConfig brokerConfig, ConsumerFilterManager consumerFilterManager) {
this.brokerConfig = brokerConfig;
this.consumerFilterManager = consumerFilterManager;
}

@Override
public void dispatch(DispatchRequest request) {
if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
return;
}

try {

Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());

if (filterDatas == null || filterDatas.isEmpty()) {
return;
}

Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
BitsArray filterBitMap = BitsArray.create(
this.consumerFilterManager.getBloomFilter().getM()
);

long startTime = System.currentTimeMillis();
while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();

if (filterData.getCompiledExpression() == null) {
log.error("[BUG] Consumer in filter manager has no compiled expression! {}", filterData);
continue;
}

if (filterData.getBloomFilterData() == null) {
log.error("[BUG] Consumer in filter manager has no bloom data! {}", filterData);
continue;
}

Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());

ret = filterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
}

log.debug("Result of Calc bit map:ret={}, data={}, props={}, offset={}", ret, filterData, request.getPropertiesMap(), request.getCommitLogOffset());

// eval true
if (ret != null && ret instanceof Boolean && (Boolean) ret) {
consumerFilterManager.getBloomFilter().hashTo(
filterData.getBloomFilterData(),
filterBitMap
);
}
}

request.setBitMap(filterBitMap.bytes());

long eclipseTime = System.currentTimeMillis() - startTime;
// 1ms
if (eclipseTime >= 1) {
log.warn("Spend {} ms to calc bit map, consumerNum={}, topic={}", eclipseTime, filterDatas.size(), request.getTopic());
}
} catch (Throwable e) {
log.error("Calc bit map error! topic={}, offset={}, queueId={}, {}", request.getTopic(), request.getCommitLogOffset(), request.getQueueId(), e);
}
}
}
Loading

0 comments on commit 58f1574

Please sign in to comment.