Skip to content

Commit

Permalink
[feature][admin] Support to get topic properties. (apache#15944)
Browse files Browse the repository at this point in the history
### Motivation

As apache#12818 has supported creating topics with metadata, this patch is adding a `get` API to support getting topic properties.
  • Loading branch information
Technoboy- authored Jun 8, 2022
1 parent 2c7c9e5 commit 1ebe4ee
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,34 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
});
}

protected CompletableFuture<Map<String, String>> internalGetPropertiesAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return getPropertiesAsync();
}
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return getPropertiesAsync();
}
return CompletableFuture.completedFuture(metadata.properties);
});
});
}

private CompletableFuture<Map<String, String>> getPropertiesAsync() {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenApply(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
return ((PersistentTopic) opt.get()).getManagedLedger().getProperties();
});
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(exist -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,40 @@ public void getPartitionedMetadata(
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Get topic properties.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void getProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,27 @@ public void testPersistentTopicList() throws Exception {
assertEquals(topicsInNs.size(), 0);
}

@Test
public void testCreateAndGetTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String nonPartitionedTopicName = "persistent://" + namespace + "/non-partitioned-TopicProperties";
admin.namespaces().createNamespace(namespace, 20);
Map<String, String> nonPartitionedTopicProperties = new HashMap<>();
nonPartitionedTopicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(nonPartitionedTopicName, nonPartitionedTopicProperties);
Map<String, String> properties11 = admin.topics().getProperties(nonPartitionedTopicName);
Assert.assertNotNull(properties11);
Assert.assertEquals(properties11.get("key1"), "value1");

final String partitionedTopicName = "persistent://" + namespace + "/partitioned-TopicProperties";
Map<String, String> partitionedTopicProperties = new HashMap<>();
partitionedTopicProperties.put("key2", "value2");
admin.topics().createPartitionedTopic(partitionedTopicName, 2, partitionedTopicProperties);
Map<String, String> properties22 = admin.topics().getProperties(partitionedTopicName);
Assert.assertNotNull(properties22);
Assert.assertEquals(properties22.get("key2"), "value2");
}

@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public void testNonPartitionedTopics() {

@Test
public void testCreateNonPartitionedTopic() {
final String topic = "standard-topic-partition-a";
final String topic = "testCreateNonPartitionedTopic-a";
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic);
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic, true, null);
Expand All @@ -476,7 +476,7 @@ public void testCreateNonPartitionedTopic() {
response = mock(AsyncResponse.class);
metaResponse = mock(AsyncResponse.class);
metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topic2 = "standard-topic-partition-b";
final String topic2 = "testCreateNonPartitionedTopic-b";
TopicName topicName2 = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic2);
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
Expand All @@ -488,6 +488,13 @@ public void testCreateNonPartitionedTopic() {
testTenant, testNamespace, topic2, true, false);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
Assert.assertNull(metaResponseCaptor.getValue().properties);
metaResponse = mock(AsyncResponse.class);
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
persistentTopics.getProperties(metaResponse,
testTenant, testNamespace, topic2, true);
verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
Assert.assertNotNull(metaResponseCaptor2.getValue());
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}

@Test
Expand Down Expand Up @@ -516,6 +523,12 @@ public void testCreatePartitionedTopic() {
Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
});
AsyncResponse response3 = mock(AsyncResponse.class);
ArgumentCaptor<Map> metaResponseCaptor2 = ArgumentCaptor.forClass(Map.class);
persistentTopics.getProperties(response3, testTenant, testNamespace, topicName2, true);
verify(response3, timeout(5000).times(1)).resume(metaResponseCaptor2.capture());
Assert.assertNotNull(metaResponseCaptor2.getValue());
Assert.assertEquals(metaResponseCaptor2.getValue().get("key1"), "value1");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,22 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic);

/**
* Get properties of a topic.
* @param topic
* Topic name
* @return Topic properties
*/
Map<String, String> getProperties(String topic) throws PulsarAdminException;

/**
* Get properties of a topic asynchronously.
* @param topic
* Topic name
* @return a future that can be used to track when the topic properties is returned
*/
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);

/**
* Delete a partitioned topic and its schemas.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,32 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public Map<String, String> getProperties(String topic) throws PulsarAdminException {
return sync(() -> getPropertiesAsync(topic));
}

@Override
public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties");
final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Map<String, String>>() {

@Override
public void completed(Map<String, String> response) {
future.complete(response);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
jcommander.addCommand("get-properties", new GetPropertiesCmd());

jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
Expand Down Expand Up @@ -605,6 +606,19 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get the topic properties.")
private class GetPropertiesCmd extends CliCommand {

@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
print(getTopics().getProperties(topic));
}
}

@Parameters(commandDescription = "Delete a partitioned topic. "
+ "It will also delete all the partitions of the topic if it exists."
+ "And the application is not able to connect to the topic(delete then re-create with same name) again "
Expand Down

0 comments on commit 1ebe4ee

Please sign in to comment.