Skip to content

Commit

Permalink
[functions] Added default metrics for Prometheus (apache#5885)
Browse files Browse the repository at this point in the history
Currently functions do not report the default Prometheus metrics
like the broker does, or any JMX metrics that normally get
reported by kafka clients.

This add the default Prometheus exports used by the broker as
well as JMX exports that are reported by kafka clients when
using the kafka wrapper.

This change is a trivial rework / code cleanup without any test coverage.
  • Loading branch information
rivernate authored and sijie committed Dec 24, 2019
1 parent feeaa30 commit 602f518
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 0 deletions.
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_servlet-0.5.0.jar
- io.prometheus-simpleclient_log4j2-0.5.0.jar
- io.prometheus-simpleclient_jetty-0.5.0.jar
- io.prometheus.jmx-collector-0.12.0.jar
- io.prometheus-simpleclient_caffeine-0.5.0.jar
* Bean Validation API -- javax.validation-validation-api-1.1.0.Final.jar
* Log4J
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ flexible messaging model and an intuitive client API.</description>
<hbase.version>1.4.9</hbase.version>
<guava.version>25.1-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.12.0</prometheus-jmx.version>

<!-- test dependencies -->
<cassandra.version>3.6.0</cassandra.version>
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
<version>${prometheus.version}</version>
</dependency>

<dependency>
<groupId>io.prometheus.jmx</groupId>
<artifactId>collector</artifactId>
<version>${prometheus-jmx.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
import io.grpc.stub.StreamObserver;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.BufferPoolsExports;
import io.prometheus.client.hotspot.ClassLoadingExports;
import io.prometheus.client.hotspot.GarbageCollectorExports;
import io.prometheus.client.hotspot.MemoryPoolsExports;
import io.prometheus.client.hotspot.StandardExports;
import io.prometheus.client.hotspot.ThreadExports;
import io.prometheus.client.hotspot.VersionInfoExports;
import io.prometheus.jmx.JmxCollector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
Expand All @@ -44,6 +52,7 @@
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.Reflections;

import javax.management.MalformedObjectNameException;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Map;
Expand Down Expand Up @@ -176,6 +185,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL

// Collector Registry for prometheus metrics
CollectorRegistry collectorRegistry = new CollectorRegistry();
registerDefaultCollectors(collectorRegistry);

containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
stateStorageServiceUrl,
Expand Down Expand Up @@ -234,6 +244,23 @@ public void run() {
close();
}

private void registerDefaultCollectors(CollectorRegistry registry) {
// Add the JMX exporter for functionality similar to the kafka connect JMX metrics
try {
new JmxCollector("{}").register(registry);
} catch (MalformedObjectNameException ex) {
System.err.println(ex);
}
// Add the default exports from io.prometheus.client.hotspot.DefaultExports
new StandardExports().register(registry);
new MemoryPoolsExports().register(registry);
new BufferPoolsExports().register(registry);
new GarbageCollectorExports().register(registry);
new ThreadExports().register(registry);
new ClassLoadingExports().register(registry);
new VersionInfoExports().register(registry);
}

private static boolean isTrue(String param) {
return Boolean.TRUE.toString().equals(param);
}
Expand Down

0 comments on commit 602f518

Please sign in to comment.