Skip to content

Commit

Permalink
Subscription types support across multiple clusters (apache#13482)
Browse files Browse the repository at this point in the history
* Subscription Types support cross multiple clusters
  • Loading branch information
315157973 authored Dec 26, 2021
1 parent 53bc0d5 commit cadc011
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4255,30 +4255,32 @@ protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate
});
}

protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Optional<List<SubType>>> internalGetSubscriptionTypesEnabled(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getSubscriptionTypesEnabled));
}

protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
Set<SubscriptionType> subscriptionTypesEnabled) {
Set<SubscriptionType> subscriptionTypesEnabled, boolean isGlobal) {
List<SubType> subTypes = Lists.newArrayList();
subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name())));
return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setSubscriptionTypesEnabled(subTypes);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setSubscriptionTypesEnabled(Lists.newArrayList());
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3242,10 +3242,11 @@ public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetSubscriptionTypesEnabled())
.thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
.thenAccept(op -> {
asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build());
Expand All @@ -3269,12 +3270,13 @@ public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Enable sub types for the specified topic")
Set<SubscriptionType> subscriptionTypesEnabled) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled))
.thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic is enabled sub types :"
Expand Down Expand Up @@ -3306,10 +3308,11 @@ public void removeSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncR
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveSubscriptionTypesEnabled())
.thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
clientAppId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -173,6 +177,27 @@ public void testReplicatorTopicPolicies() throws Exception {
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getRetention(persistentTopicName)));
}
@Test
public void testReplicateSubscriptionTypesPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
subscriptionTypes.add(SubscriptionType.Shared);
// set subscription types policies
admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes);
Awaitility.await().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
Awaitility.await().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
// remove subscription types policies
admin1.topicPolicies(true).removeSubscriptionTypesEnabled(topic);
Awaitility.await().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
Awaitility.await().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));

}

private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,20 @@ CompletableFuture<Void> setSubscriptionTypesEnabledAsync(String topic,
*/
CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String topic);

/**
* Remove subscription types enabled for a topic.
* @param topic
* @throws PulsarAdminException
*/
void removeSubscriptionTypesEnabled(String topic) throws PulsarAdminException;

/**
* Remove subscription types enabled for a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic);

/**
* Set topic-subscribe-rate (topic will limit by subscribeRate).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
Expand Down Expand Up @@ -1231,6 +1234,28 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void removeSubscriptionTypesEnabled(String topic) throws PulsarAdminException {
try {
removeSubscriptionTypesEnabledAsync(topic)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeSubscriptionTypesEnabledAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
return asyncDeleteRequest(path);
}

@Override
public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
return getSubscribeRate(topic, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,14 @@ public void topicPolicies() throws Exception {

CmdTopicPolicies cmdTopics = new CmdTopicPolicies(() -> admin);

cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover"));
verify(mockTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
cmdTopics.run(split("get-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
Expand Down Expand Up @@ -981,6 +989,14 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
cmdTopics.run(split("get-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand All @@ -42,6 +45,9 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled());
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());
Expand Down Expand Up @@ -117,6 +123,69 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Set subscription types enabled for a topic")
private class SetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--types", "-t"}, description = "Subscription types enabled list (comma separated values)."
+ " Possible values: (Exclusive, Shared, Failover, Key_Shared).", required = true)
private List<String> subTypes;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
Set<SubscriptionType> types = new HashSet<>();
subTypes.forEach(s -> {
SubscriptionType subType;
try {
subType = SubscriptionType.valueOf(s);
} catch (IllegalArgumentException exception) {
throw new ParameterException(String.format("Illegal subscription type %s. Possible values: %s.", s,
Arrays.toString(SubscriptionType.values())));
}
types.add(subType);
});
getTopicPolicies(isGlobal).setSubscriptionTypesEnabled(persistentTopic, types);
}
}

@Parameters(commandDescription = "Get subscription types enabled for a topic")
private class GetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscriptionTypesEnabled(persistentTopic));
}
}

@Parameters(commandDescription = "Remove subscription types enabled for a topic")
private class RemoveSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscriptionTypesEnabled(persistentTopic);
}
}

@Parameters(commandDescription = "Get the retention policy for a topic")
private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down Expand Up @@ -184,8 +253,7 @@ private class RemoveRetention extends CliCommand {
private List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously"
, arity = 0)
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("get-deduplication");
cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");

cmdUsageFormatter.addDeprecatedCommand("set-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");
}
}

Expand Down Expand Up @@ -2004,7 +2008,6 @@ void run() throws PulsarAdminException {
}
}


@Parameters(commandDescription = "Set subscription types enabled for a topic")
private class SetSubscriptionTypesEnabled extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down

0 comments on commit cadc011

Please sign in to comment.