Skip to content

Commit

Permalink
Add partitioned-topic stats internal admin-api (apache#2958)
Browse files Browse the repository at this point in the history
* Add partitioned-topic stats internal admin-api

* add class
  • Loading branch information
rdhabalia authored and merlimat committed Dec 13, 2018
1 parent 61cddd1 commit 2fab7fe
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -622,6 +623,33 @@ protected PartitionedTopicStats internalGetPartitionedStats(boolean authoritativ
return stats;
}

protected PartitionedTopicInternalStats internalGetPartitionedStatsInternal(boolean authoritative) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
PersistentTopicInternalStats partitionStats = pulsar().getAdminClient().topics()
.getInternalStats(topicName.getPartition(i).toString());
stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet");
} else {
throw new RestException(e);
}
} catch (Exception e) {
throw new RestException(e);
}
return stats;
}

protected void internalDeleteSubscription(String subName, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -278,6 +279,20 @@ public PartitionedTopicStats getPartitionedStats(@PathParam("property") String p
return internalGetPartitionedStats(authoritative);
}


@GET
@Path("{property}/{cluster}/{namespace}/{topic}/partitioned-internalStats")
@ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -285,6 +286,18 @@ public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") String ten
return internalGetPartitionedStats(authoritative);
}

@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-internalStats")
@ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetPartitionedStatsInternal(authoritative);
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -675,6 +676,26 @@ List<String> getListInBundle(String namespace, String bundleRange)
*/
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition);

/**
* Get the stats for the partitioned topic
*
* @param topic
* @param perPartition
* @return
* @throws PulsarAdminException
*/
PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
throws PulsarAdminException;

/**
* Get the stats-internal for the partitioned topic asynchronously
*
* @param topic
* topic Name
* @return a future that can be used to track when the partitioned topic statistics are returned
*/
CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic);

/**
* Delete a subscription.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -510,6 +511,40 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
throws PulsarAdminException {
try {
return getPartitionedInternalStatsAsync(topic).get();
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e.getCause());
}
}

@Override
public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-internalStats");
final CompletableFuture<PartitionedTopicInternalStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PartitionedTopicInternalStats>() {

@Override
public void completed(PartitionedTopicInternalStats response) {
future.complete(response);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CmdPersistentTopics(PulsarAdmin admin) {
jcommander.addCommand("stats-internal", new GetInternalStats());
jcommander.addCommand("info-internal", new GetInternalInfo());
jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
jcommander.addCommand("skip", new Skip());
jcommander.addCommand("skip-all", new SkipAll());
jcommander.addCommand("expire-messages", new ExpireMessages());
Expand Down Expand Up @@ -361,6 +362,19 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get the stats-internal for the partitioned topic and its connected producers and consumers. \n"
+ "\t All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
private class GetPartitionedStatsInternal extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic\n", required = true)
private java.util.List<String> params;

@Override
void run() throws Exception {
String persistentTopic = validatePersistentTopic(params);
print(persistentTopics.getPartitionedInternalStats(persistentTopic));
}
}

@Parameters(commandDescription = "Skip all the messages for the subscription")
private class SkipAll extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import java.util.Map;

import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

import com.google.common.collect.Maps;

public class PartitionedTopicInternalStats {

public PartitionedTopicMetadata metadata;

public Map<String, PersistentTopicInternalStats> partitions;

public PartitionedTopicInternalStats() {
super();
metadata = new PartitionedTopicMetadata();
partitions = Maps.newHashMap();
}

public PartitionedTopicInternalStats(PartitionedTopicMetadata metadata) {
this();
this.metadata = metadata;
}

public void reset() {
partitions.clear();
metadata.partitions = 0;
}

}

0 comments on commit 2fab7fe

Please sign in to comment.