Skip to content

Commit

Permalink
[feature][broker] Allow to configure the entry filters per namespace …
Browse files Browse the repository at this point in the history
…and per topic (apache#17153)
  • Loading branch information
gaozhangmin authored Aug 30, 2022
1 parent cb14208 commit 2f4af65
Show file tree
Hide file tree
Showing 30 changed files with 986 additions and 17 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=

# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String entryFiltersDirectory = "";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Whether allow topic level entry filters policies overrides broker configuration."
)
private boolean allowOverrideEntryFilters = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -2722,5 +2723,14 @@ protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.Scanner

}

protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.entryFilters = entryFilters;
return policies;
}));
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -5335,4 +5336,50 @@ protected CompletableFuture<Void> internalSetSchemaValidationEnforced(boolean sc
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getEntryFilters)
.orElseGet(() -> {
if (applied) {
EntryFilters entryFilters = getNamespacePolicies(namespaceName).entryFilters;
if (entryFilters == null) {
return new EntryFilters(String.join(",",
pulsar().getConfiguration().getEntryFilterNames()));
}
return entryFilters;
}
return null;
})));

}

protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters entryFilters,
boolean isGlobal) {

return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setEntryFilters(entryFilters);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setEntryFilters(null);
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -2658,5 +2659,66 @@ public void finished(int total, int errors, int unknown) throws Exception {
}
}

@GET
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void getEntryFiltersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(polices -> asyncResponse.resume(polices.entryFilters))
.exceptionally(ex -> {
log.error("[{}] Failed to get entry filters config on namespace {}: {} ",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Set entry filters for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public void setEntryFiltersPerTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "entry filters", required = true)
EntryFilters entryFilters) {
validateNamespaceName(tenant, namespace);
internalSetEntryFiltersPerTopicAsync(entryFilters)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to set entry filters for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Remove entry filters for namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL")})
public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetEntryFiltersPerTopicAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove entry filters for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}



private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -53,6 +54,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -509,6 +511,96 @@ protected void validateAdminOperationOnTopic(TopicName topicName, boolean author
validateTopicOwnership(topicName, authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Get entry filters for a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") })
public void getEntryFilters(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this call to this "
+ "broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getEntryFilters", ex, asyncResponse);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Set entry filters for specified topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setEntryFilters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this "
+ "call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Enable sub types for the specified topic")
EntryFilters entryFilters) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetEntryFilters(entryFilters, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setEntryFilters", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/entryFilters")
@ApiOperation(value = "Remove entry filters for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Whether leader broker redirected this"
+ "call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
.thenRun(() -> {
log.info(
"[{}] Successfully remove entry filters: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
handleTopicPolicyException("removeEntryFilters", ex, asyncResponse);
return null;
});
}

private Topic getTopicReference(TopicName topicName) {
try {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
Expand Down
Loading

0 comments on commit 2f4af65

Please sign in to comment.