diff --git a/conf/broker.conf b/conf/broker.conf index 9d660c646a9ba..4496be4cd1e3a 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -411,6 +411,14 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 # Precise dispathcer flow control according to history message number of each entry preciseDispatcherFlowControl=false +# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered +# You can use this class to decide which entries can be sent to consumers. +# Multiple classes need to be separated by commas. +entryFilterNames= + +# The directory for all the entry filter implementations +entryFiltersDirectory= + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=50000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index efc9f43f72e00..906ef3f3e9841 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -835,6 +835,22 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean preciseDispatcherFlowControl = false; + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = " Class name of pluggable entry filter that decides whether the entry needs to be filtered." + + "You can use this class to decide which entries can be sent to consumers." + + "Multiple names need to be separated by commas." + ) + private List entryFilterNames = new ArrayList<>(); + + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = " The directory for all the entry filter implementations." + ) + private String entryFiltersDirectory = ""; + @FieldContext( category = CATEGORY_SERVER, doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index b53de2f791ba4..8970ec6359028 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,18 +19,26 @@ package org.apache.pulsar.broker.service; +import com.google.common.collect.ImmutableList; import io.netty.buffer.ByteBuf; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; +import org.apache.pulsar.broker.service.plugin.FilterContext; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -48,11 +56,25 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { protected final ServiceConfiguration serviceConfig; protected final boolean dispatchThrottlingOnBatchMessageEnabled; + /** + * Entry filters in Broker. + * Not set to final, for the convenience of testing mock. + */ + protected ImmutableList entryFilters; + protected final FilterContext filterContext; protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) { this.subscription = subscription; this.serviceConfig = serviceConfig; this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled(); + if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic() + .getBrokerService().getEntryFilters())) { + this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList(); + this.filterContext = new FilterContext(); + } else { + this.entryFilters = ImmutableList.of(); + this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED; + } } /** @@ -113,6 +135,7 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e long totalBytes = 0; int totalChunkedMessages = 0; int totalEntries = 0; + List entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { Entry entry = entries.get(i); if (entry == null) { @@ -127,6 +150,15 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e msgMetadata = msgMetadata == null ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1) : msgMetadata; + if (CollectionUtils.isNotEmpty(entryFilters)) { + fillContext(filterContext, msgMetadata, subscription); + if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) { + entriesToFiltered.add(entry.getPosition()); + entries.set(i, null); + entry.release(); + continue; + } + } if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { if (Markers.isTxnMarker(msgMetadata)) { @@ -183,12 +215,36 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata); } } + if (CollectionUtils.isNotEmpty(entriesToFiltered)) { + subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual, + Collections.emptyMap()); + } + sendMessageInfo.setTotalMessages(totalMessages); sendMessageInfo.setTotalBytes(totalBytes); sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages); return totalEntries; } + private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry, + ImmutableList entryFilters) { + EntryFilter.FilterResult result = EntryFilter.FilterResult.ACCEPT; + for (EntryFilter entryFilter : entryFilters) { + if (entryFilter.filterEntry(entry, filterContext) == EntryFilter.FilterResult.REJECT) { + result = EntryFilter.FilterResult.REJECT; + break; + } + } + return result; + } + + private void fillContext(FilterContext context, MessageMetadata msgMetadata, + Subscription subscription) { + context.reset(); + context.setMsgMetadata(msgMetadata); + context.setSubscription(subscription); + } + /** * Determine whether the number of consumers on the subscription reaches the threshold. * @return diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 26c591d31c282..ac83775605d51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -318,6 +318,7 @@ public Map getProducers() { } + @Override public BrokerService getBrokerService() { return brokerService; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7b8ae8071d8b1..7fa59bf02ae8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -26,6 +26,7 @@ import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -114,6 +115,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -267,6 +270,7 @@ public class BrokerService implements Closeable { private boolean preciseTopicPublishRateLimitingEnable; private final LongAdder pausedConnections = new LongAdder(); private BrokerInterceptor interceptor; + private ImmutableMap entryFilters; private Set brokerEntryMetadataInterceptors; @@ -299,6 +303,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater")); this.authorizationService = new AuthorizationService( pulsar.getConfiguration(), pulsar().getPulsarResources()); + if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) { + this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration()); + } pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); @@ -712,6 +719,17 @@ public CompletableFuture closeAsync() { } }); + //close entry filters + if (entryFilters != null) { + entryFilters.forEach((name, filter) -> { + try { + filter.close(); + } catch (Exception e) { + log.warn("Error shutting down entry filter {}", name, e); + } + }); + } + CompletableFuture> cancellableDownstreamFutureReference = new CompletableFuture<>(); log.info("Event loops shutting down gracefully..."); List> shutdownEventLoops = new ArrayList<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 9db4111969b81..89cc4480bf185 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -294,4 +294,10 @@ default boolean isSystemTopic() { */ CompletableFuture truncate(); + /** + * Get BrokerService. + * @return + */ + BrokerService getBrokerService(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2c23bee256193..061d0380dc432 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -158,7 +159,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); - this.subscriptionProperties = subscriptionProperties == null + this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !checkTopicIsEventsNames(TopicName.get(topicName))) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java new file mode 100644 index 0000000000000..40e6644953f52 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import org.apache.bookkeeper.mledger.Entry; + +public interface EntryFilter { + + /** + * 1. Broker determines whether to filter out this entry based on the return value of this method. + * 2. Do not deserialize the entire entry in this method, + * which has a great impact on the broker's memory and CPU. + * 3. Return ACCEPT or null will be regarded as ACCEPT. + * @param entry + * @param context + * @return + */ + FilterResult filterEntry(Entry entry, FilterContext context); + + /** + * close the entry filter. + */ + void close(); + + + enum FilterResult { + /** + * deliver to the consumer. + */ + ACCEPT, + /** + * skip the message. + */ + REJECT, + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java new file mode 100644 index 0000000000000..5df3944e41dd2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class EntryFilterDefinition { + + /** + * The name of the entry filter. + */ + private String name; + + /** + * The description of the entry filter to be used for user help. + */ + private String description; + + /** + * The class name for the entry filter. + */ + private String entryFilterClass; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java new file mode 100644 index 0000000000000..9aa3113e177ae --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import java.util.Map; +import java.util.TreeMap; +import lombok.Data; + +@Data +public class EntryFilterDefinitions { + private final Map filters = new TreeMap<>(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java new file mode 100644 index 0000000000000..babaa80d5f60e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import java.nio.file.Path; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class EntryFilterMetaData { + /** + * The definition of the entry filter. + */ + private EntryFilterDefinition definition; + + /** + * The path to the handler package. + */ + private Path archivePath; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java new file mode 100644 index 0000000000000..9e19a5784d3fc --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.ImmutableMap; +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +@Slf4j +public class EntryFilterProvider { + + static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml"; + + /** + * create entry filter instance. + */ + public static ImmutableMap createEntryFilters( + ServiceConfiguration conf) throws IOException { + EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(), + conf.getNarExtractionDirectory()); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (String filterName : conf.getEntryFilterNames()) { + EntryFilterMetaData metaData = definitions.getFilters().get(filterName); + if (null == metaData) { + throw new RuntimeException("No entry filter is found for name `" + filterName + + "`. Available entry filters are : " + definitions.getFilters()); + } + EntryFilterWithClassLoader filter; + filter = load(metaData, conf.getNarExtractionDirectory()); + if (filter != null) { + builder.put(filterName, filter); + } + log.info("Successfully loaded entry filter for name `{}`", filterName); + } + return builder.build(); + } + + private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory, + String narExtractionDirectory) + throws IOException { + Path path = Paths.get(entryFiltersDirectory).toAbsolutePath(); + log.info("Searching for entry filters in {}", path); + + EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions(); + if (!path.toFile().exists()) { + log.info("Pulsar entry filters directory not found"); + return entryFilterDefinitions; + } + + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { + for (Path archive : stream) { + try { + EntryFilterDefinition def = + getEntryFilterDefinition(archive.toString(), narExtractionDirectory); + log.info("Found entry filter from {} : {}", archive, def); + + checkArgument(StringUtils.isNotBlank(def.getName())); + checkArgument(StringUtils.isNotBlank(def.getEntryFilterClass())); + + EntryFilterMetaData metadata = new EntryFilterMetaData(); + metadata.setDefinition(def); + metadata.setArchivePath(archive); + + entryFilterDefinitions.getFilters().put(def.getName(), metadata); + } catch (Throwable t) { + log.warn("Failed to load entry filters from {}." + + " It is OK however if you want to use this entry filters," + + " please make sure you put the correct entry filter NAR" + + " package in the entry filter directory.", archive, t); + } + } + } + + return entryFilterDefinitions; + } + + private static EntryFilterDefinition getEntryFilterDefinition(String narPath, + String narExtractionDirectory) + throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), + narExtractionDirectory)) { + return getEntryFilterDefinition(ncl); + } + } + + private static EntryFilterDefinition getEntryFilterDefinition(NarClassLoader ncl) throws IOException { + String configStr = ncl.getServiceDefinition(ENTRY_FILTER_DEFINITION_FILE); + + return ObjectMapperFactory.getThreadLocalYaml().readValue( + configStr, EntryFilterDefinition.class + ); + } + + private static EntryFilterWithClassLoader load(EntryFilterMetaData metadata, + String narExtractionDirectory) + throws IOException { + NarClassLoader ncl = NarClassLoader.getFromArchive( + metadata.getArchivePath().toAbsolutePath().toFile(), + Collections.emptySet(), + EntryFilter.class.getClassLoader(), narExtractionDirectory); + + EntryFilterDefinition def = getEntryFilterDefinition(ncl); + if (StringUtils.isBlank(def.getEntryFilterClass())) { + throw new IOException("Entry filters `" + def.getName() + "` does NOT provide a entry" + + " filters implementation"); + } + + try { + Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass()); + Object filter = entryFilterClass.getDeclaredConstructor().newInstance(); + if (!(filter instanceof EntryFilter)) { + throw new IOException("Class " + def.getEntryFilterClass() + + " does not implement entry filter interface"); + } + EntryFilter pi = (EntryFilter) filter; + return new EntryFilterWithClassLoader(pi, ncl); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } + log.error("Failed to load class {}", def.getEntryFilterClass(), e); + throw new IOException(e); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java new file mode 100644 index 0000000000000..8c2569ca6335b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.common.nar.NarClassLoader; + +@Slf4j +public class EntryFilterWithClassLoader implements EntryFilter { + private final EntryFilter entryFilter; + private final NarClassLoader classLoader; + + public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { + this.entryFilter = entryFilter; + this.classLoader = classLoader; + } + + @Override + public FilterResult filterEntry(Entry entry, FilterContext context) { + return entryFilter.filterEntry(entry, context); + } + + @Override + public void close() { + entryFilter.close(); + try { + classLoader.close(); + } catch (IOException e) { + log.error("close EntryFilterWithClassLoader failed", e); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java new file mode 100644 index 0000000000000..e520e1011f9ff --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import lombok.Data; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.common.api.proto.MessageMetadata; + +@Data +public class FilterContext { + private Subscription subscription; + private MessageMetadata msgMetadata; + + public void reset() { + subscription = null; + msgMetadata = null; + } + + public static final FilterContext FILTER_CONTEXT_DISABLED = new FilterContext(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java new file mode 100644 index 0000000000000..05e54ca4831f8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java new file mode 100644 index 0000000000000..dbcaa160b2391 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + + +import java.util.List; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.commons.collections4.MapUtils; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.common.api.proto.KeyValue; + +public class EntryFilter2Test implements EntryFilter { + @Override + public FilterResult filterEntry(Entry entry, FilterContext context) { + if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) { + return FilterResult.ACCEPT; + } + List list = context.getMsgMetadata().getPropertiesList(); + // filter by subscription properties + PersistentSubscription subscription = (PersistentSubscription) context.getSubscription(); + if (!MapUtils.isEmpty(subscription.getSubscriptionProperties())) { + for (KeyValue keyValue : list) { + if(subscription.getSubscriptionProperties().containsKey(keyValue.getKey())){ + return FilterResult.ACCEPT; + } + } + } + return FilterResult.REJECT; + } + + @Override + public void close() { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java new file mode 100644 index 0000000000000..812d49aa7b44e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + + +import java.util.List; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.common.api.proto.KeyValue; + +public class EntryFilterTest implements EntryFilter { + @Override + public FilterResult filterEntry(Entry entry, FilterContext context) { + if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) { + return FilterResult.ACCEPT; + } + List list = context.getMsgMetadata().getPropertiesList(); + // filter by string + for (KeyValue keyValue : list) { + if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) { + return FilterResult.ACCEPT; + } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){ + return FilterResult.REJECT; + } + } + return null; + } + + @Override + public void close() { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java new file mode 100644 index 0000000000000..03389352629e6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.AbstractBaseDispatcher; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class FilterEntryTest extends BrokerTestBase { + @BeforeMethod + @Override + protected void setup() throws Exception { + baseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + public void testFilter() throws Exception { + + String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); + String subName = "sub"; + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subName).subscribe(); + // mock entry filters + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopicReference(topic).get().getSubscription(subName); + Dispatcher dispatcher = subscription.getDispatcher(); + Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters"); + field.setAccessible(true); + NarClassLoader narClassLoader = mock(NarClassLoader.class); + EntryFilter filter1 = new EntryFilterTest(); + EntryFilterWithClassLoader loader1 = spy(new EntryFilterWithClassLoader(filter1, narClassLoader)); + EntryFilter filter2 = new EntryFilter2Test(); + EntryFilterWithClassLoader loader2 = spy(new EntryFilterWithClassLoader(filter2, narClassLoader)); + field.set(dispatcher, ImmutableList.of(loader1, loader2)); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topic).create(); + for (int i = 0; i < 10; i++) { + producer.send("test"); + } + + int counter = 0; + while (true) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer.acknowledge(message); + } else { + break; + } + } + // All normal messages can be received + assertEquals(10, counter); + MessageIdImpl lastMsgId = null; + for (int i = 0; i < 10; i++) { + lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send(); + } + counter = 0; + while (true) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer.acknowledge(message); + } else { + break; + } + } + // REJECT messages are filtered out + assertEquals(0, counter); + + // All messages should be acked, check the MarkDeletedPosition + assertNotNull(lastMsgId); + MessageIdImpl finalLastMsgId = lastMsgId; + Awaitility.await().untilAsserted(() -> { + PositionImpl position = (PositionImpl) subscription.getCursor().getMarkDeletedPosition(); + assertEquals(position.getLedgerId(), finalLastMsgId.getLedgerId()); + assertEquals(position.getEntryId(), finalLastMsgId.getEntryId()); + }); + consumer.close(); + + Map map = new HashMap<>(); + map.put("1","1"); + map.put("2","2"); + consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map) + .subscriptionName(subName).subscribe(); + for (int i = 0; i < 10; i++) { + producer.newMessage().property(String.valueOf(i), String.valueOf(i)).value("1").send(); + } + counter = 0; + while (true) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer.acknowledge(message); + } else { + break; + } + } + assertEquals(2, counter); + + producer.close(); + consumer.close(); + + BrokerService brokerService = pulsar.getBrokerService(); + Field field1 = BrokerService.class.getDeclaredField("entryFilters"); + field1.setAccessible(true); + field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2)); + cleanup(); + verify(loader1, times(1)).close(); + verify(loader2, times(1)).close(); + + } + +}