Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add replication subscription stats (apache#23026
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dragosvictor authored Aug 30, 2024
1 parent dccc06b commit ed14f21
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
Expand Down Expand Up @@ -265,6 +266,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
private OpenTelemetryProducerStats openTelemetryProducerStats;
private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
private OpenTelemetryReplicatedSubscriptionStats openTelemetryReplicatedSubscriptionStats;
private OpenTelemetryTransactionCoordinatorStats openTelemetryTransactionCoordinatorStats;
private OpenTelemetryTransactionPendingAckStoreStats openTelemetryTransactionPendingAckStoreStats;

Expand Down Expand Up @@ -861,6 +863,7 @@ public void start() throws PulsarServerException {
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);
openTelemetryReplicatedSubscriptionStats = new OpenTelemetryReplicatedSubscriptionStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.common.api.proto.ClusterMessageId;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand All @@ -49,6 +50,7 @@
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;

/**
* Encapsulate all the logic of replicated subscriptions tracking for a given topic.
Expand All @@ -70,19 +72,25 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
new ConcurrentHashMap<>();

@PulsarDeprecatedMetric(
newMetricName = OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
@Deprecated
private static final Gauge pendingSnapshotsMetric = Gauge
.build("pulsar_replicated_subscriptions_pending_snapshots",
"Counter of currently pending snapshots")
.register();

private final OpenTelemetryReplicatedSubscriptionStats stats;

public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
this.topic = topic;
this.localCluster = localCluster;
timer = topic.getBrokerService().pulsar().getExecutor()
var pulsar = topic.getBrokerService().pulsar();
timer = pulsar.getExecutor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotFrequencyMillis(),
pulsar.getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(),
TimeUnit.MILLISECONDS);
stats = pulsar.getOpenTelemetryReplicatedSubscriptionStats();
}

public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
Expand Down Expand Up @@ -233,11 +241,11 @@ private void startNewSnapshot() {
}

pendingSnapshotsMetric.inc();
stats.recordSnapshotStarted();
ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
pendingSnapshots.put(builder.getSnapshotId(), builder);
builder.start();

}

public Optional<String> getLastCompletedSnapshotId() {
Expand All @@ -254,18 +262,24 @@ private void cleanupTimedOutSnapshots() {
}

pendingSnapshotsMetric.dec();
var latencyMillis = entry.getValue().getDurationMillis();
stats.recordSnapshotTimedOut(latencyMillis);
it.remove();
}
}
}

void snapshotCompleted(String snapshotId) {
ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId);
pendingSnapshotsMetric.dec();
lastCompletedSnapshotId = snapshotId;

if (snapshot != null) {
lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();

pendingSnapshotsMetric.dec();
var latencyMillis = snapshot.getDurationMillis();
ReplicatedSubscriptionsSnapshotBuilder.SNAPSHOT_METRIC.observe(latencyMillis);
stats.recordSnapshotCompleted(latencyMillis);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;

@Slf4j
public class ReplicatedSubscriptionsSnapshotBuilder {
Expand All @@ -52,11 +54,13 @@ public class ReplicatedSubscriptionsSnapshotBuilder {

private final Clock clock;

private static final Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
@PulsarDeprecatedMetric(newMetricName = OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME)
@Deprecated
public static final Summary SNAPSHOT_METRIC = Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
"Time taken to create a consistent snapshot across clusters").register();

public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller,
List<String> remoteClusters, ServiceConfiguration conf, Clock clock) {
List<String> remoteClusters, ServiceConfiguration conf, Clock clock) {
this.snapshotId = UUID.randomUUID().toString();
this.controller = controller;
this.remoteClusters = remoteClusters;
Expand Down Expand Up @@ -123,8 +127,6 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);

double latencyMillis = clock.millis() - startTimeMillis;
snapshotMetric.observe(latencyMillis);
}

boolean isTimedOut() {
Expand All @@ -134,4 +136,8 @@ boolean isTimedOut() {
long getStartTimeMillis() {
return startTimeMillis;
}

long getDurationMillis() {
return clock.millis() - startTimeMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.stats.MetricsUtil;

public class OpenTelemetryReplicatedSubscriptionStats {

public static final AttributeKey<String> SNAPSHOT_OPERATION_RESULT =
AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.result");
public enum SnapshotOperationResult {
SUCCESS,
TIMEOUT;
private final Attributes attributes = Attributes.of(SNAPSHOT_OPERATION_RESULT, name().toLowerCase());
}

public static final String SNAPSHOT_OPERATION_COUNT_METRIC_NAME =
"pulsar.broker.replication.subscription.snapshot.operation.count";
private final LongCounter snapshotOperationCounter;

public static final String SNAPSHOT_DURATION_METRIC_NAME =
"pulsar.broker.replication.subscription.snapshot.operation.duration";
private final DoubleHistogram snapshotDuration;

public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) {
var meter = pulsar.getOpenTelemetry().getMeter();
snapshotOperationCounter = meter.counterBuilder(SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
.setDescription("The number of snapshot operations attempted")
.setUnit("{operation}")
.build();
snapshotDuration = meter.histogramBuilder(SNAPSHOT_DURATION_METRIC_NAME)
.setDescription("Time taken to complete a consistent snapshot operation across clusters")
.setUnit("s")
.build();
}

public void recordSnapshotStarted() {
snapshotOperationCounter.add(1);
}

public void recordSnapshotTimedOut(long durationMs) {
snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS),
SnapshotOperationResult.TIMEOUT.attributes);
}

public void recordSnapshotCompleted(long durationMs) {
snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, TimeUnit.MILLISECONDS),
SnapshotOperationResult.SUCCESS.attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME;
import static org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand All @@ -26,7 +30,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -141,7 +146,6 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
producer.send(body.getBytes(StandardCharsets.UTF_8));
sentMessages.add(body);
}
producer.close();
}

Set<String> receivedMessages = new LinkedHashSet<>();
Expand Down Expand Up @@ -170,6 +174,17 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
// assert that all messages have been received
assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " +
"messages don't match.");

var metrics1 = metricReader1.collectAllMetrics();
assertMetricLongSumValue(metrics1, SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
Attributes.empty(),value -> assertThat(value).isPositive());
assertMetricLongSumValue(metrics1, SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
Attributes.empty(), value -> assertThat(value).isPositive());
assertThat(metrics1)
.anySatisfy(metric -> OpenTelemetryAssertions.assertThat(metric)
.hasName(SNAPSHOT_DURATION_METRIC_NAME)
.hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
histogramPoint -> histogramPoint.hasSumGreaterThan(0.0))));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import lombok.Cleanup;

import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -48,6 +47,12 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
pulsarTestContextBuilder.enableOpenTelemetry(true);
}

@Test
public void createReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
Expand Down
Loading

0 comments on commit ed14f21

Please sign in to comment.