diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 9e147517ac724..0b994c640a9f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -114,6 +114,7 @@ import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats; import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats; +import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats; import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats; @@ -265,6 +266,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private OpenTelemetryConsumerStats openTelemetryConsumerStats; private OpenTelemetryProducerStats openTelemetryProducerStats; private OpenTelemetryReplicatorStats openTelemetryReplicatorStats; + private OpenTelemetryReplicatedSubscriptionStats openTelemetryReplicatedSubscriptionStats; private OpenTelemetryTransactionCoordinatorStats openTelemetryTransactionCoordinatorStats; private OpenTelemetryTransactionPendingAckStoreStats openTelemetryTransactionPendingAckStoreStats; @@ -861,6 +863,7 @@ public void start() throws PulsarServerException { openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this); openTelemetryProducerStats = new OpenTelemetryProducerStats(this); openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this); + openTelemetryReplicatedSubscriptionStats = new OpenTelemetryReplicatedSubscriptionStats(this); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? 
new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index a8e6885525a19..b873bc93cd1e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -39,6 +39,7 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats; import org.apache.pulsar.common.api.proto.ClusterMessageId; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -49,6 +50,7 @@ import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; /** * Encapsulate all the logic of replicated subscriptions tracking for a given topic. 
@@ -70,19 +72,25 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P private final ConcurrentMap pendingSnapshots = new ConcurrentHashMap<>(); + @PulsarDeprecatedMetric( + newMetricName = OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME) + @Deprecated private static final Gauge pendingSnapshotsMetric = Gauge .build("pulsar_replicated_subscriptions_pending_snapshots", "Counter of currently pending snapshots") .register(); + private final OpenTelemetryReplicatedSubscriptionStats stats; + public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) { this.topic = topic; this.localCluster = localCluster; - timer = topic.getBrokerService().pulsar().getExecutor() + var pulsar = topic.getBrokerService().pulsar(); + timer = pulsar.getExecutor() .scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0, - topic.getBrokerService().pulsar().getConfiguration() - .getReplicatedSubscriptionsSnapshotFrequencyMillis(), + pulsar.getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(), TimeUnit.MILLISECONDS); + stats = pulsar.getOpenTelemetryReplicatedSubscriptionStats(); } public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) { @@ -233,11 +241,11 @@ private void startNewSnapshot() { } pendingSnapshotsMetric.inc(); + stats.recordSnapshotStarted(); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this, topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC()); pendingSnapshots.put(builder.getSnapshotId(), builder); builder.start(); - } public Optional getLastCompletedSnapshotId() { @@ -254,6 +262,8 @@ private void cleanupTimedOutSnapshots() { } pendingSnapshotsMetric.dec(); + var latencyMillis = entry.getValue().getDurationMillis(); + stats.recordSnapshotTimedOut(latencyMillis); it.remove(); } } @@ -261,11 +271,15 @@ private void 
cleanupTimedOutSnapshots() { void snapshotCompleted(String snapshotId) { ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId); - pendingSnapshotsMetric.dec(); lastCompletedSnapshotId = snapshotId; if (snapshot != null) { lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis(); + + pendingSnapshotsMetric.dec(); + var latencyMillis = snapshot.getDurationMillis(); + ReplicatedSubscriptionsSnapshotBuilder.SNAPSHOT_METRIC.observe(latencyMillis); + stats.recordSnapshotCompleted(latencyMillis); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java index 4eb20f02907c0..0dacade3eed1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java @@ -30,9 +30,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats; import org.apache.pulsar.common.api.proto.MarkersMessageIdData; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; @Slf4j public class ReplicatedSubscriptionsSnapshotBuilder { @@ -52,11 +54,13 @@ public class ReplicatedSubscriptionsSnapshotBuilder { private final Clock clock; - private static final Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms", + @PulsarDeprecatedMetric(newMetricName = OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME) + @Deprecated + public static final 
Summary SNAPSHOT_METRIC = Summary.build("pulsar_replicated_subscriptions_snapshot_ms", "Time taken to create a consistent snapshot across clusters").register(); public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller, - List remoteClusters, ServiceConfiguration conf, Clock clock) { + List remoteClusters, ServiceConfiguration conf, Clock clock) { this.snapshotId = UUID.randomUUID().toString(); this.controller = controller; this.remoteClusters = remoteClusters; @@ -123,8 +127,6 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip p.getLedgerId(), p.getEntryId(), responses)); controller.snapshotCompleted(snapshotId); - double latencyMillis = clock.millis() - startTimeMillis; - snapshotMetric.observe(latencyMillis); } boolean isTimedOut() { @@ -134,4 +136,8 @@ boolean isTimedOut() { long getStartTimeMillis() { return startTimeMillis; } + + long getDurationMillis() { + return clock.millis() - startTimeMillis; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java new file mode 100644 index 0000000000000..55982eba24312 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.stats.MetricsUtil; + +public class OpenTelemetryReplicatedSubscriptionStats { + + public static final AttributeKey<String> SNAPSHOT_OPERATION_RESULT = + AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.result"); + public enum SnapshotOperationResult { + SUCCESS, TIMEOUT; + private final Attributes attributes = Attributes.of(SNAPSHOT_OPERATION_RESULT, name().toLowerCase(Locale.ROOT)); + } + + public static final String SNAPSHOT_OPERATION_COUNT_METRIC_NAME = + "pulsar.broker.replication.subscription.snapshot.operation.count"; + private final LongCounter snapshotOperationCounter; + + public static final String SNAPSHOT_DURATION_METRIC_NAME = + "pulsar.broker.replication.subscription.snapshot.operation.duration"; + private final DoubleHistogram snapshotDuration; + + public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) { + var meter = pulsar.getOpenTelemetry().getMeter(); + snapshotOperationCounter = meter.counterBuilder(SNAPSHOT_OPERATION_COUNT_METRIC_NAME) + .setDescription("The number of snapshot operations attempted") + .setUnit("{operation}") + .build(); + snapshotDuration = meter.histogramBuilder(SNAPSHOT_DURATION_METRIC_NAME) + .setDescription("Time taken to 
complete a consistent snapshot operation across clusters") + .setUnit("s") + .build(); + } + + public void recordSnapshotStarted() { + snapshotOperationCounter.add(1); + } + + public void recordSnapshotTimedOut(long durationMs) { + snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS), + SnapshotOperationResult.TIMEOUT.attributes); + } + + public void recordSnapshotCompleted(long durationMs) { + snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS), + SnapshotOperationResult.SUCCESS.attributes); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index e5aad47dc89c7..4273e8bbaeb5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME; +import static org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -26,7 +30,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; - +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; @@ -141,7 
+146,6 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception { producer.send(body.getBytes(StandardCharsets.UTF_8)); sentMessages.add(body); } - producer.close(); } Set receivedMessages = new LinkedHashSet<>(); @@ -170,6 +174,15 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception { // assert that all messages have been received assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " + "messages don't match."); + + var metrics1 = metricReader1.collectAllMetrics(); + assertMetricLongSumValue(metrics1, SNAPSHOT_OPERATION_COUNT_METRIC_NAME, + Attributes.empty(), value -> assertThat(value).isPositive()); + assertThat(metrics1) + .anySatisfy(metric -> OpenTelemetryAssertions.assertThat(metric) + .hasName(SNAPSHOT_DURATION_METRIC_NAME) + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + histogramPoint -> histogramPoint.hasSumGreaterThan(0.0)))); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java index aa0015742f662..604326203e876 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java @@ -20,10 +20,9 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; - import lombok.Cleanup; - import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ProducerConsumerBase; import 
org.apache.pulsar.client.api.Schema; @@ -48,6 +47,12 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @Test public void createReplicatedSubscription() throws Exception { this.conf.setEnableReplicatedSubscriptions(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java index f5c3bb9d75bbd..562c5eda58109 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java @@ -25,11 +25,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; - import java.time.Clock; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -71,7 +68,8 @@ public void setup() { Commands.skipMessageMetadata(marker); markers.add(marker); return null; - }).when(controller) + }) + .when(controller) .writeMarker(any(ByteBuf.class)); } @@ -80,7 +78,8 @@ public void testBuildSnapshotWith2Clusters() throws Exception { List remoteClusters = Collections.singletonList("b"); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, conf, clock); + remoteClusters, + conf, clock); assertTrue(markers.isEmpty()); @@ -93,8 +92,8 @@ public void testBuildSnapshotWith2Clusters() throws Exception { assertEquals(request.getSourceCluster(), 
localCluster); // Simulate the responses coming back - ReplicatedSubscriptionsSnapshotResponse response = new ReplicatedSubscriptionsSnapshotResponse() - .setSnapshotId("snapshot-1"); + ReplicatedSubscriptionsSnapshotResponse response = new ReplicatedSubscriptionsSnapshotResponse().setSnapshotId( + "snapshot-1"); response.setCluster() .setCluster("b") .setMessageId() @@ -119,7 +118,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { List remoteClusters = Arrays.asList("b", "c"); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, conf, clock); + remoteClusters, + conf, clock); assertTrue(markers.isEmpty()); @@ -132,8 +132,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { assertEquals(request.getSourceCluster(), localCluster); // Simulate the responses coming back - ReplicatedSubscriptionsSnapshotResponse response1 = new ReplicatedSubscriptionsSnapshotResponse() - .setSnapshotId("snapshot-1"); + ReplicatedSubscriptionsSnapshotResponse response1 = new ReplicatedSubscriptionsSnapshotResponse().setSnapshotId( + "snapshot-1"); response1.setCluster() .setCluster("b") .setMessageId() @@ -144,8 +144,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { // No markers should be sent out assertTrue(markers.isEmpty()); - ReplicatedSubscriptionsSnapshotResponse response2 = new ReplicatedSubscriptionsSnapshotResponse() - .setSnapshotId("snapshot-1"); + ReplicatedSubscriptionsSnapshotResponse response2 = new ReplicatedSubscriptionsSnapshotResponse().setSnapshotId( + "snapshot-1"); response2.setCluster() .setCluster("c") .setMessageId() @@ -159,8 +159,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { assertEquals(request.getSourceCluster(), localCluster); // Responses coming back - ReplicatedSubscriptionsSnapshotResponse response3 = new ReplicatedSubscriptionsSnapshotResponse() - .setSnapshotId("snapshot-1"); + 
ReplicatedSubscriptionsSnapshotResponse response3 = new ReplicatedSubscriptionsSnapshotResponse().setSnapshotId( + "snapshot-1"); response3.setCluster() .setCluster("b") .setMessageId() @@ -171,8 +171,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { // No markers should be sent out assertTrue(markers.isEmpty()); - ReplicatedSubscriptionsSnapshotResponse response4 = new ReplicatedSubscriptionsSnapshotResponse() - .setSnapshotId("snapshot-1"); + ReplicatedSubscriptionsSnapshotResponse response4 = new ReplicatedSubscriptionsSnapshotResponse().setSnapshotId( + "snapshot-1"); response4.setCluster() .setCluster("c") .setMessageId() @@ -201,7 +201,8 @@ public void testBuildTimeout() { List remoteClusters = Collections.singletonList("b"); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, conf, clock); + remoteClusters, + conf, clock); assertFalse(builder.isTimedOut());