Skip to content

Commit

Permalink
[feature][CLI] Add query options to the get topics in Namespace. (apa…
Browse files Browse the repository at this point in the history
…che#16167)

### Motivation

Since apache#2025 has added query param - `Mode`,  but the CLI is still not supported.

apache#15410 has introduced to filter system topic. 

So this patch is supported the above query params for Namespace#getTopics.
  • Loading branch information
Technoboy- authored Jun 26, 2022
1 parent 7afc411 commit 6c715e0
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -846,4 +846,10 @@ protected static String getPartitionedTopicNotFoundErrorMessage(String topic) {
protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
return String.format("Subscription %s not found for topic %s", subscription, topic);
}

protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
return topics.stream()
.filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5278,12 +5278,6 @@ protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaC
}));
}

protected List<String> filterSystemTopic(List<String> topics, boolean includeSystemTopic) {
return topics.stream()
.filter(topic -> includeSystemTopic ? true : !pulsar().getBrokerService().isSystemTopic(topic))
.collect(Collectors.toList());
}

protected CompletableFuture<Boolean> internalGetSchemaValidationEnforced(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,15 @@ public void getTopics(@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
.thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,15 @@ public void getTenantNamespaces(@Suspended final AsyncResponse response,
public void getTopics(@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
.thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
.thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
Expand Down Expand Up @@ -2509,4 +2511,26 @@ public void testSchemaValidationEnforced(String topicType) throws Exception {
assertEquals(admin.topics().getSchemaValidationEnforced(topic, false), true)
);
}

@Test
public void testGetNamespaceTopicList() throws Exception {
final String persistentTopic = "persistent://prop-xyz/ns1/testGetNamespaceTopicList";
final String nonPersistentTopic = "non-persistent://prop-xyz/ns1/non-testGetNamespaceTopicList";
final String eventTopic = "persistent://prop-xyz/ns1/__change_events";
admin.topics().createNonPartitionedTopic(persistentTopic);
Awaitility.await().untilAsserted(() ->
admin.namespaces().getTopics("prop-xyz/ns1",
ListNamespaceTopicsOptions.builder().mode(Mode.PERSISTENT).includeSystemTopic(true).build())
.contains(eventTopic));
List<String> notIncludeSystemTopics = admin.namespaces().getTopics("prop-xyz/ns1",
ListNamespaceTopicsOptions.builder().includeSystemTopic(false).build());
Assert.assertFalse(notIncludeSystemTopics.contains(eventTopic));
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(nonPersistentTopic)
.create();
List<String> notPersistentTopics = admin.namespaces().getTopics("prop-xyz/ns1",
ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import lombok.Cleanup;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -399,7 +402,8 @@ public void testAutoCreationOfSystemTopicTransactionBufferSnapshot() throws Exce

pulsarClient.newProducer().topic(topicString).create();

assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertTrue(admin.namespaces().getTopics("prop/ns-abc",
ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}

Expand All @@ -409,9 +413,11 @@ public void testAutoCreationOfSystemTopicNamespaceEvents() throws Exception {

final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;

pulsarClient.newProducer().topic(topicString).create();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicString).create();

assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertTrue(admin.namespaces().getTopics("prop/ns-abc",
ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class ListNamespaceTopicsOptions {

public static final ListNamespaceTopicsOptions EMPTY = ListNamespaceTopicsOptions.builder().build();

/**
* Set to true to get topics including system topic, otherwise not.
*/
private final boolean includeSystemTopic;

/**
* Allowed topic domain mode (persistent, non_persistent, all).
*/
private final Mode mode;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

public enum Mode {

PERSISTENT(0), NON_PERSISTENT(1), ALL(2),;
private final int value;
private Mode(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static Mode valueOf(int n) {
switch (n) {
case 0 :
return PERSISTENT;
case 1 :
return NON_PERSISTENT;
case 2 :
return ALL;
default :
return null;

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,51 @@ public interface Namespaces {
*/
CompletableFuture<List<String>> getTopicsAsync(String namespace);

/**
* Get the list of topics.
* <p/>
* Get the list of all the topics under a certain namespace.
* <p/>
* Response Example:
*
* <pre>
* <code>["persistent://my-tenant/use/namespace1/my-topic-1",
* "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
* </pre>
*
* @param namespace
* Namespace name
* @param options
* List namespace topics options
*
* @throws NotAuthorizedException
* You don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getTopics(String namespace, ListNamespaceTopicsOptions options) throws PulsarAdminException;

/**
* Get the list of topics asynchronously.
* <p/>
* Get the list of all the topics under a certain namespace.
* <p/>
* Response Example:
*
* <pre>
* <code>["persistent://my-tenant/use/namespace1/my-topic-1",
* "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
* </pre>
*
* @param namespace
* Namespace name
* @param options
* List namespace topics options
*/
CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options);

/**
* Get the list of bundles.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -169,6 +170,36 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public List<String> getTopics(String namespace, ListNamespaceTopicsOptions options)
throws PulsarAdminException{
return sync(() -> getTopicsAsync(namespace, options));
}

@Override
public CompletableFuture<List<String>> getTopicsAsync(String namespace, ListNamespaceTopicsOptions options) {
NamespaceName ns = NamespaceName.get(namespace);
String action = ns.isV2() ? "topics" : "destinations";
WebTarget path = namespacePath(ns, action);
path = path
.queryParam("mode", options.getMode())
.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
final CompletableFuture<List<String>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> topics) {
future.complete(topics);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public Policies getPolicies(String namespace) throws PulsarAdminException {
return sync(() -> getPoliciesAsync(namespace));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.Brokers;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.Lookup;
Expand Down Expand Up @@ -344,7 +345,7 @@ public void namespaces() throws Exception {
verify(mockNamespaces).getNamespaces("myprop", "clust");

namespaces.run(split("topics myprop/clust/ns1"));
verify(mockNamespaces).getTopics("myprop/clust/ns1");
verify(mockNamespaces).getTopics("myprop/clust/ns1", ListNamespaceTopicsOptions.builder().build());

namespaces.run(split("policies myprop/clust/ns1"));
verify(mockNamespaces).getPolicies("myprop/clust/ns1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -96,10 +98,22 @@ private class GetTopics extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = {"-m", "--mode"},
description = "Allowed topic domain mode (persistent, non_persistent, all).")
private Mode mode;

@Parameter(names = { "-ist",
"--include-system-topic" }, description = "Include system topic")
private boolean includeSystemTopic;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getAdmin().namespaces().getTopics(namespace));
ListNamespaceTopicsOptions options = ListNamespaceTopicsOptions.builder()
.mode(mode)
.includeSystemTopic(includeSystemTopic)
.build();
print(getAdmin().namespaces().getTopics(namespace, options));
}
}

Expand Down

0 comments on commit 6c715e0

Please sign in to comment.