Skip to content

Commit

Permalink
Optionally expose publisher stats in broker-stats admin rest api. (ap…
Browse files Browse the repository at this point in the history
…ache#1648)

* Optionally expose publisher stats in broker-stats admin rest api.

* Remove comments and add helper to non-persistent topics.

* Update admin client endpoint.

* Add newline to standalone conf.

* Add license.
  • Loading branch information
cckellogg authored and merlimat committed Apr 28, 2018
1 parent 070a9f6 commit 8eb2d21
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 54 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,8 @@ exposeTopicLevelMetricsInPrometheus=true

# Enable Functions Worker Service in Broker
functionsWorkerEnabled=false

### --- Broker Web Stats --- ###

# Enable topic level metrics
exposePublisherStats=true
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,8 @@ webSocketConnectionsPerBroker=8

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

### --- Broker Web Stats --- ###

# Enable topic level metrics
exposePublisherStats=true
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
/**** --- Functions --- ****/
private boolean functionsWorkerEnabled = false;

/**** --- Broker Web Stats --- ****/
// If true, export publisher stats when returning topics stats from the admin rest api
private boolean exposePublisherStats = true;

public String getZookeeperServers() {
return zookeeperServers;
}
Expand Down Expand Up @@ -1558,6 +1562,17 @@ public boolean isFunctionsWorkerEnabled() {
return functionsWorkerEnabled;
}


/**** --- Broker Web Stats ---- ****/

public void setExposePublisherStats(boolean expose) {
this.exposePublisherStats = expose;
}

public boolean exposePublisherStats() {
return exposePublisherStats;
}

public boolean isRunningStandalone() {
return isRunningStandalone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;

Expand All @@ -38,6 +40,21 @@
@Produces(MediaType.APPLICATION_JSON)
public class BrokerStats extends BrokerStatsBase {

@GET
@Path("/topics")
@ApiOperation(
value = "Get all the topic stats by namesapce",
response = OutputStream.class,
responseContainer = "OutputStream")
// https://github.com/swagger-api/swagger-ui/issues/558
// map
// support
// missing
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public StreamingOutput getTopics2() throws Exception {
return super.getTopics2();
}

@GET
@Path("/broker-resource-availability/{tenant}/{namespace}")
@ApiOperation(value = "Broker availability report", notes = "This API gives the current broker availability in "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class PulsarStats implements Closeable {
private List<Metrics> metricsCollection;
private List<NonPersistentTopic> tempNonPersistentTopics;
private final BrokerOperabilityMetrics brokerOperabilityMetrics;
private final boolean exposePublisherStats;

private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();

Expand All @@ -74,6 +75,8 @@ public PulsarStats(PulsarService pulsar) {
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
pulsar.getAdvertisedAddress());
this.tempNonPersistentTopics = Lists.newArrayList();

this.exposePublisherStats = pulsar.getConfiguration().exposePublisherStats();
}

@Override
Expand Down Expand Up @@ -129,7 +132,7 @@ public synchronized void updateStats(
if (topic instanceof PersistentTopic) {
try {
topic.updateRates(nsStats, currentBundleStats, topicStatsStream,
clusterReplicationMetrics, namespaceName);
clusterReplicationMetrics, namespaceName, exposePublisherStats);
} catch (Exception e) {
log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e);
}
Expand All @@ -151,7 +154,7 @@ public synchronized void updateStats(
tempNonPersistentTopics.forEach(topic -> {
try {
topic.updateRates(nsStats, currentBundleStats, topicStatsStream,
clusterReplicationMetrics, namespaceName);
clusterReplicationMetrics, namespaceName, exposePublisherStats);
} catch (Exception e) {
log.error("Failed to generate topic stats for topic {}: {}", topic.getName(), e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.utils.StatsOutputStream;

public class StreamingStats {
private StreamingStats() {}

public static void writePublisherStats(StatsOutputStream statsStream, PublisherStats stats) {
statsStream.startObject();

statsStream.writePair("msgRateIn", stats.msgRateIn);
statsStream.writePair("msgThroughputIn", stats.msgThroughputIn);
statsStream.writePair("averageMsgSize", stats.averageMsgSize);

statsStream.writePair("address", stats.getAddress());
statsStream.writePair("producerId", stats.producerId);
statsStream.writePair("producerName", stats.getProducerName());
statsStream.writePair("connectedSince", stats.getConnectedSince());
if (stats.getClientVersion() != null) {
statsStream.writePair("clientVersion", stats.getClientVersion());
}

// add metadata
statsStream.startObject("metadata");
if (stats.metadata != null && !stats.metadata.isEmpty()) {
stats.metadata.forEach(statsStream::writePair);
}
statsStream.endObject();

statsStream.endObject();
}


public static void writeConsumerStats(StatsOutputStream statsStream, PulsarApi.CommandSubscribe.SubType subType,
ConsumerStats stats) {
// Populate consumer specific stats here
statsStream.startObject();

statsStream.writePair("address", stats.getAddress());
statsStream.writePair("consumerName", stats.consumerName);
statsStream.writePair("availablePermits", stats.availablePermits);
statsStream.writePair("connectedSince", stats.getConnectedSince());
statsStream.writePair("msgRateOut", stats.msgRateOut);
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);

if (PulsarApi.CommandSubscribe.SubType.Shared.equals(subType)) {
statsStream.writePair("unackedMessages", stats.unackedMessages);
statsStream.writePair("blockedConsumerOnUnackedMsgs", stats.blockedConsumerOnUnackedMsgs);
}
if (stats.getClientVersion() != null) {
statsStream.writePair("clientVersion", stats.getClientVersion());
}

// add metadata
statsStream.startObject("metadata");
if (stats.metadata != null && !stats.metadata.isEmpty()) {
stats.metadata.forEach(statsStream::writePair);
}
statsStream.endObject();

statsStream.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

public interface Topic {

public interface PublishContext {
interface PublishContext {

default String getProducerName() {
return null;
Expand Down Expand Up @@ -117,7 +117,7 @@ CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, lo

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics,
String namespaceName);
String namespaceName, boolean hydratePublishers);

Subscription getSubscription(String subscription);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
Expand Down Expand Up @@ -681,7 +682,7 @@ public String getName() {
}

public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace) {
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {

TopicStats topicStats = threadLocalTopicStats.get();
topicStats.reset();
Expand All @@ -692,22 +693,25 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
bundleStats.producerCount += producers.size();
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producer.updateRates();
PublisherStats PublisherStats = producer.getStats();
PublisherStats publisherStats = producer.getStats();

topicStats.aggMsgRateIn += PublisherStats.msgRateIn;
topicStats.aggMsgThroughputIn += PublisherStats.msgThroughputIn;
topicStats.aggMsgRateIn += publisherStats.msgRateIn;
topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn;

if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), PublisherStats);
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
});

// Creating publishers object for backward compatibility
topicStatsStream.startList("publishers");
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
});
topicStatsStream.endList();


// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
Expand Down Expand Up @@ -744,24 +748,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
subMsgRateRedeliver += consumerStats.msgRateRedeliver;

// Populate consumer specific stats here
topicStatsStream.startObject();
topicStatsStream.writePair("address", consumerStats.getAddress());
topicStatsStream.writePair("consumerName", consumerStats.consumerName);
topicStatsStream.writePair("availablePermits", consumerStats.availablePermits);
topicStatsStream.writePair("connectedSince", consumerStats.getConnectedSince());
topicStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
topicStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);

if (SubType.Shared.equals(subscription.getType())) {
topicStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
topicStatsStream.writePair("blockedConsumerOnUnackedMsgs",
consumerStats.blockedConsumerOnUnackedMsgs);
}
if (consumerStats.getClientVersion() != null) {
topicStatsStream.writePair("clientVersion", consumerStats.getClientVersion());
}
topicStatsStream.endObject();
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}

// Close Consumer stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
Expand Down Expand Up @@ -1095,7 +1096,7 @@ public ManagedLedger getManagedLedger() {
}

public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace) {
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {

TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
topicStatsHelper.reset();
Expand All @@ -1106,6 +1107,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
bundleStats.producerCount += producers.size();
topicStatsStream.startObject(topic);

// start publisher stats
topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();
Expand All @@ -1116,10 +1119,12 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
});

// Creating publishers object for backward compatibility
topicStatsStream.startList("publishers");
// Populate consumer specific stats here
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
});
topicStatsStream.endList();

// Start replicator stats
Expand Down Expand Up @@ -1215,25 +1220,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;

// Populate consumer specific stats here
topicStatsStream.startObject();
topicStatsStream.writePair("address", consumerStats.getAddress());
topicStatsStream.writePair("consumerName", consumerStats.consumerName);
topicStatsStream.writePair("availablePermits", consumerStats.availablePermits);
topicStatsStream.writePair("connectedSince", consumerStats.getConnectedSince());
topicStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
topicStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver);

if (SubType.Shared.equals(subscription.getType())) {
topicStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
topicStatsStream.writePair("blockedConsumerOnUnackedMsgs",
consumerStats.blockedConsumerOnUnackedMsgs);
}
if (consumerStats.getClientVersion() != null) {
topicStatsStream.writePair("clientVersion", consumerStats.getClientVersion());
}
topicStatsStream.endObject();
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}

// Close Consumer stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public JsonArray getMBeans() throws PulsarAdminException {
@Override
public JsonObject getTopics() throws PulsarAdminException {
try {
String json = request(adminV2BrokerStats.path("/destinations")).get(String.class);
String json = request(adminV2BrokerStats.path("/topics")).get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
throw getApiException(e);
Expand Down

0 comments on commit 8eb2d21

Please sign in to comment.