Skip to content

Commit

Permalink
Max unacked messages per consumer support cross multiple clusters (ap…
Browse files Browse the repository at this point in the history
…ache#13547)

Max unacked messages per consumer support cross multiple clusters
  • Loading branch information
315157973 authored Dec 30, 2021
1 parent 24e79d0 commit a07e7a1
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,8 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(In
});
}

protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
.orElseGet(() -> {
if (applied) {
Expand All @@ -936,7 +936,8 @@ protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boo
}));
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum,
boolean isGlobal) {
if (maxUnackedNum != null && maxUnackedNum < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
Expand All @@ -946,6 +947,7 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Intege
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,12 @@ public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse async
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
return null;
Expand All @@ -370,13 +371,14 @@ public void setMaxUnackedMessagesOnConsumer(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", ex, asyncResponse);
Expand All @@ -393,11 +395,12 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", ex, asyncResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,25 @@ public void testReplicatorMaxConsumerPerSubPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
}

@Test
public void testReplicateMaxUnackedMsgPerConsumer() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
// set max unacked msgs per consumers
admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100);
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
// remove max unacked msgs per consumers
admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
}

@Test
public void testReplicatorTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,14 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99"));
verify(mockTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies, times(1))
.getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10"));
Expand Down Expand Up @@ -1037,6 +1045,15 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("set-max-producers persistent://myprop/clust/ns1/ds1 -p 99 -g"));
verify(mockGlobalTopicsPolicies).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies, times(1))
.removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies, times(1))
.getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999 -g"));
verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g"));
Expand Down Expand Up @@ -1068,8 +1085,6 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));



cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("remove-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());

jcommander.addCommand("get-max-unacked-messages-per-consumer", new GetMaxUnackedMessagesPerConsumer());
jcommander.addCommand("set-max-unacked-messages-per-consumer", new SetMaxUnackedMessagesPerConsumer());
jcommander.addCommand("remove-max-unacked-messages-per-consumer", new RemoveMaxUnackedMessagesPerConsumer());

jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
Expand Down Expand Up @@ -152,6 +157,60 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get max unacked messages policy per consumer for a topic")
private class GetMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnConsumer(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Remove max unacked messages policy per consumer for a topic")
private class RemoveMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnConsumer(persistentTopic);
}
}

@Parameters(commandDescription = "Set max unacked messages policy per consumer for a topic")
private class SetMaxUnackedMessagesPerConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
private int maxNum;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
}
}

@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-max-consumers");
cmdUsageFormatter.addDeprecatedCommand("remove-max-consumers");

cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-per-consumer");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-per-consumer");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-per-consumer");

cmdUsageFormatter.addDeprecatedCommand("get-message-ttl");
cmdUsageFormatter.addDeprecatedCommand("set-message-ttl");
cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");
Expand Down

0 comments on commit a07e7a1

Please sign in to comment.