Skip to content

Commit

Permalink
Max consumers per subscription support cross multiple clusters (apach…
Browse files Browse the repository at this point in the history
…e#13522)

Max consumers per subscription support cross multiple clusters
  • Loading branch information
315157973 authored Dec 28, 2021
1 parent 96b3e92 commit 0bc45b5
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4178,30 +4178,33 @@ protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
}


protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxConsumersPerSubscription));
}

protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(
Integer maxConsumersPerSubscription, boolean isGlobal) {
if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
}
return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setMaxConsumersPerSubscription(null);
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3064,11 +3064,12 @@ public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncR
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetMaxConsumersPerSubscription())
.thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
.exceptionally(ex -> {
Expand All @@ -3090,12 +3091,13 @@ public void setMaxConsumersPerSubscription(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription))
.thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic max consumers per subscription:"
Expand Down Expand Up @@ -3126,11 +3128,12 @@ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asy
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveMaxConsumersPerSubscription())
.thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic max consumers per subscription:"
+ " tenant={}, namespace={}, topic={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,32 @@ public void testReplicatorMaxProducer() throws Exception {
assertNull(admin3.topicPolicies(true).getMaxProducers(topic)));
}

@Test
public void testReplicatorMaxConsumerPerSubPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();

init(namespace, topic);
// set max consumer per sub
admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100);
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));

Awaitility.await().untilAsserted(() -> {
assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100);
assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic));
});

//remove max consumer per sub
admin1.topicPolicies(true).removeMaxConsumersPerSubscription(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
}

@Test
public void testReplicatorTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5"));
verify(mockTopicsPolicies).setMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1", 5);
cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
Expand Down Expand Up @@ -1034,6 +1041,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5 -g"));
verify(mockGlobalTopicsPolicies).setMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1", 5);
cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled());
Expand Down Expand Up @@ -77,6 +80,57 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
}

@Parameters(commandDescription = "Get max consumers per subscription for a topic")
private class GetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxConsumersPerSubscription(persistentTopic));
}
}

@Parameters(commandDescription = "Set max consumers per subscription for a topic")
private class SetMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
private int maxConsumersPerSubscription;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
}
}

@Parameters(commandDescription = "Remove max consumers per subscription for a topic")
private class RemoveMaxConsumersPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxConsumersPerSubscription(persistentTopic);
}
}

@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-message-ttl");
cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");

cmdUsageFormatter.addDeprecatedCommand("get-max-consumers-per-subscription");
cmdUsageFormatter.addDeprecatedCommand("set-max-consumers-per-subscription");
cmdUsageFormatter.addDeprecatedCommand("remove-max-consumers-per-subscription");

cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-consumer");
Expand Down

0 comments on commit 0bc45b5

Please sign in to comment.