Skip to content

Commit

Permalink
KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911 (apache#…
Browse files Browse the repository at this point in the history
…13420)

New add.source.alias.to.metrics setting to add the source cluster alias to the MirrorSourceConnector metrics

Reviewers: Chris Egerton <[email protected]>
  • Loading branch information
mimaison authored Mar 21, 2023
1 parent 1b278c4 commit 1d8f799
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
public static final long OFFSET_LAG_MAX_DEFAULT = 100L;

public static final String ADD_SOURCE_ALIAS_TO_METRICS = "add.source.alias.to.metrics";
private static final String ADD_SOURCE_ALIAS_TO_METRICS_DOC = "Deprecated. Whether to tag metrics with the source cluster alias. "
+ "Metrics have the target, topic and partition tags. When this setting is enabled, it adds the source tag. "
+ "This configuration will be removed in Kafka 4.0 and the default behavior will be to always have the source tag.";
public static final boolean ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT = false;

public MirrorSourceConfig(Map<String, String> props) {
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
{TOPICS_EXCLUDE, TOPICS_EXCLUDE_ALIAS},
Expand Down Expand Up @@ -191,6 +197,10 @@ Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}

boolean addSourceAliasToMetrics() {
return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS);
}

protected static final ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)
.define(
TOPICS,
Expand Down Expand Up @@ -300,7 +310,13 @@ Duration consumerPollTimeout() {
OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC);
OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
ADD_SOURCE_ALIAS_TO_METRICS,
ConfigDef.Type.BOOLEAN,
ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT,
ConfigDef.Importance.LOW,
ADD_SOURCE_ALIAS_TO_METRICS_DOC);

public static void main(String[] args) {
System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_source_" + config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,56 +39,72 @@ class MirrorSourceMetrics implements AutoCloseable {

private static final String SOURCE_CONNECTOR_GROUP = MirrorSourceConnector.class.getSimpleName();

private static final Set<String> PARTITION_TAGS = new HashSet<>(Arrays.asList("target", "topic", "partition"));

private static final MetricNameTemplate RECORD_COUNT = new MetricNameTemplate(
"record-count", SOURCE_CONNECTOR_GROUP,
"Number of source records replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_RATE = new MetricNameTemplate(
"record-rate", SOURCE_CONNECTOR_GROUP,
"Average number of source records replicated to the target cluster per second.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_AGE = new MetricNameTemplate(
"record-age-ms", SOURCE_CONNECTOR_GROUP,
"The age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_AGE_MAX = new MetricNameTemplate(
"record-age-ms-max", SOURCE_CONNECTOR_GROUP,
"The max age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_AGE_MIN = new MetricNameTemplate(
"record-age-ms-min", SOURCE_CONNECTOR_GROUP,
"The min age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_AGE_AVG = new MetricNameTemplate(
"record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
"The average age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate BYTE_COUNT = new MetricNameTemplate(
"byte-count", SOURCE_CONNECTOR_GROUP,
"Number of bytes replicated to the target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
"byte-rate", SOURCE_CONNECTOR_GROUP,
"Average number of bytes replicated per second.", PARTITION_TAGS);
private static final MetricNameTemplate REPLICATION_LATENCY = new MetricNameTemplate(
"replication-latency-ms", SOURCE_CONNECTOR_GROUP,
"Time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate REPLICATION_LATENCY_MAX = new MetricNameTemplate(
"replication-latency-ms-max", SOURCE_CONNECTOR_GROUP,
"Max time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate REPLICATION_LATENCY_MIN = new MetricNameTemplate(
"replication-latency-ms-min", SOURCE_CONNECTOR_GROUP,
"Min time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
private static final MetricNameTemplate REPLICATION_LATENCY_AVG = new MetricNameTemplate(
"replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
"Average time it takes records to replicate from source to target cluster.", PARTITION_TAGS);

private final MetricNameTemplate recordCount;
private final MetricNameTemplate recordRate;
private final MetricNameTemplate recordAge;
private final MetricNameTemplate recordAgeMax;
private final MetricNameTemplate recordAgeMin;
private final MetricNameTemplate recordAgeAvg;
private final MetricNameTemplate byteCount;
private final MetricNameTemplate byteRate;
private final MetricNameTemplate replicationLatency;
private final MetricNameTemplate replicationLatencyMax;
private final MetricNameTemplate replicationLatencyMin;
private final MetricNameTemplate replicationLatencyAvg;

private final Metrics metrics;
private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
private final String source;
private final String target;
private final boolean addSourceAlias;

MirrorSourceMetrics(MirrorSourceTaskConfig taskConfig) {
this.target = taskConfig.targetClusterAlias();
this.source = taskConfig.sourceClusterAlias();
this.addSourceAlias = taskConfig.addSourceAliasToMetrics();
this.metrics = new Metrics();

Set<String> partitionTags = new HashSet<>(addSourceAlias
? Arrays.asList("source", "target", "topic", "partition")
: Arrays.asList("target", "topic", "partition"));

recordCount = new MetricNameTemplate(
"record-count", SOURCE_CONNECTOR_GROUP,
"Number of source records replicated to the target cluster.", partitionTags);
recordRate = new MetricNameTemplate(
"record-rate", SOURCE_CONNECTOR_GROUP,
"Average number of source records replicated to the target cluster per second.", partitionTags);
recordAge = new MetricNameTemplate(
"record-age-ms", SOURCE_CONNECTOR_GROUP,
"The age of incoming source records when replicated to the target cluster.", partitionTags);
recordAgeMax = new MetricNameTemplate(
"record-age-ms-max", SOURCE_CONNECTOR_GROUP,
"The max age of incoming source records when replicated to the target cluster.", partitionTags);
recordAgeMin = new MetricNameTemplate(
"record-age-ms-min", SOURCE_CONNECTOR_GROUP,
"The min age of incoming source records when replicated to the target cluster.", partitionTags);
recordAgeAvg = new MetricNameTemplate(
"record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
"The average age of incoming source records when replicated to the target cluster.", partitionTags);
byteCount = new MetricNameTemplate(
"byte-count", SOURCE_CONNECTOR_GROUP,
"Number of bytes replicated to the target cluster.", partitionTags);
byteRate = new MetricNameTemplate(
"byte-rate", SOURCE_CONNECTOR_GROUP,
"Average number of bytes replicated per second.", partitionTags);
replicationLatency = new MetricNameTemplate(
"replication-latency-ms", SOURCE_CONNECTOR_GROUP,
"Time it takes records to replicate from source to target cluster.", partitionTags);
replicationLatencyMax = new MetricNameTemplate(
"replication-latency-ms-max", SOURCE_CONNECTOR_GROUP,
"Max time it takes records to replicate from source to target cluster.", partitionTags);
replicationLatencyMin = new MetricNameTemplate(
"replication-latency-ms-min", SOURCE_CONNECTOR_GROUP,
"Min time it takes records to replicate from source to target cluster.", partitionTags);
replicationLatencyAvg = new MetricNameTemplate(
"replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
"Average time it takes records to replicate from source to target cluster.", partitionTags);

// for side-effect
metrics.sensor("record-count");
metrics.sensor("byte-rate");
Expand Down Expand Up @@ -137,27 +153,28 @@ private class PartitionMetrics {
String prefix = topicPartition.topic() + "-" + topicPartition.partition() + "-";

Map<String, String> tags = new LinkedHashMap<>();
if (addSourceAlias) tags.put("source", source);
tags.put("target", target);
tags.put("topic", topicPartition.topic());
tags.put("partition", Integer.toString(topicPartition.partition()));

recordSensor = metrics.sensor(prefix + "records-sent");
recordSensor.add(new Meter(metrics.metricInstance(RECORD_RATE, tags), metrics.metricInstance(RECORD_COUNT, tags)));
recordSensor.add(new Meter(metrics.metricInstance(recordRate, tags), metrics.metricInstance(recordCount, tags)));

byteSensor = metrics.sensor(prefix + "bytes-sent");
byteSensor.add(new Meter(metrics.metricInstance(BYTE_RATE, tags), metrics.metricInstance(BYTE_COUNT, tags)));
byteSensor.add(new Meter(metrics.metricInstance(byteRate, tags), metrics.metricInstance(byteCount, tags)));

recordAgeSensor = metrics.sensor(prefix + "record-age");
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new Value());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags), new Max());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags), new Min());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags), new Avg());
recordAgeSensor.add(metrics.metricInstance(recordAge, tags), new Value());
recordAgeSensor.add(metrics.metricInstance(recordAgeMax, tags), new Max());
recordAgeSensor.add(metrics.metricInstance(recordAgeMin, tags), new Min());
recordAgeSensor.add(metrics.metricInstance(recordAgeAvg, tags), new Avg());

replicationLatencySensor = metrics.sensor(prefix + "replication-latency");
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags), new Value());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX, tags), new Max());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN, tags), new Min());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG, tags), new Avg());
replicationLatencySensor.add(metrics.metricInstance(replicationLatency, tags), new Value());
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMax, tags), new Max());
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMin, tags), new Min());
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyAvg, tags), new Avg());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class MirrorSourceMetricsTest {

private final static String SOURCE = "source";
private final static String TARGET = "target";
private final static TopicPartition TP = new TopicPartition("topic", 0);
private final static TopicPartition SOURCE_TP = new TopicPartition(SOURCE + "." + TP.topic(), TP.partition());

private final Map<String, String> configs = new HashMap<>();
private TestReporter reporter;

@BeforeEach
public void setUp() {
configs.put(ConnectorConfig.NAME_CONFIG, "name");
configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, MirrorSourceConnector.class.getName());
configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, TP.toString());
reporter = new TestReporter();
}

@Test
public void testTags() {
MirrorSourceTaskConfig taskConfig = new MirrorSourceTaskConfig(configs);
MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
metrics.addReporter(reporter);

metrics.countRecord(SOURCE_TP);
assertEquals(13, reporter.metrics.size());
Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
assertEquals(TARGET, tags.get("target"));
assertEquals(SOURCE_TP.topic(), tags.get("topic"));
assertEquals(String.valueOf(SOURCE_TP.partition()), tags.get("partition"));
assertNull(tags.get("source"));
}

@Test
public void testTagsWithSourceAlias() {
configs.put(MirrorSourceConfig.ADD_SOURCE_ALIAS_TO_METRICS, "true");
MirrorSourceTaskConfig taskConfig = new MirrorSourceTaskConfig(configs);
MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
metrics.addReporter(reporter);

metrics.countRecord(SOURCE_TP);
assertEquals(13, reporter.metrics.size());
Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
assertEquals(SOURCE, tags.get("source"));
assertEquals(TARGET, tags.get("target"));
assertEquals(SOURCE_TP.topic(), tags.get("topic"));
assertEquals(String.valueOf(SOURCE_TP.partition()), tags.get("partition"));
}

static class TestReporter implements MetricsReporter {

List<KafkaMetric> metrics = new ArrayList<>();

@Override
public void init(List<KafkaMetric> metrics) {
for (KafkaMetric metric : metrics) {
metricChange(metric);
}
}

@Override
public void metricChange(KafkaMetric metric) {
metrics.add(metric);
}

@Override
public void metricRemoval(KafkaMetric metric) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}
}

0 comments on commit 1d8f799

Please sign in to comment.