Skip to content

Commit

Permalink
Provide raw counters metrics(apache#5802)
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreZ authored and codelipenghui committed Dec 17, 2019
1 parent 2d81c57 commit 9b55324
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -81,6 +82,9 @@ public abstract class AbstractTopic implements Topic {

protected volatile PublishRateLimiter topicPublishRateLimiter;

private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -262,6 +266,9 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
this.topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes);
// increase broker publish rate limiter
getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes);
// increase counters
bytesInCounter.add(msgSizeInBytes);
msgInCounter.add(numOfMessages);
}

@Override
Expand Down Expand Up @@ -384,5 +391,11 @@ private void updatePublishDispatcher(Policies policies) {
}
}

public long getMsgInCounter() { return this.msgInCounter.longValue(); }

public long getBytesInCounter() {
return this.bytesInCounter.longValue();
}

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,8 @@ public NonPersistentTopicStats getStats() {
});

stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();

subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,8 @@ public TopicStats getStats() {
});

stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.storageReadRate = mlStats.getReadEntriesRate();
}

stats.msgInCounter = topic.getStats().msgInCounter;
stats.bytesInCounter = topic.getStats().bytesInCounter;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
if (producer.isRemote()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class TopicStats {
double rateOut;
double throughputIn;
double throughputOut;
long msgInCounter;
long bytesInCounter;

long storageSize;
public long msgBacklog;
Expand Down Expand Up @@ -63,6 +65,8 @@ public void reset() {
rateOut = 0;
throughputIn = 0;
throughputOut = 0;
bytesInCounter = 0;
msgInCounter = 0;

storageSize = 0;
msgBacklog = 0;
Expand Down Expand Up @@ -162,6 +166,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
replStats.replicationBacklog);
});
}

metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);
}

static void metricType(SimpleTextOutputStream stream, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public void testPerTopicStats() throws Exception {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");

cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

p1.close();
p2.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class TopicStats {

public String deduplicationStatus;

public long bytesInCounter;
public long msgInCounter;

public TopicStats() {
this.publishers = Lists.newArrayList();
this.subscriptions = Maps.newHashMap();
Expand All @@ -78,6 +81,8 @@ public void reset() {
this.averageMsgSize = 0;
this.storageSize = 0;
this.backlogSize = 0;
this.bytesInCounter = 0;
this.msgInCounter = 0;
this.publishers.clear();
this.subscriptions.clear();
this.replication.clear();
Expand All @@ -93,6 +98,8 @@ public TopicStats add(TopicStats stats) {
this.msgThroughputIn += stats.msgThroughputIn;
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
this.bytesInCounter += stats.bytesInCounter;
this.msgInCounter += stats.msgInCounter;
double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ All the topic metrics are labelled with the following labels:
| pulsar_subscription_delayed | Gauge | The total message batches (entries) are delayed for dispatching. |
| pulsar_storage_write_latency_le_* | Histogram | The entry rate of a topic that the storage write latency is smaller with a given threshold.<br> Available thresholds: <br><ul><li>pulsar_storage_write_latency_le_0_5: <= 0.5ms </li><li>pulsar_storage_write_latency_le_1: <= 1ms</li><li>pulsar_storage_write_latency_le_5: <= 5ms</li><li>pulsar_storage_write_latency_le_10: <= 10ms</li><li>pulsar_storage_write_latency_le_20: <= 20ms</li><li>pulsar_storage_write_latency_le_50: <= 50ms</li><li>pulsar_storage_write_latency_le_100: <= 100ms</li><li>pulsar_storage_write_latency_le_200: <= 200ms</li><li>pulsar_storage_write_latency_le_1000: <= 1s</li><li>pulsar_storage_write_latency_le_overflow: > 1s</li></ul> |
| pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.<br> Available thresholds: <br><ul><li>pulsar_entry_size_le_128: <= 128 bytes </li><li>pulsar_entry_size_le_512: <= 512 bytes</li><li>pulsar_entry_size_le_1_kb: <= 1 KB</li><li>pulsar_entry_size_le_2_kb: <= 2 KB</li><li>pulsar_entry_size_le_4_kb: <= 4 KB</li><li>pulsar_entry_size_le_16_kb: <= 16 KB</li><li>pulsar_entry_size_le_100_kb: <= 100 KB</li><li>pulsar_entry_size_le_1_mb: <= 1 MB</li><li>pulsar_entry_size_le_overflow: > 1 MB</li></ul> |
| pulsar_in_bytes_total | Counter | The total number of bytes received for this topic |
| pulsar_producers_count | Counter | The total number of messages received for this topic |

#### Replication metrics

Expand Down

0 comments on commit 9b55324

Please sign in to comment.