Skip to content

Commit

Permalink
Client transport decides to support reusable by itself. (sofastack#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy authored May 22, 2018
1 parent d4af480 commit 109c1e9
Show file tree
Hide file tree
Showing 27 changed files with 713 additions and 404 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ protected void checkAlias(ProviderInfo providerInfo, SofaRequest message) {
*/
protected SofaResponse filterChain(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
RpcInternalContext context = RpcInternalContext.getContext();
context.setInterfaceConfig(consumerConfig);
context.setProviderInfo(providerInfo);
return filterChain.invoke(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ protected void printSuccess(String interfaceId, ProviderInfo providerInfo, Clien
*/
protected void printFailure(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {
if (LOGGER.isInfoEnabled(consumerConfig.getAppName())) {
LOGGER.infoWithApp("Connect to {} provider:{} failure !", interfaceId, providerInfo);
LOGGER.infoWithApp(consumerConfig.getAppName(), "Connect to {} provider:{} failure !", interfaceId,
providerInfo);
}
}

Expand All @@ -715,7 +716,7 @@ protected void printFailure(String interfaceId, ProviderInfo providerInfo, Clien
protected void printDead(String interfaceId, ProviderInfo providerInfo, ClientTransport transport, Exception e) {
Throwable cause = e.getCause();
if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
LOGGER.warnWithApp(
LOGGER.warnWithApp(consumerConfig.getAppName(),
"Connect to {} provider:{} failure !! The exception is " + ExceptionUtils.toShortString(e, 1)
+ (cause != null ? ", cause by " + cause.getMessage() + "." : "."),
interfaceId, providerInfo);
Expand Down Expand Up @@ -833,7 +834,7 @@ private void doReconnect() {
}
ClientTransport transport = entry.getValue();
if (LOGGER.isDebugEnabled(appName)) {
LOGGER.debugWithApp("Retry connect to {} provider:{} ...", interfaceId, providerInfo);
LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} ...", interfaceId, providerInfo);
}
try {
transport.connect();
Expand All @@ -844,12 +845,12 @@ private void doReconnect() {
} catch (Exception e) {
if (print) {
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.warnWithApp("Retry connect to {} provider:{} error ! The exception is " + e
LOGGER.warnWithApp(appName, "Retry connect to {} provider:{} error ! The exception is " + e
.getMessage(), interfaceId, providerInfo);
}
} else {
if (LOGGER.isDebugEnabled(appName)) {
LOGGER.debugWithApp("Retry connect to {} provider:{} error ! The exception is " + e
LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} error ! The exception is " + e
.getMessage(), interfaceId, providerInfo);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.SystemInfo;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
Expand Down Expand Up @@ -175,8 +174,8 @@ public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcExc
} finally {
afterSend(context, request);
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent((ConsumerConfig) context.getInterfaceConfig(),
context.getProviderInfo(), request, response, throwable));
EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(),
transportConfig.getProviderInfo(), request, response, throwable));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.annotation.Unstable;
import com.alipay.sofa.rpc.common.struct.StopWatch;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.message.ResponseFuture;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -150,48 +148,40 @@ protected RpcInternalContext() {
/**
* The Future.
*/
private ResponseFuture<?> future;
private ResponseFuture<?> future;

/**
* The Local address.
*/
private InetSocketAddress localAddress;
private InetSocketAddress localAddress;

/**
* The Remote address.
*/
private InetSocketAddress remoteAddress;
private InetSocketAddress remoteAddress;

/**
* 附带属性功能,遵循谁使用谁清理的原则。Key必须为 "_" 和 "."开头<br>
* 如果关闭了 {@link #ATTACHMENT_ENABLE} 功能,"_" 开头的Key将不被保持和传递。
*
* @see #ATTACHMENT_ENABLE
*/
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();

/**
* The Stopwatch
*/
private StopWatch stopWatch = new StopWatch();
private StopWatch stopWatch = new StopWatch();

/**
* The Provider side.
*/
private Boolean providerSide;
private Boolean providerSide;

/**
* 要调用的服务端信息
*/
private ProviderInfo providerInfo;

/**
* 发起调用的客户端信息
*
* @since 5.3.3
*/
@Unstable
private AbstractInterfaceConfig interfaceConfig;
private ProviderInfo providerInfo;

/**
* Is provider side.
Expand Down Expand Up @@ -432,7 +422,7 @@ public StopWatch getStopWatch() {
*/
public void clear() {
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(null)
.setProviderInfo(null).setInterfaceConfig(null);
.setProviderInfo(null);
this.attachments = new ConcurrentHashMap<String, Object>();
this.stopWatch.reset();
}
Expand All @@ -457,30 +447,6 @@ public ProviderInfo getProviderInfo() {
return providerInfo;
}

/**
* Gets interface config.
*
* @return the config
* @since 5.3.3
*/
@Unstable
public AbstractInterfaceConfig getInterfaceConfig() {
return interfaceConfig;
}

/**
* Sets interface config.
*
* @param interfaceConfig the interface config
* @return the config
* @since 5.3.3
*/
@Unstable
public RpcInternalContext setInterfaceConfig(AbstractInterfaceConfig interfaceConfig) {
this.interfaceConfig = interfaceConfig;
return this;
}

@Override
public String toString() {
return super.toString() + "{" +
Expand All @@ -491,7 +457,6 @@ public String toString() {
", stopWatch=" + stopWatch +
", providerSide=" + providerSide +
", providerInfo=" + providerInfo +
", interfaceConfig=" + interfaceConfig +
'}';
}

Expand All @@ -507,7 +472,6 @@ public RpcInternalContext clone() {
context.stopWatch = this.stopWatch.clone();
context.providerSide = this.providerSide;
context.providerInfo = this.providerInfo;
context.interfaceConfig = this.interfaceConfig;
context.attachments.putAll(this.attachments);
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,20 @@ public ClientTransportConfig setChannelListeners(List<ChannelListener> channelLi
this.channelListeners = channelListeners;
return this;
}

@Override
public String toString() {
return super.toString() + "{" +
"consumerConfig=" + consumerConfig +
", providerInfo=" + providerInfo +
", container='" + container + '\'' +
", connectTimeout=" + connectTimeout +
", disconnectTimeout=" + disconnectTimeout +
", invokeTimeout=" + invokeTimeout +
", connectionNum=" + connectionNum +
", payload=" + payload +
", useEpoll=" + useEpoll +
", channelListeners=" + channelListeners +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.transport;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory;
Expand All @@ -25,9 +26,6 @@

import java.util.concurrent.ConcurrentHashMap;

import static com.alipay.sofa.rpc.common.RpcConfigs.getBooleanValue;
import static com.alipay.sofa.rpc.common.RpcOptions.TRANSPORT_CONNECTION_REUSE;

/**
* Factory of ClientTransport
*
Expand All @@ -41,25 +39,9 @@ public class ClientTransportFactory {
.getLogger(ClientTransportFactory.class);

/**
* 是否长连接复用
*/
private final static boolean CHANNEL_REUSE = getBooleanValue(TRANSPORT_CONNECTION_REUSE);

/**
* 长连接过滤器
*/
private final static ClientTransportHolder CLIENT_TRANSPORT_HOLDER = CHANNEL_REUSE ? new ReusableClientTransportHolder()
: new NotReusableClientTransportHolder();

/**
* 通过配置获取长连接
*
* @param config 传输层配置
* @return 传输层
* 不可复用长连接管理器
*/
public static ClientTransport getClientTransport(ClientTransportConfig config) {
return CLIENT_TRANSPORT_HOLDER.getClientTransport(config);
}
private final static ClientTransportHolder CLIENT_TRANSPORT_HOLDER = new NotReusableClientTransportHolder();

/**
* 销毁长连接
Expand Down Expand Up @@ -107,6 +89,16 @@ public static void releaseTransport(ClientTransport clientTransport, int disconn
}
}

/**
* 通过配置获取长连接
*
* @param config 传输层配置
* @return 传输层
*/
public static ClientTransport getClientTransport(ClientTransportConfig config) {
return CLIENT_TRANSPORT_HOLDER.getClientTransport(config);
}

/**
* 关闭全部客户端连接
*/
Expand All @@ -121,6 +113,7 @@ public static void closeAll() {
}
}

@VisibleForTesting
static ClientTransportHolder getClientTransportHolder() {
return CLIENT_TRANSPORT_HOLDER;
}
Expand Down
Loading

0 comments on commit 109c1e9

Please sign in to comment.