Skip to content

Commit

Permalink
Support get topic applied policy for InactiveTopic (apache#9230)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216

### Modifications
1 Now if there is no data at the namespace-level, the broker-level data will be returned by default, so let it return null
2 add query param `applied` for getInactiveTopic
3 add `applied` API for client

### Verifying this change
unit test:
InactiveTopicDeleteTest#testInactiveTopicApplied
  • Loading branch information
315157973 authored Jan 20, 2021
1 parent 29f0e1b commit 215bd35
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2200,13 +2200,7 @@ protected InactiveTopicPolicies internalGetInactiveTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
if (policies.inactive_topic_policies == null) {
return new InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
, config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
, config().isBrokerDeleteInactiveTopicsEnabled());
} else {
return policies.inactive_topic_policies;
}
return policies.inactive_topic_policies;
}

protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,22 @@ protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies off
return completableFuture;
}

protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
InactiveTopicPolicies inactiveTopicPolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getInactiveTopicPolicies)
.orElseGet(() -> {
if (applied) {
InactiveTopicPolicies policies = getNamespacePolicies(namespaceName).inactive_topic_policies;
return policies == null ? new InactiveTopicPolicies(
config().getBrokerDeleteInactiveTopicsMode(),
config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
config().isBrokerDeleteInactiveTopicsEnabled()) : policies;
}
return null;
});
return CompletableFuture.completedFuture(inactiveTopicPolicies);
}

protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
TopicPolicies topicPolicies = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,14 +483,20 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as
public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isInactiveTopicPoliciesSet()) {
asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
} else {
asyncResponse.resume(Response.noContent().build());
}
internalGetInactiveTopicPolicies(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get InactiveTopicPolicies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get InactiveTopicPolicies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(res);
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

public class InactiveTopicDeleteTest extends BrokerTestBase {
Expand Down Expand Up @@ -543,4 +545,57 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti
-> Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)));
Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
}

@Test(timeOut = 30000)
public void testInactiveTopicApplied() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();

final String namespace = "prop/ns-abc";
final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
//namespace-level default value is null
assertNull(admin.namespaces().getInactiveTopicPolicies(namespace));
//topic-level default value is null
assertNull(admin.topics().getInactiveTopicPolicies(topic));
//use broker-level by default
InactiveTopicPolicies brokerLevelPolicy =
new InactiveTopicPolicies(conf.getBrokerDeleteInactiveTopicsMode(),
conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
conf.isBrokerDeleteInactiveTopicsEnabled());
Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), brokerLevelPolicy);
//set namespace-level policy
InactiveTopicPolicies namespaceLevelPolicy =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,
20, false);
admin.namespaces().setInactiveTopicPolicies(namespace, namespaceLevelPolicy);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getInactiveTopicPolicies(namespace)));
InactiveTopicPolicies policyFromBroker = admin.topics().getInactiveTopicPolicies(topic, true);
assertEquals(policyFromBroker.getMaxInactiveDurationSeconds(), 20);
assertFalse(policyFromBroker.isDeleteWhileInactive());
assertEquals(policyFromBroker.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
// set topic-level policy
InactiveTopicPolicies topicLevelPolicy =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,
30, false);
admin.topics().setInactiveTopicPolicies(topic, topicLevelPolicy);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
policyFromBroker = admin.topics().getInactiveTopicPolicies(topic, true);
assertEquals(policyFromBroker.getMaxInactiveDurationSeconds(), 30);
assertFalse(policyFromBroker.isDeleteWhileInactive());
assertEquals(policyFromBroker.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
//remove topic-level policy
admin.topics().removeInactiveTopicPolicies(topic);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), namespaceLevelPolicy));
//remove namespace-level policy
admin.namespaces().removeInactiveTopicPolicies(namespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), brokerLevelPolicy));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,21 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String topic);

/**
* Get inactive topic policies applied for a topic.
* @param topic
* @return
* @throws PulsarAdminException
*/
InactiveTopicPolicies getInactiveTopicPolicies(String topic, boolean applied) throws PulsarAdminException;

/**
* Get inactive topic policies applied for a topic asynchronously.
* @param topic
* @param applied
* @return
*/
CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic, boolean applied);
/**
* get inactive topic policies of a topic.
* @param topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1597,9 +1597,9 @@ public void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminE
}

@Override
public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException {
public InactiveTopicPolicies getInactiveTopicPolicies(String topic, boolean applied) throws PulsarAdminException {
try {
return getInactiveTopicPoliciesAsync(topic).
return getInactiveTopicPoliciesAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
Expand All @@ -1612,9 +1612,10 @@ public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws Pulsa
}

@Override
public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic) {
public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
path = path.queryParam("applied", applied);
final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
@Override
Expand All @@ -1630,6 +1631,16 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException {
return getInactiveTopicPolicies(topic, false);
}

@Override
public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic) {
return getInactiveTopicPoliciesAsync(topic, false);
}

@Override
public CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic
, InactiveTopicPolicies inactiveTopicPolicies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void topics() throws Exception {
verify(mockTopics).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
Expand Down Expand Up @@ -878,6 +878,9 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", true);


cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,10 +1904,13 @@ private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getInactiveTopicPolicies(persistentTopic));
print(admin.topics().getInactiveTopicPolicies(persistentTopic, applied));
}
}

Expand Down

0 comments on commit 215bd35

Please sign in to comment.