Skip to content

Commit

Permalink
Support get applied PersistencePolicies (apache#9831)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216

### Modifications
Add applied API for topic-level
Add remove API for namespace-level

### Verifying this change
Verify the applied API and CMD
  • Loading branch information
315157973 authored Mar 10, 2021
1 parent 2dfdfd5 commit 59e0187
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1394,11 +1394,21 @@ protected void internalSetRetention(RetentionPolicies retention) {
}
}

protected void internalDeletePersistence() {
validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
doUpdatePersistence(null);
}

protected void internalSetPersistence(PersistencePolicies persistence) {
validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);

doUpdatePersistence(persistence);
}

private void doUpdatePersistence(PersistencePolicies persistence) {
try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies)->{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2733,8 +2733,24 @@ protected CompletableFuture<Void> internalRemoveRetention() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected Optional<PersistencePolicies> internalGetPersistence(){
return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied) {
PersistencePolicies persistencePolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getPersistence)
.orElseGet(() -> {
if (applied) {
PersistencePolicies namespacePolicy = getNamespacePolicies(namespaceName)
.persistence;
return namespacePolicy == null
? new PersistencePolicies(
pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit())
: namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(persistencePolicies);
}

protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,16 @@ public void setPersistence(@PathParam("tenant") String tenant, @PathParam("names
internalSetPersistence(persistence);
}

@DELETE
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Delete the persistence configuration for all topics on a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void deletePersistence(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalDeletePersistence();
}

@POST
@Path("/{tenant}/{namespace}/persistence/bookieAffinity")
@ApiOperation(value = "Set the bookie-affinity-group to namespace-persistent policy.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1743,21 +1743,21 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<PersistencePolicies> persistencePolicies = internalGetPersistence();
if (!persistencePolicies.isPresent()) {
asyncResponse.resume(Response.noContent().build());
internalGetPersistence(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(persistencePolicies.get());
asyncResponse.resume(res);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,39 @@ public void testRetentionPriority() throws Exception {
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
}

@Test(timeOut = 20000)
public void testGetPersistenceApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getPersistence(topic));
assertNull(admin.namespaces().getPersistence(myNamespace));
PersistencePolicies brokerPolicy
= new PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
PersistencePolicies namespacePolicy
= new PersistencePolicies(5,4,3,2);

admin.namespaces().setPersistence(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getPersistence(myNamespace)));
assertEquals(admin.topics().getPersistence(topic, true), namespacePolicy);

PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1);
admin.topics().setPersistence(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getPersistence(topic)));
assertEquals(admin.topics().getPersistence(topic, true), topicPolicy);

admin.namespaces().removePersistence(myNamespace);
admin.topics().removePersistence(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getPersistence(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getPersistence(topic)));
assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
}

@Test
public void testCheckPersistence() throws Exception {
PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,19 @@ CompletableFuture<Void> setSubscriptionTypesEnabledAsync(String namespace,
*/
CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);

/**
* Remove the persistence configuration on a namespace.
* @param namespace
* @throws PulsarAdminException
*/
void removePersistence(String namespace) throws PulsarAdminException;

/**
* Remove the persistence configuration on a namespace asynchronously.
* @param namespace
*/
CompletableFuture<Void> removePersistenceAsync(String namespace);

/**
* Set the persistence configuration for all the topics on a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2139,6 +2139,22 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic);

/**
* Get the applied configuration of persistence policies for specified topic.
*
* @param topic Topic name
* @return Configuration of bookkeeper persistence policies
* @throws PulsarAdminException Unexpected error
*/
PersistencePolicies getPersistence(String topic, boolean applied) throws PulsarAdminException;

/**
* Get the applied configuration of persistence policies for specified topic asynchronously.
*
* @param topic Topic name
*/
CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic, boolean applied);

/**
* Remove the configuration of persistence policies for specified topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,27 @@ public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
return asyncDeleteRequest(path);
}

@Override
public void removePersistence(String namespace) throws PulsarAdminException {
try {
removePersistenceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removePersistenceAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "persistence");
return asyncDeleteRequest(path);
}

@Override
public void setPersistence(String namespace, PersistencePolicies persistence) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2326,8 +2326,18 @@ public CompletableFuture<Void> setPersistenceAsync(String topic, PersistencePoli

@Override
public PersistencePolicies getPersistence(String topic) throws PulsarAdminException {
return getPersistence(topic, false);
}

@Override
public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic) {
return getPersistenceAsync(topic, false);
}

@Override
public PersistencePolicies getPersistence(String topic, boolean applied) throws PulsarAdminException {
try {
return getPersistenceAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getPersistenceAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2339,9 +2349,10 @@ public PersistencePolicies getPersistence(String topic) throws PulsarAdminExcept
}

@Override
public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic) {
public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "persistence");
path = path.queryParam("applied", applied);
final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PersistencePolicies>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-persistence myprop/clust/ns1"));
verify(mockNamespaces).getPersistence("myprop/clust/ns1");

namespaces.run(split("remove-persistence myprop/clust/ns1"));
verify(mockNamespaces).removePersistence("myprop/clust/ns1");

namespaces.run(split("get-max-subscriptions-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxSubscriptionsPerTopic("myprop/clust/ns1");
namespaces.run(split("set-max-subscriptions-per-topic myprop/clust/ns1 -m 300"));
Expand Down Expand Up @@ -935,6 +938,13 @@ public void topics() throws Exception {
cmdTopics.run(split("remove-max-subscriptions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
verify(mockTopics).setPersistence("persistent://myprop/clust/ns1/ds1", new PersistencePolicies(2, 1, 1, 100.0d));
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removePersistence("persistent://myprop/clust/ns1/ds1");

// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove the persistence policies for a namespace")
private class RemovePersistence extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removePersistence(namespace);
}
}

@Parameters(commandDescription = "Set the persistence policies for a namespace")
private class SetPersistence extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
Expand Down Expand Up @@ -2057,6 +2069,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());

jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
Expand Down

0 comments on commit 59e0187

Please sign in to comment.