Skip to content

Commit

Permalink
Support get applied compaction threshold (apache#10038)
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Mar 26, 2021
1 parent 0992634 commit d81f507
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3798,8 +3798,19 @@ protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected Optional<Long> internalGetCompactionThreshold() {
return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied) {
Long threshold = getTopicPolicies(topicName)
.map(TopicPolicies::getCompactionThreshold)
.orElseGet(() -> {
if (applied) {
Long namespacePolicy = getNamespacePolicies(namespaceName).compaction_threshold;
return namespacePolicy == null
? pulsar().getConfiguration().getBrokerServiceCompactionThresholdInBytes()
: namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(threshold);
}

protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2676,21 +2676,12 @@ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncR
public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Long> compactionThreshold = internalGetCompactionThreshold();
if (!compactionThreshold.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(compactionThreshold.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
internalGetCompactionThreshold(applied).whenComplete((res, ex)
-> internalHandleResult(asyncResponse, res, ex, "Failed get compaction threshold"));
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2008,4 +2008,32 @@ public void testSubscriptionTypesEnabled() throws Exception {
}
}

@Test(timeOut = 20000)
public void testGetCompactionThresholdApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getCompactionThreshold(topic));
assertNull(admin.namespaces().getCompactionThreshold(myNamespace));
long brokerPolicy = pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
long namespacePolicy = 10L;

admin.namespaces().setCompactionThreshold(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getCompactionThreshold(myNamespace)));
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), namespacePolicy);

long topicPolicy = 20L;
admin.topics().setCompactionThreshold(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic)));
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), topicPolicy);

admin.namespaces().removeCompactionThreshold(myNamespace);
admin.topics().removeCompactionThreshold(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic)));
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2574,6 +2574,23 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<Long> getCompactionThresholdAsync(String topic);

/**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* @param topic Topic name
* @throws NotAuthorizedException Don't have admin permission
* @throws NotFoundException Namespace does not exist
* @throws PulsarAdminException Unexpected error
*/
Long getCompactionThreshold(String topic, boolean applied) throws PulsarAdminException;

/**
* Get the compactionThreshold for a topic asynchronously. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
* @param topic Topic name
*/
CompletableFuture<Long> getCompactionThresholdAsync(String topic, boolean applied);

/**
* Set the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2566,8 +2566,18 @@ public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic)

@Override
public Long getCompactionThreshold(String topic) throws PulsarAdminException {
return getCompactionThreshold(topic, false);
}

@Override
public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
return getCompactionThresholdAsync(topic, false);
}

@Override
public Long getCompactionThreshold(String topic, boolean applied) throws PulsarAdminException {
try {
return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getCompactionThresholdAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2579,9 +2589,10 @@ public Long getCompactionThreshold(String topic) throws PulsarAdminException {
}

@Override
public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
public CompletableFuture<Long> getCompactionThresholdAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "compactionThreshold");
path = path.queryParam("applied", applied);
final CompletableFuture<Long> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,13 @@ public void topics() throws Exception {
cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics, times(2)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k"));
verify(mockTopics).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-message-size persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxMessageSize("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-message-size persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1739,10 +1740,13 @@ private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getCompactionThreshold(persistentTopic));
print(getAdmin().topics().getCompactionThreshold(persistentTopic, applied));
}
}

Expand Down

0 comments on commit d81f507

Please sign in to comment.