Skip to content

Commit

Permalink
add metric for InMemoryDelayedDeliveryTracker's memory usage (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tjiuming authored Aug 2, 2022
1 parent 3bc7c92 commit efc3993
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}

public long getBufferMemoryUsage() {
return priorityQueue.bytesCapacity();
}

/**
* Update the scheduled timer task such that:
* 1. If there are no delayed messages, return and do not schedule a timer task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -1099,6 +1100,19 @@ public PersistentTopic getTopic() {
return topic;
}


public long getDelayedTrackerMemoryUsage() {
if (delayedDeliveryTracker.isEmpty()) {
return 0;
}

if (delayedDeliveryTracker.get() instanceof InMemoryDelayedDeliveryTracker) {
return ((InMemoryDelayedDeliveryTracker) delayedDeliveryTracker.get()).getBufferMemoryUsage();
}

return 0;
}

protected int getStickyKeyHash(Entry entry) {
return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.activeConsumerName = activeConsumer.consumerName();
}
}

if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.delayedTrackerMemoryUsage =
((PersistentDispatcherMultipleConsumers) dispatcher).getDelayedTrackerMemoryUsage();
}

if (Subscription.isIndividualAckMode(subType)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,7 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedTrackerMemoryUsage;
});

replicators.forEach((cluster, replicator) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class AggregatedNamespaceStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
int delayedTrackerMemoryUsage;

void updateStats(TopicStats stats) {
topicsCount++;
Expand All @@ -76,6 +77,7 @@ void updateStats(TopicStats stats) {
msgInCounter += stats.msgInCounter;
bytesOutCounter += stats.bytesOutCounter;
msgOutCounter += stats.msgOutCounter;
delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;

managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
Expand Down Expand Up @@ -156,5 +158,6 @@ public void reset() {

replicationStats.clear();
subscriptionStats.clear();
delayedTrackerMemoryUsage = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
Expand Down Expand Up @@ -348,6 +349,9 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl

metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);

metric(stream, cluster, namespace, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage);

metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);

stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class TopicStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
public int delayedTrackerMemoryUsage;

public void reset() {
subscriptionsCount = 0;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void reset() {
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
delayedTrackerMemoryUsage = 0;
}

static void resetTypes() {
Expand Down Expand Up @@ -148,6 +150,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);

metric(stream, cluster, namespace, topic, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage, splitTopicAndPartitionIndexLabel);

long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
splitTopicAndPartitionIndexLabel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -269,4 +274,77 @@ public void testPersistentPartitionedTopicUnload() throws Exception {
producer.close();
}
}


@DataProvider(name = "topicAndMetricsLevel")
public Object[][] indexPatternTestData() {
return new Object[][]{
new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true},
new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false},
};
}


@Test(dataProvider = "topicAndMetricsLevel")
public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception {
PulsarClient client = pulsar.getClient();
String namespace = TopicName.get(topic).getNamespace();
admin.namespaces().createNamespace(namespace);

final int messages = 100;
CountDownLatch latch = new CountDownLatch(messages);

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test_sub")
.subscriptionType(SubscriptionType.Shared)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
try {
latch.countDown();
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
})
.subscribe();
for (int a = 0; a < messages; a++) {
producer.newMessage()
.value(UUID.randomUUID().toString())
.deliverAfter(30, TimeUnit.SECONDS)
.sendAsync();
}
producer.flush();

latch.await(10, TimeUnit.SECONDS);
ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
String metricsStr = output.toString(StandardCharsets.UTF_8);

Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
Assert.assertTrue(metrics.size() > 0);

int topicLevelNum = 0;
int namespaceLevelNum = 0;
for (PrometheusMetricsTest.Metric metric : metrics) {
if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) {
Assert.assertTrue(metric.value > 0);
topicLevelNum++;
} else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
Assert.assertTrue(metric.value > 0);
namespaceLevelNum++;
}
}

if (exposeTopicLevelMetrics) {
Assert.assertTrue(topicLevelNum > 0);
Assert.assertEquals(0, namespaceLevelNum);
} else {
Assert.assertTrue(namespaceLevelNum > 0);
Assert.assertEquals(topicLevelNum, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;

/** The size of InMemoryDelayedDeliveryTracer memory usage. */
public long delayedTrackerMemoryUsage;

/** SubscriptionProperties (key/value strings) associated with this subscribe. */
public Map<String, String> subscriptionProperties;

Expand Down Expand Up @@ -158,6 +161,7 @@ public void reset() {
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
delayedTrackerMemoryUsage = 0;
subscriptionProperties.clear();
}

Expand Down Expand Up @@ -193,6 +197,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
this.subscriptionProperties.putAll(stats.subscriptionProperties);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public class TopicStatsImpl implements TopicStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;

/** The size of InMemoryDelayedDeliveryTracer memory usage. */
public int delayedMessageIndexSizeInBytes;

/** The compaction stats. */
public CompactionStatsImpl compaction;

Expand Down Expand Up @@ -200,6 +203,7 @@ public void reset() {
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
}

Expand All @@ -226,6 +230,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;

stats.getPublishers().forEach(s -> {
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
Expand Down

0 comments on commit efc3993

Please sign in to comment.