Skip to content

Commit

Permalink
Ensure proxy and broker stats are reported to Prometheus (with 0) ev…
Browse files Browse the repository at this point in the history
…en when no traffic is present (apache#2805)

* Ensure proxy and broker stats are reported to Prometheus (with 0) even when no traffic is present

* Fixed unit test
  • Loading branch information
merlimat authored Oct 19, 2018
1 parent 4d16522 commit 3d51af1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();

printDefaultBrokerStats(stream, cluster);

pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();

Expand Down Expand Up @@ -151,6 +153,23 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
});
}

private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the available brokers being
// reported in the brokers dashboard even if they don't have any topic or traffi
metric(stream, cluster, "pulsar_topics_count", 0);
metric(stream, cluster, "pulsar_subscriptions_count", 0);
metric(stream, cluster, "pulsar_producers_count", 0);
metric(stream, cluster, "pulsar_consumers_count", 0);
metric(stream, cluster, "pulsar_rate_in", 0);
metric(stream, cluster, "pulsar_rate_out", 0);
metric(stream, cluster, "pulsar_throughput_in", 0);
metric(stream, cluster, "pulsar_throughput_out", 0);
metric(stream, cluster, "pulsar_storage_size", 0);
metric(stream, cluster, "pulsar_storage_write_rate", 0);
metric(stream, cluster, "pulsar_storage_read_rate", 0);
metric(stream, cluster, "pulsar_msg_backlog", 0);
}

private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
AggregatedNamespaceStats stats) {
metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
Expand Down Expand Up @@ -216,6 +235,15 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
}
}

private static void metric(SimpleTextOutputStream stream, String cluster, String name,
long value) {
TopicStats.metricType(stream, name);
stream.write(name)
.write("{cluster=\"").write(cluster).write("\"} ")
.write(value).write(' ').write(System.currentTimeMillis())
.write('\n');
}

private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
long value) {
TopicStats.metricType(stream, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public void testPerTopicStats() throws Exception {
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_producers_count");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).value, 1.0);
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.size(), 3);
assertEquals(cm.get(1).value, 1.0);
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(2).value, 1.0);
assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("topic_load_times_count");
assertEquals(cm.size(), 1);
Expand Down Expand Up @@ -126,10 +126,10 @@ public void testPerNamespaceStats() throws Exception {
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_producers_count");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, 2.0);
assertEquals(cm.get(0).tags.get("topic"), null);
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.size(), 2);
assertEquals(cm.get(1).value, 2.0);
assertEquals(cm.get(1).tags.get("topic"), null);
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

p1.close();
p2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ public class DirectProxyHandler {

private final Authentication authentication;

static final Counter opsCounter = Counter
.build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();

static final Counter bytesCounter = Counter
.build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();

public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
Expand Down Expand Up @@ -176,9 +170,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
break;

case HandshakeCompleted:
opsCounter.inc();
ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
bytesCounter.inc(((ByteBuf) msg).readableBytes());
ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
inboundChannel.writeAndFlush(msg).addListener(this);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,7 @@ ConnectionPool getConnectionPool() {
return client.getCnxPool();
}

private static final Gauge activeConnections = Gauge
.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
.register();

private static final Counter newConnections = Counter
.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
.register();

static final Counter rejectedConnections = Counter
.build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
.register();

public ProxyConnection(ProxyService proxyService) {
super(30, TimeUnit.SECONDS);
Expand All @@ -113,24 +103,24 @@ public ProxyConnection(ProxyService proxyService) {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
activeConnections.inc();
if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ProxyService.activeConnections.inc();
if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
rejectedConnections.inc();
ProxyService.rejectedConnections.inc();
return;
}
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
activeConnections.dec();
ProxyService.activeConnections.dec();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
newConnections.inc();
ProxyService.newConnections.inc();
LOG.info("[{}] New connection opened", remoteAddress);
}

Expand Down Expand Up @@ -169,9 +159,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
DirectProxyHandler.opsCounter.inc();
ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes());
ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
Expand All @@ -40,13 +47,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Pulsar proxy service
*/
Expand All @@ -72,6 +72,24 @@ public class ProxyService implements Closeable {

private static final int numThreads = Runtime.getRuntime().availableProcessors();

static final Gauge activeConnections = Gauge
.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
.register();

static final Counter newConnections = Counter
.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
.register();

static final Counter rejectedConnections = Counter
.build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
.register();

static final Counter opsCounter = Counter
.build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();

static final Counter bytesCounter = Counter
.build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();

public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws IOException {
checkNotNull(proxyConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ public void testInboundConnection() throws Exception {
PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
.build();
Producer<byte[]> producer2;
Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
try {
producer2 = client2.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
producer2.send("Message 1".getBytes());
Assert.fail("Should have failed since max num of connections is 2 and the first producer used them all up - one for discovery and other for producing.");
} catch (Exception ex) {
// OK
}
Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
Assert.assertEquals(ProxyService.rejectedConnections.get(), 1.0d);
}

private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
Expand Down

0 comments on commit 3d51af1

Please sign in to comment.