Skip to content

Commit

Permalink
Added backlog and offloaded size in Prometheus stats (apache#4150)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Apr 30, 2019
1 parent 436e4ba commit 2e63911
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ public interface ManagedLedger {
*/
long getEstimatedBacklogSize();

/**
* Return the size of all ledgers offloaded to 2nd tier storage
*/
long getOffloadedSize();

/**
* Activate cursors those caught up backlog-threshold entries and deactivate slow cursors which are creating
* backlog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ enum PositionBound {
private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
// last read-operation's callback to check read-timeout on it.
// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;

/**
Expand Down Expand Up @@ -3139,6 +3139,18 @@ private void checkReadTimeout() {
}
}

@Override
public long getOffloadedSize() {
long offloadedSize = 0;
for (LedgerInfo li : ledgers.values()) {
if (li.hasOffloadContext() && li.getOffloadContext().getComplete()) {
offloadedSize += li.getSize();
}
}

return offloadedSize;
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class AggregatedNamespaceStats {
public long storageSize;
public long msgBacklog;

long backlogSize;
long offloadedStorageUsed;
long backlogQuotaLimit;

public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
public StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
Expand All @@ -61,6 +65,8 @@ void updateStats(TopicStats stats) {
throughputOut += stats.throughputOut;

storageSize += stats.storageSize;
backlogSize += stats.backlogSize;
offloadedStorageUsed += stats.offloadedStorageUsed;

storageWriteRate += stats.storageWriteRate;
storageReadRate += stats.storageReadRate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
*/
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;

import java.util.concurrent.atomic.LongAdder;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

import io.netty.util.concurrent.FastThreadLocal;

public class NamespaceStatsAggregator {

private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
Expand Down Expand Up @@ -84,9 +86,13 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include

if (topic instanceof PersistentTopic) {
// Managed Ledger stats
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic) topic).getManagedLedger().getStats();
ManagedLedger ml = ((PersistentTopic) topic).getManagedLedger();
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();

stats.storageSize = mlStats.getStoredMessagesSize();
stats.backlogSize = ml.getEstimatedBacklogSize();
stats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic.getBacklogQuota().getLimit();

stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
stats.storageWriteLatencyBuckets.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class TopicStats {
long storageSize;
public long msgBacklog;

long backlogSize;
long offloadedStorageUsed;

long backlogQuotaLimit;

StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
double storageWriteRate;
Expand All @@ -59,6 +64,9 @@ public void reset() {
msgBacklog = 0;
storageWriteRate = 0;
storageReadRate = 0;
backlogSize = 0;
offloadedStorageUsed = 0;
backlogQuotaLimit = 0;

replicationStats.clear();
subscriptionStats.clear();
Expand All @@ -79,6 +87,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin

metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.storageSize);
metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size", stats.backlogSize);
metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit);

long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
Expand Down

0 comments on commit 2e63911

Please sign in to comment.