Skip to content

Commit

Permalink
[Broker] Support enable subscription types. (apache#9401)
Browse files Browse the repository at this point in the history
## Motivation
Support setting enable subscription types. in broker/namespace/topic 
### Verifying this change
and the test
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
  • Loading branch information
congbobo184 authored Feb 19, 2021
1 parent cf63ae8 commit 6ea6a9b
Show file tree
Hide file tree
Showing 18 changed files with 685 additions and 3 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,14 @@ subscriptionRedeliveryTrackerEnabled=true
# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

# Enable subscription types (default is all type enabled)
# SubscriptionTypes : Exclusive,Shared,Failover,Key_Shared
# Example : Exclusive,Shared
# Above example will disable Failover and Key_Shared subscription types
subscriptionTypesEnabled=Exclusive,Shared,Failover,Key_Shared

# Enable Key_Shared subscription (default is enabled)
# @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable.
subscriptionKeySharedEnable=true

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int subscriptionExpiryCheckIntervalInMinutes = 5;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Enable subscription types (default is all type enabled)"
)
private Set<String> subscriptionTypesEnabled =
Sets.newHashSet("Exclusive", "Shared", "Failover", "Key_Shared");

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -60,6 +61,8 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -2309,6 +2312,28 @@ protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchem
"isAllowAutoUpdateSchema");
}

protected Set<SubscriptionType> internalGetSubscriptionTypesEnabled() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE,
PolicyOperation.READ);
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
getNamespacePolicies(namespaceName).subscription_types_enabled.forEach(subType ->
subscriptionTypes.add(SubscriptionType.valueOf(subType.name())));
return subscriptionTypes;
}

protected void internalSetSubscriptionTypesEnabled(Set<SubscriptionType> subscriptionTypesEnabled) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE,
PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Set<SubType> subTypes = new HashSet<>();
subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name())));
mutatePolicy((policies) -> {
policies.subscription_types_enabled = subTypes;
return policies;
}, (policies) -> policies.subscription_types_enabled,
"subscriptionTypesEnabled");
}


private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
Function<Policies, T> getter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
Expand Down Expand Up @@ -3770,6 +3772,23 @@ protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<List<SubType>> internalGetSubscriptionTypesEnabled() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionTypesEnabled);

}

protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
Set<SubscriptionType> subscriptionTypesEnabled) {
List<SubType> subTypes = Lists.newArrayList();
subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name())));
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
topicPolicies.setSubscriptionTypesEnabled(subTypes);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemovePublishRate() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -1444,6 +1445,34 @@ public void setIsAllowAutoUpdateSchema(
internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema);
}

@GET
@Path("/{tenant}/{namespace}/subscriptionTypesEnabled")
@ApiOperation(value = "The set of whether allow subscription types")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public Set<SubscriptionType> getSubscriptionTypesEnabled(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetSubscriptionTypesEnabled();
}

@POST
@Path("/{tenant}/{namespace}/subscriptionTypesEnabled")
@ApiOperation(value = "Update set of whether allow share sub type")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setSubscriptionTypesEnabled(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Set of whether allow subscription types", required = true)
Set<SubscriptionType> subscriptionTypesEnabled) {
validateNamespaceName(tenant, namespace);
internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled);
}


@GET
@Path("/{tenant}/{namespace}/schemaValidationEnforced")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -51,8 +52,10 @@
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -2950,6 +2953,73 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
@ApiOperation(value = "Get is enable sub type fors specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<List<SubType>> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled();
if (!subscriptionTypesEnabled.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
subscriptionTypesEnabled.get().forEach(subType ->
subscriptionTypes.add(SubscriptionType.valueOf(subType.name())));
asyncResponse.resume(subscriptionTypes);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscriptionTypesEnabled")
@ApiOperation(value = "Set is enable sub types for specified topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Enable sub types for the specified topic")
Set<SubscriptionType> subscriptionTypesEnabled) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic is enable sub types", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic is enable sub types", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic is enabled sub types :"
+ " tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(subscriptionTypesEnabled));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Remove message publish rate configuration for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
Expand Down Expand Up @@ -632,6 +633,20 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
);
return future;
}

try {
if (!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
&& !checkSubscriptionTypesEnable(subType)) {
future.completeExceptionally(
new NotAllowedException("Topic[{" + topic + "}] don't support "
+ subType.name() + " sub type!"));
return future;
}
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}

if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
Expand Down Expand Up @@ -2780,6 +2795,40 @@ private boolean checkMaxSubscriptionsPerTopicExceed() {
return false;
}

public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
TopicName topicName = TopicName.get(topic);
if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
TopicPolicies topicPolicies =
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
} else {
if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
}
} else {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
}

private boolean checkNsAndBrokerSubscriptionTypesEnable(TopicName topicName, SubType subType) throws Exception {
Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespaceObject().toString()));
if (policies.isPresent()) {
if (policies.get().subscription_types_enabled.isEmpty()) {
return getBrokerService().getPulsar().getConfiguration()
.getSubscriptionTypesEnabled().contains(subType.name());
} else {
return policies.get().subscription_types_enabled.contains(subType);
}
} else {
return getBrokerService().getPulsar().getConfiguration()
.getSubscriptionTypesEnabled().contains(subType.name());
}
}

public PositionImpl getMaxReadPosition() {
return this.transactionBuffer.getMaxReadPosition();
}
Expand Down
Loading

0 comments on commit 6ea6a9b

Please sign in to comment.