Skip to content

Commit

Permalink
PIP-105: new API to get subscription properties (apache#16095)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Jun 21, 2022
1 parent 53cc84a commit face8bb
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,35 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes
});
}

private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenApply((Topic topic) -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName));
}
return sub.getSubscriptionProperties();
}).thenAccept((Map<String, String> properties) -> {
if (properties == null) {
properties = Collections.emptyMap();
}
asyncResponse.resume(Response.ok(properties).build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
return null;
});
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
CompletableFuture<Void> future;
Expand Down Expand Up @@ -2422,6 +2451,91 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
});
}

protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAcceptAsync(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.getSubscriptionPropertiesAsync(topicNamePartition.toString(),
subName));
} catch (Exception e) {
log.error("[{}] Failed to update properties for subscription {} {}",
clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return null;
} else {
log.error("[{}] Failed to get properties for subscription {} {}",
clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

Map<String, String> aggregatedResult = new HashMap<>();
futures.forEach(f -> {
// in theory all the partitions have the same properties
try {
aggregatedResult.putAll(f.get());
} catch (Exception impossible) {
// we already waited for this Future
asyncResponse.resume(new RestException(impossible));
}
});

asyncResponse.resume(Response.ok(aggregatedResult).build());
return null;
});
} else {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}, pulsar().getExecutor()).exceptionally(ex -> {
log.error("[{}] Failed to update properties for subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,42 @@ public void updateSubscriptionProperties(
}
}

@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
@ApiOperation(value = "Replaces all the properties on the given subscription")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void getSubscriptionProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetSubscriptionProperties(asyncResponse, decode(encodedSubName),
authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,41 @@ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exc
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
assertEquals(value, props.get("foo"));
}

// properties are never null, but an empty map
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());

Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName2);
assertTrue(props.isEmpty());
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertEquals(value, props.get("foo"));

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertEquals(value, props.get("foo"));

SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());

Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
assertTrue(props2.isEmpty());
}

// clear the properties on subscriptionName
Expand All @@ -183,16 +198,25 @@ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exc
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());

Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
assertTrue(props.isEmpty());
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertTrue(props.isEmpty());

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertTrue(props.isEmpty());
}

// update the properties on subscriptionName
Expand All @@ -204,19 +228,31 @@ public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exc
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
assertEquals(value, props.get("foo"));
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertEquals(value, props.get("foo"));

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
assertEquals(value, props.get("foo"));

SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());

Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
assertTrue(props2.isEmpty());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,15 @@ CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptio
void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
throws PulsarAdminException;

/**
* Get Subscription Properties on a topic subscription.
* @param topic
* @param subName
* @throws PulsarAdminException
*/
Map<String, String> getSubscriptionProperties(String topic, String subName)
throws PulsarAdminException;

/**
* Reset cursor position on a topic subscription.
*
Expand Down Expand Up @@ -1873,6 +1882,13 @@ void updateSubscriptionProperties(String topic, String subName, Map<String, Stri
CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties);

/**
* Get Subscription Properties on a topic subscription.
* @param topic
* @param subName
*/
CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName);

/**
* Reset cursor position on a topic subscription.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,12 @@ public void updateSubscriptionProperties(String topic, String subName, Map<Strin
sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
}

@Override
public Map<String, String> getSubscriptionProperties(String topic, String subName)
throws PulsarAdminException {
return sync(() -> getSubscriptionPropertiesAsync(topic, subName));
}

@Override
public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
Map<String, String> subscriptionProperties) {
Expand All @@ -1226,6 +1232,29 @@ public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, S
return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"properties");
final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Map<String, String>>() {

@Override
public void completed(Map<String, String> response) {
future.complete(response);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,10 @@ public void topics() throws Exception {
cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());

cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1");

cmdTopics = new CmdTopics(() -> admin);
props = new HashMap<>();
props.put("a", "b");
Expand Down
Loading

0 comments on commit face8bb

Please sign in to comment.