From 3d51af1f2ab72f7e877a098c2b602b760a3c185d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Oct 2018 17:20:43 -0700 Subject: [PATCH] Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present (#2805) * Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present * Fixed unit test --- .../prometheus/NamespaceStatsAggregator.java | 28 +++++++++++++++ .../broker/stats/PrometheusMetricsTest.java | 18 +++++----- .../proxy/server/DirectProxyHandler.java | 10 ++---- .../pulsar/proxy/server/ProxyConnection.java | 24 ++++--------- .../pulsar/proxy/server/ProxyService.java | 36 ++++++++++++++----- .../server/ProxyConnectionThrottlingTest.java | 4 +-- 6 files changed, 75 insertions(+), 45 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 8a727f2d397ea..e3a009129fad3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -48,6 +48,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b AggregatedNamespaceStats namespaceStats = localNamespaceStats.get(); TopicStats topicStats = localTopicStats.get(); + printDefaultBrokerStats(stream, cluster); + pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); @@ -151,6 +153,23 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include }); } + private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) { + // Print metrics with 0 values. This is necessary to have the available brokers being + // reported in the brokers dashboard even if they don't have any topic or traffi + metric(stream, cluster, "pulsar_topics_count", 0); + metric(stream, cluster, "pulsar_subscriptions_count", 0); + metric(stream, cluster, "pulsar_producers_count", 0); + metric(stream, cluster, "pulsar_consumers_count", 0); + metric(stream, cluster, "pulsar_rate_in", 0); + metric(stream, cluster, "pulsar_rate_out", 0); + metric(stream, cluster, "pulsar_throughput_in", 0); + metric(stream, cluster, "pulsar_throughput_out", 0); + metric(stream, cluster, "pulsar_storage_size", 0); + metric(stream, cluster, "pulsar_storage_write_rate", 0); + metric(stream, cluster, "pulsar_storage_read_rate", 0); + metric(stream, cluster, "pulsar_msg_backlog", 0); + } + private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, AggregatedNamespaceStats stats) { metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount); @@ -216,6 +235,15 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl } } + private static void metric(SimpleTextOutputStream stream, String cluster, String name, + long value) { + TopicStats.metricType(stream, name); + stream.write(name) + .write("{cluster=\"").write(cluster).write("\"} ") + .write(value).write(' ').write(System.currentTimeMillis()) + .write('\n'); + } + private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, long value) { TopicStats.metricType(stream, name); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 91bc0b826b322..90a4bd881da60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -82,13 +82,13 @@ public void testPerTopicStats() throws Exception { assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); cm = (List) metrics.get("pulsar_producers_count"); - assertEquals(cm.size(), 2); - assertEquals(cm.get(0).value, 1.0); - assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); - assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.size(), 3); assertEquals(cm.get(1).value, 1.0); - assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(2).value, 1.0); + assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns"); cm = (List) metrics.get("topic_load_times_count"); assertEquals(cm.size(), 1); @@ -126,10 +126,10 @@ public void testPerNamespaceStats() throws Exception { assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); cm = (List) metrics.get("pulsar_producers_count"); - assertEquals(cm.size(), 1); - assertEquals(cm.get(0).value, 2.0); - assertEquals(cm.get(0).tags.get("topic"), null); - assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.size(), 2); + assertEquals(cm.get(1).value, 2.0); + assertEquals(cm.get(1).tags.get("topic"), null); + assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); p1.close(); p2.close(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index e6f0b5fcd0b9d..232db4307e298 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -63,12 +63,6 @@ public class DirectProxyHandler { private final Authentication authentication; - static final Counter opsCounter = Counter - .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register(); - - static final Counter bytesCounter = Counter - .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register(); - public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) { this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); @@ -176,9 +170,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce break; case HandshakeCompleted: - opsCounter.inc(); + ProxyService.opsCounter.inc(); if (msg instanceof ByteBuf) { - bytesCounter.inc(((ByteBuf) msg).readableBytes()); + ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes()); } inboundChannel.writeAndFlush(msg).addListener(this); break; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 42e4894ca2543..2ebea8a7745a0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -92,17 +92,7 @@ ConnectionPool getConnectionPool() { return client.getCnxPool(); } - private static final Gauge activeConnections = Gauge - .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create() - .register(); - private static final Counter newConnections = Counter - .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create() - .register(); - - static final Counter rejectedConnections = Counter - .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create() - .register(); public ProxyConnection(ProxyService proxyService) { super(30, TimeUnit.SECONDS); @@ -113,10 +103,10 @@ public ProxyConnection(ProxyService proxyService) { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); - activeConnections.inc(); - if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { + ProxyService.activeConnections.inc(); + if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { ctx.close(); - rejectedConnections.inc(); + ProxyService.rejectedConnections.inc(); return; } } @@ -124,13 +114,13 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); - activeConnections.dec(); + ProxyService.activeConnections.dec(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - newConnections.inc(); + ProxyService.newConnections.inc(); LOG.info("[{}] New connection opened", remoteAddress); } @@ -169,9 +159,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce case ProxyConnectionToBroker: // Pass the buffer to the outbound connection and schedule next read // only if we can write on the connection - DirectProxyHandler.opsCounter.inc(); + ProxyService.opsCounter.inc(); if (msg instanceof ByteBuf) { - DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes()); + ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes()); } directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this); break; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 964cd1e8d9ef0..eaf5f04fe7d41 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -21,15 +21,22 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; + import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -40,13 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Pulsar proxy service */ @@ -72,6 +72,24 @@ public class ProxyService implements Closeable { private static final int numThreads = Runtime.getRuntime().availableProcessors(); + static final Gauge activeConnections = Gauge + .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create() + .register(); + + static final Counter newConnections = Counter + .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create() + .register(); + + static final Counter rejectedConnections = Counter + .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create() + .register(); + + static final Counter opsCounter = Counter + .build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register(); + + static final Counter bytesCounter = Counter + .build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register(); + public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws IOException { checkNotNull(proxyConfig); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 3e28242092f62..20ebeb11872e2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -78,7 +78,7 @@ public void testInboundConnection() throws Exception { PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) .build(); Producer producer2; - Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d); + Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d); try { producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create(); producer2.send("Message 1".getBytes()); @@ -86,7 +86,7 @@ public void testInboundConnection() throws Exception { } catch (Exception ex) { // OK } - Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d); + Assert.assertEquals(ProxyService.rejectedConnections.get(), 1.0d); } private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);