Skip to content

Commit

Permalink
Exposing prometheus metrics for Pulsar function local run mode (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored Apr 7, 2021
1 parent 7b35328 commit 45661ea
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -81,6 +82,7 @@
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -411,6 +413,7 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

functionConfig.setJar(jarFilePathUrl);
int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
.functionConfig(functionConfig)
Expand All @@ -420,6 +423,7 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
.metricsPortStart(metricsPort)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
localRunner.start(false);

Expand Down Expand Up @@ -463,6 +467,21 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);

// validate prometheus metrics
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
log.info("prometheus metrics: {}", prometheusMetrics);

Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
assertFalse(metrics.isEmpty());

PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, 5.0);

// stop functions
localRunner.stop();

Expand Down Expand Up @@ -771,6 +790,7 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
}

sinkConfig.setArchive(jarFilePathUrl);
int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
.sinkConfig(sinkConfig)
Expand All @@ -782,6 +802,7 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
.metricsPortStart(metricsPort)
.build();

localRunner.start(false);
Expand Down Expand Up @@ -819,6 +840,21 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
}
}, 5, 200);

// validate prometheus metrics
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
log.info("prometheus metrics: {}", prometheusMetrics);

Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
assertFalse(metrics.isEmpty());

PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 10.0);

// stop sink
localRunner.stop();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;

@Slf4j
public class PulsarFunctionTestUtils {
public static String getPrometheusMetrics(int metricsPort) throws IOException {
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line + System.lineSeparator());
}
rd.close();
return result.toString();
}

/**
* Hacky parsing of Prometheus text format. Sould be good enough for unit tests
*/
public static Map<String, Metric> parseMetrics(String metrics) {
final Map<String, Metric> parsed = new HashMap<>();
// Example of lines are
// jvm_threads_current{cluster="standalone",} 203.0
// or
// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
Arrays.asList(metrics.split("\n")).forEach(line -> {
if (line.isEmpty() || line.startsWith("#")) {
return;
}
Matcher matcher = pattern.matcher(line);
log.info("line: {}", line);
checkArgument(matcher.matches());
String name = matcher.group(1);
Metric m = new Metric();
String numericValue = matcher.group(3);
if (numericValue.equalsIgnoreCase("-Inf")) {
m.value = Double.NEGATIVE_INFINITY;
} else if (numericValue.equalsIgnoreCase("+Inf")) {
m.value = Double.POSITIVE_INFINITY;
} else {
m.value = Double.parseDouble(numericValue);
}
String tags = matcher.group(2);
if (tags != null) {
tags = tags.replace("{", "").replace("}", "");
Matcher tagsMatcher = tagsPattern.matcher(tags);
while (tagsMatcher.find()) {
String tag = tagsMatcher.group(1);
String value = tagsMatcher.group(2);
m.tags.put(tag, value);
}
}
parsed.put(name, m);
});

log.info("parsed metrics: {}", parsed);
return parsed;
}

@ToString
public static class Metric {
public final Map<String, String> tags = new TreeMap<>();
public double value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,64 +297,4 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerService.init(workerConfig, null, false);
return workerService;
}

protected static String getPrometheusMetrics(int metricsPort) throws IOException {
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line + System.lineSeparator());
}
rd.close();
return result.toString();
}

/**
* Hacky parsing of Prometheus text format. Sould be good enough for unit tests
*/
protected static Map<String, Metric> parseMetrics(String metrics) {
final Map<String, Metric> parsed = new HashMap<>();
// Example of lines are
// jvm_threads_current{cluster="standalone",} 203.0
// or
// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
Arrays.asList(metrics.split("\n")).forEach(line -> {
if (line.isEmpty() || line.startsWith("#")) {
return;
}
Matcher matcher = pattern.matcher(line);
checkArgument(matcher.matches());
String name = matcher.group(1);
Metric m = new Metric();
String numericValue = matcher.group(3);
if (numericValue.equalsIgnoreCase("-Inf")) {
m.value = Double.NEGATIVE_INFINITY;
} else if (numericValue.equalsIgnoreCase("+Inf")) {
m.value = Double.POSITIVE_INFINITY;
} else {
m.value = Double.parseDouble(numericValue);
}
String tags = matcher.group(2);
Matcher tagsMatcher = tagsPattern.matcher(tags);
while (tagsMatcher.find()) {
String tag = tagsMatcher.group(1);
String value = tagsMatcher.group(2);
m.tags.put(tag, value);
}
parsed.put(name, m);
});
return parsed;
}

@ToString
static class Metric {
final Map<String, String> tags = new TreeMap<>();
double value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.utils.FunctionCommon;

import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -101,11 +102,11 @@ private void testPulsarBatchSourceStats(String jarFilePathUrl) throws Exception
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);

String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheusMetrics: {}", prometheusMetrics);

Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
Metric m = metrics.get("pulsar_source_received_total");
Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -48,6 +49,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand All @@ -57,6 +59,7 @@
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -320,11 +323,11 @@ public void testPulsarFunctionStats() throws Exception {
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());

// validate prometheus metrics empty
String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);

Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
Metric m = metrics.get("pulsar_function_received_total");
Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
Expand Down Expand Up @@ -478,10 +481,10 @@ public void testPulsarFunctionStats() throws Exception {
assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());

// validate prometheus metrics
prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);

metrics = parseMetrics(prometheusMetrics);
metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
m = metrics.get("pulsar_function_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
Expand Down Expand Up @@ -927,5 +930,4 @@ public void testFunctionAutomaticSubCleanup() throws Exception {
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -107,9 +108,9 @@ public void testReadCompactedSink() throws Exception {
// 5 Sink should only read compacted value,so we will only receive compacted messages
retryStrategically((test) -> {
try {
String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
Metric m = metrics.get("pulsar_sink_received_total");
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
return m.value == (double) maxKeys;
} catch (Exception e) {
return false;
Expand Down Expand Up @@ -249,11 +250,11 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);

// validate prometheus metrics empty
String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);

Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
Metric m = metrics.get("pulsar_sink_received_total");
Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sinkName);
Expand Down Expand Up @@ -332,10 +333,10 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
}, 5, 200);

// get stats after producing
prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheusMetrics: {}", prometheusMetrics);

metrics = parseMetrics(prometheusMetrics);
metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
Expand Down
Loading

0 comments on commit 45661ea

Please sign in to comment.