Skip to content

Commit

Permalink
Refactor TmRpcClient & RmClient for common use. (apache#1105)
Browse files Browse the repository at this point in the history
* for checkstyle.

* for checkstyle.

* simplify AbstractRpcRemotingClient.

* refactor AbstractRPcRemotingClient.

* add ClientPoolKeyGenerator & implement RMClientPoolKeyGenerator.

* add RmClientKeyGenerator.

* revise ClientPoolKeyGenerator.

* add NettyClientChannelManager to refactor AbstractRpcRemotingClient.

* fix unit test error.

* for checkstyle.

* add RmRpcClientTest.

* add mockito-junit-jupiter to integrate with mockito annotation.

* add unit test for NettyClientChannelManager.

* add header for test file.
  • Loading branch information
cherrylzhao authored and leizhiyuan committed Jun 10, 2019
1 parent 3c1f36b commit 9ee5c32
Show file tree
Hide file tree
Showing 12 changed files with 1,108 additions and 1,001 deletions.
24 changes: 24 additions & 0 deletions core/src/main/java/io/seata/core/rpc/RemotingClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc;

/**
* The interface remoting client.
*
* @author zhaojun
*/
public interface RemotingClient extends RemotingService {
}
61 changes: 27 additions & 34 deletions core/src/main/java/io/seata/core/rpc/netty/AbstractRpcRemoting.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@
*/
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.SocketAddress;
Expand All @@ -32,25 +50,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Abstract rpc remoting.
*
Expand All @@ -72,12 +71,11 @@ public abstract class AbstractRpcRemoting extends ChannelDuplexHandler implement
/**
* The Futures.
*/
protected final ConcurrentHashMap<Long, MessageFuture> futures = new ConcurrentHashMap<Long, MessageFuture>();
protected final ConcurrentHashMap<Long, MessageFuture> futures = new ConcurrentHashMap<>();
/**
* The Basket map.
*/
protected final ConcurrentHashMap<String, BlockingQueue<RpcMessage>> basketMap
= new ConcurrentHashMap<String, BlockingQueue<RpcMessage>>();
protected final ConcurrentHashMap<String, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

private static final long NOT_WRITEABLE_CHECK_MILLS = 10L;
/**
Expand All @@ -98,7 +96,7 @@ public abstract class AbstractRpcRemoting extends ChannelDuplexHandler implement
/**
* The Merge msg map.
*/
protected final Map<Long, MergeMessage> mergeMsgMap = new ConcurrentHashMap<Long, MergeMessage>();
protected final Map<Long, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();
/**
* The Channel handlers.
*/
Expand All @@ -121,13 +119,11 @@ public void init() {
@Override
public void run() {
List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

for (MessageFuture future : futures.values()) {
if (future.isTimeout()) {
timeoutMessageFutures.add(future);
}
}

for (MessageFuture messageFuture : timeoutMessageFutures) {
futures.remove(messageFuture.getRequestMessage().getId());
messageFuture.setResultMessage(null);
Expand All @@ -146,6 +142,7 @@ public void run() {
@Override
public void destroy() {
timerExecutor.shutdown();
messageExecutor.shutdown();
}

@Override
Expand All @@ -162,14 +159,13 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) {
/**
* Send async request with response object.
*
* @param address the address
* @param channel the channel
* @param msg the msg
* @return the object
* @throws TimeoutException the timeout exception
*/
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg) throws TimeoutException {
return sendAsyncRequestWithResponse(address, channel, msg, NettyClientConfig.getRpcRequestTimeout());
protected Object sendAsyncRequestWithResponse(Channel channel, Object msg) throws TimeoutException {
return sendAsyncRequestWithResponse(null, channel, msg, NettyClientConfig.getRpcRequestTimeout());
}

/**
Expand All @@ -193,15 +189,14 @@ protected Object sendAsyncRequestWithResponse(String address, Channel channel, O
/**
* Send async request without response object.
*
* @param address the address
* @param channel the channel
* @param msg the msg
* @return the object
* @throws TimeoutException the timeout exception
*/
protected Object sendAsyncRequestWithoutResponse(String address, Channel channel, Object msg) throws
protected Object sendAsyncRequestWithoutResponse(Channel channel, Object msg) throws
TimeoutException {
return sendAsyncRequest(address, channel, msg, 0);
return sendAsyncRequest(null, channel, msg, 0);
}

private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
Expand All @@ -226,7 +221,7 @@ private Object sendAsyncRequest(String address, Channel channel, Object msg, lon
ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
map.putIfAbsent(address, new LinkedBlockingQueue<>());
basket = map.get(address);
}
basket.offer(rpcMessage);
Expand Down Expand Up @@ -513,6 +508,4 @@ protected String getAddressFromChannel(Channel channel) {
}
return address;
}


}
Loading

0 comments on commit 9ee5c32

Please sign in to comment.