Skip to content

Commit

Permalink
[fix][broker] Validate per-[namespace][topic] entry filters (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Feb 3, 2023
1 parent 11073fd commit 75d58fd
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -39,7 +40,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
Expand All @@ -52,6 +55,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -786,6 +790,26 @@ protected void validatePersistencePolicies(PersistencePolicies persistence) {

}

protected void validateEntryFilters(EntryFilters entryFilters) {
if (entryFilters == null) {
// remove entry filters
return;
}
if (StringUtils.isBlank(entryFilters.getEntryFilterNames())
|| Arrays.stream(entryFilters.getEntryFilterNames().split(","))
.filter(n -> StringUtils.isNotBlank(n))
.findAny().isEmpty()) {
throw new RestException(new RestException(Status.BAD_REQUEST,
"entryFilterNames can't be empty. To remove entry filters use the remove method."));
}
try {
pulsar().getBrokerService().getEntryFilterProvider()
.validateEntryFilters(entryFilters.getEntryFilterNames());
} catch (InvalidEntryFilterException ex) {
throw new RestException(new RestException(Status.BAD_REQUEST, ex));
}
}

/**
* Check current exception whether is redirect exception.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2479,12 +2479,14 @@ protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.Scanner
protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validateEntryFilters(entryFilters))
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.entryFilters = entryFilters;
return policies;
}));
}


/**
* Base method for setReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5378,34 +5378,47 @@ protected CompletableFuture<Void> internalSetSchemaValidationEnforced(boolean sc

protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getEntryFilters)
.orElseGet(() -> {
if (applied) {
EntryFilters entryFilters = getNamespacePolicies(namespaceName).entryFilters;
if (entryFilters == null) {
return new EntryFilters(String.join(",",
pulsar().getConfiguration().getEntryFilterNames()));
.thenCompose(__ -> {
if (!applied) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getEntryFilters).orElse(null));
}
if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
return CompletableFuture.completedFuture(new EntryFilters(String.join(",",
pulsar().getConfiguration().getEntryFilterNames())));
}
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getEntryFilters))
.thenCompose(policyEntryFilters -> {
if (policyEntryFilters.isPresent()) {
return CompletableFuture.completedFuture(policyEntryFilters.get());
}
return entryFilters;
}
return null;
})));

return getNamespacePoliciesAsync(namespaceName)
.thenApply(policies -> policies.entryFilters)
.thenCompose(nsEntryFilters -> {
if (nsEntryFilters != null) {
return CompletableFuture.completedFuture(nsEntryFilters);
}
return CompletableFuture.completedFuture(new EntryFilters(String.join(",",
pulsar().getConfiguration().getEntryFilterNames())));
});
});
});
}

protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters entryFilters,
boolean isGlobal) {

return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenAccept(__ -> validateEntryFilters(entryFilters))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setEntryFilters(entryFilters);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setEntryFilters(entryFilters);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2771,8 +2771,11 @@ public void getEntryFiltersPerTopic(
@POST
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Set entry filters for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Specified entry filters are not valid"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")
})
public void setEntryFiltersPerTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "entry filters", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,33 @@ protected void initializeBrokerEntryFilters() throws IOException {
}
}

public void validateEntryFilters(String entryFilterNames) throws InvalidEntryFilterException {
if (StringUtils.isBlank(entryFilterNames)) {
return;
}
final List<String> entryFilterList = readEntryFiltersString(entryFilterNames);
for (String filterName : entryFilterList) {
EntryFilterMetaData metaData = definitions.get(filterName);
if (metaData == null) {
throw new InvalidEntryFilterException("Entry filter '" + filterName + "' not found");
}
}
}

private List<String> readEntryFiltersString(String entryFilterNames) {
final List<String> entryFilterList = Arrays.stream(entryFilterNames.split(","))
.filter(n -> StringUtils.isNotBlank(n))
.toList();
return entryFilterList;
}

public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
throws IOException {
final String names = policy.getEntryFilterNames();
if (StringUtils.isBlank(names)) {
return Collections.emptyList();
}
final List<String> entryFilterList = Arrays.stream(names.split(","))
.filter(n -> StringUtils.isNotBlank(n))
.toList();
final List<String> entryFilterList = readEntryFiltersString(names);
return loadEntryFilters(entryFilterList);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.plugin;

public class InvalidEntryFilterException extends Exception {

public InvalidEntryFilterException(String message) {
super(message);
}

public InvalidEntryFilterException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,7 @@ public void testSetEntryFiltersHierarchy() throws Exception {
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
conf.setEntryFilterNames(List.of("test", "test1"));
conf.setAllowOverrideEntryFilters(true);

testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
"test",
Expand All @@ -2420,6 +2421,8 @@ public void testSetEntryFiltersHierarchy() throws Exception {
.topic(fullTopicName)
.create();
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false));
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test,test1"));
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
Expand All @@ -2431,6 +2434,8 @@ public void testSetEntryFiltersHierarchy() throws Exception {
EntryFilters nsEntryFilters = new EntryFilters("test");
admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1", nsEntryFilters);
assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"), nsEntryFilters);
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test"));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
Expand Down Expand Up @@ -2459,6 +2464,8 @@ public void testSetEntryFiltersHierarchy() throws Exception {
admin.topicPolicies().setEntryFiltersPerTopic(topic, topicEntryFilters);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false), topicEntryFilters));
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, true),
new EntryFilters("test1"));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
Expand All @@ -2484,6 +2491,122 @@ public void testSetEntryFiltersHierarchy() throws Exception {
}
}

@Test(timeOut = 30000)
public void testValidateNamespaceEntryFilters() throws Exception {
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);

testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);

try {
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(""));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters(","));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.namespaces().setNamespaceEntryFilters(myNamespace, new EntryFilters("test,notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}

@Test(timeOut = 30000)
public void testValidateTopicEntryFilters() throws Exception {
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);

testEntryFilterProvider
.setMockEntryFilters(new EntryFilterDefinition(
"test",
null,
EntryFilterTest.class.getName()
));
final EntryFilterProvider oldEntryFilterProvider = pulsar.getBrokerService().getEntryFilterProvider();
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", testEntryFilterProvider, true);

try {
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topicName = myNamespace + "/topic";
admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topicName)
.create();
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(""));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters(","));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "entryFilterNames can't be empty. " +
"To remove entry filters use the remove method.");
}
try {
admin.topicPolicies().setEntryFiltersPerTopic(topicName, new EntryFilters("test,notexists"));
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 400);
assertEquals(e.getMessage(), "Entry filter 'notexists' not found");
}
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topicName, false));
} finally {
FieldUtils.writeField(pulsar.getBrokerService(),
"entryFilterProvider", oldEntryFilterProvider, true);
}
}

@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
Expand Down

0 comments on commit 75d58fd

Please sign in to comment.