Skip to content

Commit

Permalink
Compaction threshold policies support cross multiple clusters (apache…
Browse files Browse the repository at this point in the history
…#13513)

* Compaction threshold policies support cross multiple clusters
  • Loading branch information
mattisonchao authored Dec 31, 2021
1 parent 98a59a9 commit 0738a08
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4224,8 +4224,8 @@ protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription(bool
});
}

protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getCompactionThreshold)
.orElseGet(() -> {
if (applied) {
Expand All @@ -4238,27 +4238,30 @@ protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied
}));
}

protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold , boolean isGlobal) {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
}

return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setCompactionThreshold(compactionThreshold);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});

}

protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemoveCompactionThreshold(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setCompactionThreshold(null);
TopicPolicies topicPolicies = op.get();
topicPolicies.setCompactionThreshold(null);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2988,11 +2988,12 @@ public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetCompactionThreshold(applied))
.thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse);
Expand All @@ -3014,10 +3015,11 @@ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold))
.thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic compaction threshold:"
Expand Down Expand Up @@ -3048,11 +3050,12 @@ public void removeCompactionThreshold(@Suspended final AsyncResponse asyncRespon
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveCompactionThreshold())
.thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
clientAppId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,30 @@ public void testReplicateMaxUnackedMsgPerSub() throws Exception {
assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
}

@Test
public void testReplicatorCompactionThresholdPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

init(namespace, persistentTopicName);
// set compaction threshold
admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 1);
// get compaction threshold
Awaitility.await().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true)
.getCompactionThreshold(persistentTopicName), Long.valueOf(1)));
Awaitility.await().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true)
.getCompactionThreshold(persistentTopicName), Long.valueOf(1)));

//remove compaction threshold
admin1.topicPolicies(true).removeCompactionThreshold(persistentTopicName);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
}

private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,13 @@ public void topicPolicies() throws Exception {
+ " -e -t 1s -m delete_when_no_subscriptions"));
verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k"));
verify(mockTopicsPolicies).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
Expand Down Expand Up @@ -1162,6 +1169,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99 -g"));
verify(mockGlobalTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k -g"));
verify(mockGlobalTopicsPolicies).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());

jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());

jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
Expand Down Expand Up @@ -991,6 +995,59 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;
@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getCompactionThreshold(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Set compaction threshold for a topic")
private class SetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--threshold", "-t" },
description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+ "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
required = true)
private String thresholdStr = "0";
@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long threshold = validateSizeString(thresholdStr);
getTopicPolicies(isGlobal).setCompactionThreshold(persistentTopic, threshold);
}
}

@Parameters(commandDescription = "Remove compaction threshold for a topic")
private class RemoveCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeCompactionThreshold(persistentTopic);
}
}

@Parameters(commandDescription = "Get message dispatch rate for a topic")
private class GetDispatchRate extends CliCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies");
cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies");

cmdUsageFormatter.addDeprecatedCommand("get-compaction-threshold");
cmdUsageFormatter.addDeprecatedCommand("set-compaction-threshold");
cmdUsageFormatter.addDeprecatedCommand("remove-compaction-threshold");

cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");
Expand Down

0 comments on commit 0738a08

Please sign in to comment.