Skip to content

Commit

Permalink
Add javadoc to NettyRemotingAbstract class and several other trivial …
Browse files Browse the repository at this point in the history
…clean up.
  • Loading branch information
lizhanhui authored and dongeforever committed Jun 6, 2017
1 parent 1e4307e commit 9d983ea
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,84 @@
import org.slf4j.LoggerFactory;

public abstract class NettyRemotingAbstract {

/**
* Remoting logger instance.
*/
private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreOneway;

/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreAsync;

/**
* This map caches all on-going requests.
*/
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);

/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();

/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();

/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
* @param permitsOneway Number of permits for one-way requests.
* @param permitsAsync Number of permits for asynchronous requests.
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}

/**
* Custom channel event listener.
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener();

/**
* Put a netty event to the executor.
* @param event Netty event instance.
*/
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecuter.putNettyEvent(event);
this.nettyEventExecutor.putNettyEvent(event);
}

/**
* Entry of incoming command processing.
*
* <p>
* <strong>Note:</strong>
* The incoming remoting command may be
* <ul>
* <li>An inquiry request from a remote peer component;</li>
* <li>A response to a previous request issued by this very participant.</li>
* </ul>
* </p>
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
Expand All @@ -90,6 +142,11 @@ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand ms
}
}

/**
* Process incoming request command issued by remote peer.
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
Expand Down Expand Up @@ -175,6 +232,11 @@ public void run() {
}
}

/**
* Process response from remote peer to the previous issued requests.
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
Expand All @@ -196,7 +258,10 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
}
}

//execute callback in callback executor. If callback executor is null, run directly in current thread
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
* @param responseFuture
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
Expand Down Expand Up @@ -229,10 +294,24 @@ public void run() {
}
}

/**
* Custom RPC hook.
* @return RPC hook if specified; null otherwise.
*/
public abstract RPCHook getRPCHook();

abstract public ExecutorService getCallbackExecutor();

/**
* This method specifies thread pool to use while invoking callback methods.
* @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the
* netty event-loop thread.
*/
public abstract ExecutorService getCallbackExecutor();

/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* </p>
*/
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
Expand Down Expand Up @@ -386,7 +465,7 @@ public void operationComplete(ChannelFuture f) throws Exception {
}
}

class NettyEventExecuter extends ServiceThread {
class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;

Expand Down Expand Up @@ -436,7 +515,7 @@ public void run() {

@Override
public String getServiceName() {
return NettyEventExecuter.class.getSimpleName();
return NettyEventExecutor.class.getSimpleName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void run() {
}, 1000 * 3, 1000);

if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
this.nettyEventExecutor.start();
}
}

Expand All @@ -189,8 +189,8 @@ public void shutdown() {

this.eventLoopGroupWorker.shutdownGracefully();

if (this.nettyEventExecuter != null) {
this.nettyEventExecuter.shutdown();
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
}

if (this.defaultEventExecutorGroup != null) {
Expand Down Expand Up @@ -586,16 +586,15 @@ class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);

}
}

class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : localAddress.toString();
final String remote = remoteAddress == null ? "UNKNOWN" : remoteAddress.toString();
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);

super.connect(ctx, remoteAddress, localAddress, promise);
Expand All @@ -613,7 +612,7 @@ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws
super.disconnect(ctx, promise);

if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}

Expand All @@ -625,7 +624,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
super.close(ctx, promise);

if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}

Expand All @@ -639,7 +638,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
Expand All @@ -654,7 +653,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void initChannel(SocketChannel ch) throws Exception {
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyConnectManageHandler(),
new NettyServerHandler());
}
});
Expand All @@ -178,7 +178,7 @@ public void initChannel(SocketChannel ch) throws Exception {
}

if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
this.nettyEventExecutor.start();
}

this.timer.scheduleAtFixedRate(new TimerTask() {
Expand All @@ -205,8 +205,8 @@ public void shutdown() {

this.eventLoopGroupSelector.shutdownGracefully();

if (this.nettyEventExecuter != null) {
this.nettyEventExecuter.shutdown();
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
}

if (this.defaultEventExecutorGroup != null) {
Expand Down Expand Up @@ -297,7 +297,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) thro
}
}

class NettyConnetManageHandler extends ChannelDuplexHandler {
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
Expand All @@ -319,7 +319,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}

Expand All @@ -330,21 +330,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent) evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
Expand All @@ -359,7 +359,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}

RemotingUtil.closeChannel(ctx.channel());
Expand Down

0 comments on commit 9d983ea

Please sign in to comment.