From efc39937c2cd54e777b7eab2ba5bf1b3bfd50734 Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Tue, 2 Aug 2022 18:54:50 +0800 Subject: [PATCH] add metric for InMemoryDelayedDeliveryTracker's memory usage (#15867) --- .../InMemoryDelayedDeliveryTracker.java | 4 + ...PersistentDispatcherMultipleConsumers.java | 14 +++ .../persistent/PersistentSubscription.java | 6 ++ .../service/persistent/PersistentTopic.java | 1 + .../prometheus/AggregatedNamespaceStats.java | 3 + .../prometheus/NamespaceStatsAggregator.java | 4 + .../broker/stats/prometheus/TopicStats.java | 5 ++ .../persistent/PersistentTopicTest.java | 90 +++++++++++++++++-- .../data/stats/SubscriptionStatsImpl.java | 5 ++ .../policies/data/stats/TopicStatsImpl.java | 5 ++ 10 files changed, 131 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 837d3d1872c3e..390ce5d5071b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -192,6 +192,10 @@ public long getNumberOfDelayedMessages() { return priorityQueue.size(); } + public long getBufferMemoryUsage() { + return priorityQueue.bytesCapacity(); + } + /** * Update the scheduled timer task such that: * 1. If there are no delayed messages, return and do not schedule a timer task. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 167bc188e999c..a02b76c9aed47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -46,6 +46,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; +import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -1099,6 +1100,19 @@ public PersistentTopic getTopic() { return topic; } + + public long getDelayedTrackerMemoryUsage() { + if (delayedDeliveryTracker.isEmpty()) { + return 0; + } + + if (delayedDeliveryTracker.get() instanceof InMemoryDelayedDeliveryTracker) { + return ((InMemoryDelayedDeliveryTracker) delayedDeliveryTracker.get()).getBufferMemoryUsage(); + } + + return 0; + } + protected int getStickyKeyHash(Entry entry) { return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 22f94ee054504..a88744b4edce5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1099,6 +1099,12 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.activeConsumerName = activeConsumer.consumerName(); } } + + if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { + subStats.delayedTrackerMemoryUsage = + ((PersistentDispatcherMultipleConsumers) dispatcher).getDelayedTrackerMemoryUsage(); + } + if (Subscription.isIndividualAckMode(subType)) { if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 335355b112ba8..00e4bc01ec4ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1886,6 +1886,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges; stats.nonContiguousDeletedMessagesRangesSerializedSize += subStats.nonContiguousDeletedMessagesRangesSerializedSize; + stats.delayedMessageIndexSizeInBytes += subStats.delayedTrackerMemoryUsage; }); replicators.forEach((cluster, replicator) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 5610dbab218e0..761094ac0e61b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -59,6 +59,7 @@ public class AggregatedNamespaceStats { long compactionCompactedEntriesCount; long compactionCompactedEntriesSize; StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); + int delayedTrackerMemoryUsage; void updateStats(TopicStats stats) { topicsCount++; @@ -76,6 +77,7 @@ void updateStats(TopicStats stats) { msgInCounter += stats.msgInCounter; bytesOutCounter += stats.bytesOutCounter; msgOutCounter += stats.msgOutCounter; + delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage; managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize; managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize; @@ -156,5 +158,6 @@ public void reset() { replicationStats.clear(); subscriptionStats.clear(); + delayedTrackerMemoryUsage = 0; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 69e2575562154..c3e8567de54d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -184,6 +184,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.bytesOutCounter = tStatus.bytesOutCounter; stats.averageMsgSize = tStatus.averageMsgSize; stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes; + stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes; stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { @@ -348,6 +349,9 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed); + metric(stream, cluster, namespace, "pulsar_delayed_message_index_size_bytes", + stats.delayedTrackerMemoryUsage); + metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog); stats.managedLedgerStats.storageWriteLatencyBuckets.refresh(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 7f38a323cdc41..99838ccfae920 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -67,6 +67,7 @@ class TopicStats { long compactionCompactedEntriesCount; long compactionCompactedEntriesSize; StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); + public int delayedTrackerMemoryUsage; public void reset() { subscriptionsCount = 0; @@ -100,6 +101,7 @@ public void reset() { compactionCompactedEntriesCount = 0; compactionCompactedEntriesSize = 0; compactionLatencyBuckets.reset(); + delayedTrackerMemoryUsage = 0; } static void resetTypes() { @@ -148,6 +150,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_delayed_message_index_size_bytes", + stats.delayedTrackerMemoryUsage, splitTopicAndPartitionIndexLabel); + long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets(); metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], splitTopicAndPartitionIndexLabel); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index a150e039d7785..a62e8762ac525 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -29,28 +29,33 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRoutingMode; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.broker.stats.PrometheusMetricsTest; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; +import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -269,4 +274,77 @@ public void testPersistentPartitionedTopicUnload() throws Exception { producer.close(); } } + + + @DataProvider(name = "topicAndMetricsLevel") + public Object[][] indexPatternTestData() { + return new Object[][]{ + new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true}, + new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false}, + }; + } + + + @Test(dataProvider = "topicAndMetricsLevel") + public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception { + PulsarClient client = pulsar.getClient(); + String namespace = TopicName.get(topic).getNamespace(); + admin.namespaces().createNamespace(namespace); + + final int messages = 100; + CountDownLatch latch = new CountDownLatch(messages); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + @Cleanup + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub") + .subscriptionType(SubscriptionType.Shared) + .messageListener((MessageListener) (consumer1, msg) -> { + try { + latch.countDown(); + consumer1.acknowledge(msg); + } catch (PulsarClientException e) { + e.printStackTrace(); + } + }) + .subscribe(); + for (int a = 0; a < messages; a++) { + producer.newMessage() + .value(UUID.randomUUID().toString()) + .deliverAfter(30, TimeUnit.SECONDS) + .sendAsync(); + } + producer.flush(); + + latch.await(10, TimeUnit.SECONDS); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output); + String metricsStr = output.toString(StandardCharsets.UTF_8); + + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + Collection metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes"); + Assert.assertTrue(metrics.size() > 0); + + int topicLevelNum = 0; + int namespaceLevelNum = 0; + for (PrometheusMetricsTest.Metric metric : metrics) { + if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) { + Assert.assertTrue(metric.value > 0); + topicLevelNum++; + } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) { + Assert.assertTrue(metric.value > 0); + namespaceLevelNum++; + } + } + + if (exposeTopicLevelMetrics) { + Assert.assertTrue(topicLevelNum > 0); + Assert.assertEquals(0, namespaceLevelNum); + } else { + Assert.assertTrue(namespaceLevelNum > 0); + Assert.assertEquals(topicLevelNum, 0); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index cd163d848230b..a945ae9cf86d5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -131,6 +131,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** The serialized size of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRangesSerializedSize; + /** The size of InMemoryDelayedDeliveryTracer memory usage. */ + public long delayedTrackerMemoryUsage; + /** SubscriptionProperties (key/value strings) associated with this subscribe. */ public Map subscriptionProperties; @@ -158,6 +161,7 @@ public void reset() { consumersAfterMarkDeletePosition.clear(); nonContiguousDeletedMessagesRanges = 0; nonContiguousDeletedMessagesRangesSerializedSize = 0; + delayedTrackerMemoryUsage = 0; subscriptionProperties.clear(); } @@ -193,6 +197,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; + this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage; this.subscriptionProperties.putAll(stats.subscriptionProperties); return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 3e90ca9be945c..f6cdbba5036f5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -131,6 +131,9 @@ public class TopicStatsImpl implements TopicStats { /** The serialized size of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRangesSerializedSize; + /** The size of InMemoryDelayedDeliveryTracer memory usage. */ + public int delayedMessageIndexSizeInBytes; + /** The compaction stats. */ public CompactionStatsImpl compaction; @@ -200,6 +203,7 @@ public void reset() { this.lastOffloadFailureTimeStamp = 0; this.lastOffloadSuccessTimeStamp = 0; this.publishRateLimitedTimes = 0L; + this.delayedMessageIndexSizeInBytes = 0; this.compaction.reset(); } @@ -226,6 +230,7 @@ public TopicStatsImpl add(TopicStats ts) { this.offloadedStorageSize += stats.offloadedStorageSize; this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; + this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes; stats.getPublishers().forEach(s -> { if (s.isSupportsPartialProducer() && s.getProducerName() != null) {