Skip to content

Commit

Permalink
Support get topic applied policy for Retention (apache#9362)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216
### Modifications
1. Add applied API
2. Add remove API

### Verifying this change
1. Test whether the priority is correct when policies of different levels exist at the same time
2. Test applied API works
  • Loading branch information
315157973 authored Feb 7, 2021
1 parent 3d5d6f6 commit 08c4b04
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2538,6 +2538,9 @@ private void validatePolicies(NamespaceName ns, Policies policies) {
}

protected void validateRetentionPolicies(RetentionPolicies retention) {
if (retention == null) {
return;
}
checkArgument(retention.getRetentionSizeInMB() >= -1,
"Invalid retention policy: size limit must be >= -1");
checkArgument(retention.getRetentionTimeInMinutes() >= -1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2637,15 +2637,19 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
}

protected void internalGetRetention(AsyncResponse asyncResponse){
protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){
preValidation();
Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies);
if (!retention.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(retention.get());
}
RetentionPolicies retentionPolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
if (applied) {
RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies;
return policies == null ? new RetentionPolicies(
config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB())
: policies;
}
return null;
});
asyncResponse.resume(retentionPolicies == null ? Response.noContent().build() : retentionPolicies);
}

protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,19 @@ public void setRetention(@PathParam("tenant") String tenant, @PathParam("namespa
internalSetRetention(retention);
}

@DELETE
@Path("/{tenant}/{namespace}/retention")
@ApiOperation(value = " Remove retention configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void removeRetention(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
validateNamespaceName(tenant, namespace);
internalSetRetention(null);
}

@POST
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Set the persistence configuration for all the topics on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1669,10 +1669,11 @@ public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncRespo
public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetRetention(asyncResponse);
internalGetRetention(asyncResponse, applied);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -305,6 +307,93 @@ public void testRemoveRetention() throws Exception {
admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test(timeOut = 10000)
public void testRetentionAppliedApi() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
RetentionPolicies brokerPolicies =
new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB());
assertEquals(admin.topics().getRetention(topic, true), brokerPolicies);

RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20);
admin.namespaces().setRetention(myNamespace, namespacePolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

RetentionPolicies topicPolicies = new RetentionPolicies(20,30);
admin.topics().setRetention(topic, topicPolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), topicPolicies));

admin.topics().removeRetention(topic);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

admin.namespaces().removeRetention(myNamespace);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies));
}

@Test(timeOut = 20000)
public void testRetentionPriority() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getRetention(topic));
assertNull(admin.namespaces().getRetention(myNamespace));

PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained");
shouldTopicBeRetained.setAccessible(true);
Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
lastActive.setAccessible(true);
//set last active to 2 minutes ago
lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2));
//the default value of the broker-level is 0, so it is not retained by default
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//set namespace-level policy
RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
admin.namespaces().setRetention(myNamespace, retentionPolicies);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// set topic-level policy
admin.topics().setRetention(topic, new RetentionPolicies(3, 1));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getRetention(topic)));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//topic-level disabled
admin.topics().setRetention(topic, new RetentionPolicies(0, 0));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic).getRetentionSizeInMB(), 0));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove topic-level policy
admin.topics().removeRetention(topic);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getRetention(topic)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//namespace-level disabled
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(0, 0));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//change namespace-level policy
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(1, 1));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove namespace-level policy
admin.namespaces().removeRetention(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.namespaces().getRetention(myNamespace)));
//the default value of the broker-level is 0, so it is not retained by default
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
}

@Test
public void testCheckPersistence() throws Exception {
PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,20 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi
*/
CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention);

/**
* Remove the retention configuration for all the topics on a namespace.
* @param namespace
* @throws PulsarAdminException
*/
void removeRetention(String namespace) throws PulsarAdminException;

/**
* Remove the retention configuration for all the topics on a namespace asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeRetentionAsync(String namespace);

/**
* Get the retention configuration for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,23 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<RetentionPolicies> getRetentionAsync(String topic);

/**
* Get the applied retention configuration for a topic.
* @param topic
* @param applied
* @return
* @throws PulsarAdminException
*/
RetentionPolicies getRetention(String topic, boolean applied) throws PulsarAdminException;

/**
* Get the applied retention configuration for a topic asynchronously.
* @param topic
* @param applied
* @return
*/
CompletableFuture<RetentionPolicies> getRetentionAsync(String topic, boolean applied);

/**
* Remove the retention configuration for all the topics on a topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,27 @@ public CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPoli
return asyncPostRequest(path, Entity.entity(retention, MediaType.APPLICATION_JSON));
}

@Override
public void removeRetention(String namespace) throws PulsarAdminException {
try {
removeRetentionAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeRetentionAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "retention");
return asyncDeleteRequest(path);
}

@Override
public RetentionPolicies getRetention(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2075,8 +2075,18 @@ public CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies

@Override
public RetentionPolicies getRetention(String topic) throws PulsarAdminException {
return getRetention(topic, false);
}

@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
return getRetentionAsync(topic, false);
}

@Override
public RetentionPolicies getRetention(String topic, boolean applied) throws PulsarAdminException {
try {
return getRetentionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getRetentionAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2088,9 +2098,10 @@ public RetentionPolicies getRetention(String topic) throws PulsarAdminException
}

@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "retention");
path = path.queryParam("applied", applied);
final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<RetentionPolicies>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-retention myprop/clust/ns1"));
verify(mockNamespaces).getRetention("myprop/clust/ns1");

namespaces.run(split("remove-retention myprop/clust/ns1"));
verify(mockNamespaces).removeRetention("myprop/clust/ns1");

namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true));

Expand Down Expand Up @@ -847,6 +850,14 @@ public void topics() throws Exception {
cmdTopics.run(split("set-max-message-size persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getRetention("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
verify(mockTopics).setRetention("persistent://myprop/clust/ns1/ds1",
new RetentionPolicies(10, 20));
cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeRetention("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove the retention policy for a namespace")
private class RemoveRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeRetention(namespace);
}
}

@Parameters(commandDescription = "Set the retention policy for a namespace")
private class SetRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
Expand Down Expand Up @@ -1975,6 +1987,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());

jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1226,10 +1226,13 @@ private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getRetention(persistentTopic));
print(getAdmin().topics().getRetention(persistentTopic, applied));
}
}

Expand Down

0 comments on commit 08c4b04

Please sign in to comment.