Skip to content

Commit

Permalink
[PIP-105] Part-2 Support pluggable entry filter in Dispatcher (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Nov 30, 2021
1 parent aa97d39 commit dac47cb
Show file tree
Hide file tree
Showing 18 changed files with 785 additions and 1 deletion.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false

# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
# You can use this class to decide which entries can be sent to consumers.
# Multiple classes need to be separated by commas.
entryFilterNames=

# The directory for all the entry filter implementations
entryFiltersDirectory=

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean preciseDispatcherFlowControl = false;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = " Class name of pluggable entry filter that decides whether the entry needs to be filtered."
+ "You can use this class to decide which entries can be sent to consumers."
+ "Multiple names need to be separated by commas."
)
private List<String> entryFilterNames = new ArrayList<>();

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = " The directory for all the entry filter implementations."
)
private String entryFiltersDirectory = "";

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@

package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand All @@ -48,11 +56,25 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;
/**
* Entry filters in Broker.
* Not set to final, for the convenience of testing mock.
*/
protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
protected final FilterContext filterContext;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
.getBrokerService().getEntryFilters())) {
this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
this.filterContext = new FilterContext();
} else {
this.entryFilters = ImmutableList.of();
this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
}
}

/**
Expand Down Expand Up @@ -113,6 +135,7 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
Expand All @@ -127,6 +150,15 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
: msgMetadata;
if (CollectionUtils.isNotEmpty(entryFilters)) {
fillContext(filterContext, msgMetadata, subscription);
if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
entry.release();
continue;
}
}
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
Expand Down Expand Up @@ -183,12 +215,36 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
}
}
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
Collections.emptyMap());
}

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}

private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
ImmutableList<EntryFilterWithClassLoader> entryFilters) {
EntryFilter.FilterResult result = EntryFilter.FilterResult.ACCEPT;
for (EntryFilter entryFilter : entryFilters) {
if (entryFilter.filterEntry(entry, filterContext) == EntryFilter.FilterResult.REJECT) {
result = EntryFilter.FilterResult.REJECT;
break;
}
}
return result;
}

private void fillContext(FilterContext context, MessageMetadata msgMetadata,
Subscription subscription) {
context.reset();
context.setMsgMetadata(msgMetadata);
context.setSubscription(subscription);
}

/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public Map<String, Producer> getProducers() {
}


@Override
public BrokerService getBrokerService() {
return brokerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -114,6 +115,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -267,6 +270,7 @@ public class BrokerService implements Closeable {
private boolean preciseTopicPublishRateLimitingEnable;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

Expand Down Expand Up @@ -299,6 +303,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
}

pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
Expand Down Expand Up @@ -712,6 +719,17 @@ public CompletableFuture<Void> closeAsync() {
}
});

//close entry filters
if (entryFilters != null) {
entryFilters.forEach((name, filter) -> {
try {
filter.close();
} catch (Exception e) {
log.warn("Error shutting down entry filter {}", name, e);
}
});
}

CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
log.info("Event loops shutting down gracefully...");
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,10 @@ default boolean isSystemTopic() {
*/
CompletableFuture<Void> truncate();

/**
* Get BrokerService.
* @return
*/
BrokerService getBrokerService();

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
Expand Down Expand Up @@ -158,7 +159,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = subscriptionProperties == null
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& !checkTopicIsEventsNames(TopicName.get(topicName))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.plugin;

import org.apache.bookkeeper.mledger.Entry;

public interface EntryFilter {

/**
* 1. Broker determines whether to filter out this entry based on the return value of this method.
* 2. Do not deserialize the entire entry in this method,
* which has a great impact on the broker's memory and CPU.
* 3. Return ACCEPT or null will be regarded as ACCEPT.
* @param entry
* @param context
* @return
*/
FilterResult filterEntry(Entry entry, FilterContext context);

/**
* close the entry filter.
*/
void close();


enum FilterResult {
/**
* deliver to the consumer.
*/
ACCEPT,
/**
* skip the message.
*/
REJECT,
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.plugin;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class EntryFilterDefinition {

/**
* The name of the entry filter.
*/
private String name;

/**
* The description of the entry filter to be used for user help.
*/
private String description;

/**
* The class name for the entry filter.
*/
private String entryFilterClass;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.plugin;

import java.util.Map;
import java.util.TreeMap;
import lombok.Data;

@Data
public class EntryFilterDefinitions {
private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
}
Loading

0 comments on commit dac47cb

Please sign in to comment.