Skip to content

Commit

Permalink
Added stats for proxy operations and bytes (apache#2740)
Browse files Browse the repository at this point in the history
* Added stats for proxy operations and bytes

* Added _binary marker in metrics name
  • Loading branch information
merlimat authored Oct 18, 2018
1 parent 3bde91d commit 1fe6395
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.prometheus.client.Counter;

public class DirectProxyHandler {

Expand All @@ -62,6 +63,12 @@ public class DirectProxyHandler {

private final Authentication authentication;

static final Counter opsCounter = Counter
.build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();

static final Counter bytesCounter = Counter
.build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();

public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
Expand Down Expand Up @@ -169,6 +176,10 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
break;

case HandshakeCompleted:
opsCounter.inc();
if (msg instanceof ByteBuf) {
bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
inboundChannel.writeAndFlush(msg).addListener(this);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslHandler;
Expand Down Expand Up @@ -144,7 +145,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (client != null) {
client.close();
}

LOG.info("[{}] Connection closed", remoteAddress);
}

Expand All @@ -167,8 +168,11 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce

case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only
// if we can write on the connection
// only if we can write on the connection
DirectProxyHandler.opsCounter.inc();
if (msg instanceof ByteBuf) {
DirectProxyHandler.bytesCounter.inc(((ByteBuf) msg).readableBytes());
}
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;

Expand Down Expand Up @@ -291,7 +295,7 @@ private boolean authenticateAndCreateClient(CommandConnect connect) {
service.getWorkerGroup())));
return true;
}

String authMethod = "none";
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
Expand Down

0 comments on commit 1fe6395

Please sign in to comment.