Skip to content

Commit

Permalink
Fix get wrong ProviderInfo when use reuses client transport. (sofasta…
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy authored May 9, 2018
1 parent 855ef18 commit e5e6a67
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,9 @@ protected void checkAlias(ProviderInfo providerInfo, SofaRequest message) {
* @throws SofaRpcException 请求RPC异常
*/
protected SofaResponse filterChain(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
RpcInternalContext.getContext().setProviderInfo(providerInfo);
RpcInternalContext context = RpcInternalContext.getContext();
context.setInterfaceConfig(consumerConfig);
context.setProviderInfo(providerInfo);
return filterChain.invoke(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.SystemInfo;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.event.ClientBeforeSendEvent;
import com.alipay.sofa.rpc.event.ClientSyncReceiveEvent;
import com.alipay.sofa.rpc.event.EventBus;
import com.alipay.sofa.rpc.message.ResponseFuture;

import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -149,17 +153,31 @@ public ResponseFuture asyncSend(SofaRequest message, int timeout) throws SofaRpc
@Override
public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcException {
RpcInternalContext context = RpcInternalContext.getContext();
SofaResponse response = null;
SofaRpcException throwable = null;
try {
beforeSend(context, request);
return doInvokeSync(request, timeout);
if (EventBus.isEnable(ClientBeforeSendEvent.class)) {
EventBus.post(new ClientBeforeSendEvent(request));
}
response = doInvokeSync(request, timeout);
return response;
} catch (InvocationTargetException e) {
throw convertToRpcException(e);
throwable = convertToRpcException(e);
throw throwable;
} catch (SofaRpcException e) {
throwable = e;
throw e;
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Fail to send message to remote", e);
throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
"Failed to send message to remote", e);
throw throwable;
} finally {
afterSend(context, request);
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent((ConsumerConfig) context.getInterfaceConfig(),
context.getProviderInfo(), request, response, throwable));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.annotation.Unstable;
import com.alipay.sofa.rpc.common.struct.StopWatch;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.message.ResponseFuture;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -148,40 +150,48 @@ protected RpcInternalContext() {
/**
* The Future.
*/
private ResponseFuture<?> future;
private ResponseFuture<?> future;

/**
* The Local address.
*/
private InetSocketAddress localAddress;
private InetSocketAddress localAddress;

/**
* The Remote address.
*/
private InetSocketAddress remoteAddress;
private InetSocketAddress remoteAddress;

/**
* 附带属性功能,遵循谁使用谁清理的原则。Key必须为 "_" 和 "."开头<br>
* 如果关闭了 {@link #ATTACHMENT_ENABLE} 功能,"_" 开头的Key将不被保持和传递。
*
* @see #ATTACHMENT_ENABLE
*/
private Map<String, Object> attachments = new HashMap<String, Object>();
private Map<String, Object> attachments = new HashMap<String, Object>();

/**
* The Stopwatch
*/
private StopWatch stopWatch = new StopWatch();
private StopWatch stopWatch = new StopWatch();

/**
* The Provider side.
*/
private Boolean providerSide;
private Boolean providerSide;

/**
* 要调用的服务端信息
*/
private ProviderInfo providerInfo;
private ProviderInfo providerInfo;

/**
* 发起调用的客户端信息
*
* @since 5.3.3
*/
@Unstable
private AbstractInterfaceConfig interfaceConfig;

/**
* Is provider side.
Expand Down Expand Up @@ -422,8 +432,8 @@ public StopWatch getStopWatch() {
* Clear context for next user
*/
public void clear() {
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(false)
.setProviderInfo(null);
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(null)
.setProviderInfo(null).setInterfaceConfig(null);
this.attachments = new HashMap<String, Object>();
this.stopWatch.reset();
}
Expand All @@ -448,6 +458,30 @@ public ProviderInfo getProviderInfo() {
return providerInfo;
}

/**
* Gets interface config.
*
* @return the config
* @since 5.3.3
*/
@Unstable
public AbstractInterfaceConfig getInterfaceConfig() {
return interfaceConfig;
}

/**
* Sets interface config.
*
* @param interfaceConfig the interface config
* @return the config
* @since 5.3.3
*/
@Unstable
public RpcInternalContext setInterfaceConfig(AbstractInterfaceConfig interfaceConfig) {
this.interfaceConfig = interfaceConfig;
return this;
}

@Override
public String toString() {
return "RpcInternalContext{" +
Expand All @@ -458,6 +492,7 @@ public String toString() {
", stopWatch=" + stopWatch +
", providerSide=" + providerSide +
", providerInfo=" + providerInfo +
", interfaceConfig=" + interfaceConfig +
'}';
}

Expand All @@ -473,6 +508,7 @@ public RpcInternalContext clone() {
context.stopWatch = this.stopWatch.clone();
context.providerSide = this.providerSide;
context.providerInfo = this.providerInfo;
context.interfaceConfig = this.interfaceConfig;
context.attachments.putAll(this.attachments);
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@
*/
package com.alipay.sofa.rpc.context;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.message.ResponseFuture;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
*
*
Expand Down Expand Up @@ -64,4 +73,63 @@ public void testPop() {
Assert.assertEquals(RpcInternalContext.getContext().getRemoteAddress().toString(), "127.0.0.1:12200");
}

@Test
public void testClear() {
RpcInternalContext context = RpcInternalContext.getContext();
context.setRemoteAddress("127.0.0.1", 1234);
context.setLocalAddress("127.0.0.1", 2345);
context.setFuture(new ResponseFuture<String>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return false;
}

@Override
public String get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}

@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}

@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}
});

context.setProviderInfo(ProviderInfo.valueOf("127.0.0.1:80"));
context.setInterfaceConfig(new ProviderConfig());
context.setAttachment("_xxxx", "yyyy");

context.clear();
Assert.assertNull(context.getRemoteAddress());
Assert.assertNull(context.getLocalAddress());
Assert.assertNull(context.getFuture());
Assert.assertFalse(context.isProviderSide());
Assert.assertFalse(context.isConsumerSide());
Assert.assertNull(context.getProviderInfo());
Assert.assertNull(context.getInterfaceConfig());
Assert.assertTrue(context.getAttachments().isEmpty());

RpcInternalContext.removeAllContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public abstract class FaultBaseServiceTest extends FaultBaseTest {
public void beforeClass() throws Exception {
providerConfig.setRef(new HelloServiceTimeOutImpl());
providerConfig.export();
// test reuse client transport
consumerConfigNotUse.refer();
helloService = consumerConfig.refer();
}

@After
public void afterClass() {
providerConfig.unExport();
consumerConfigNotUse.unRefer();
consumerConfig.unRefer();
consumerConfig = null;
consumerConfig2 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class FaultBaseTest {
public static final String APP_NAME2 = "testAnotherApp";

protected ServerConfig serverConfig;
protected ConsumerConfig<FaultHelloService> consumerConfigNotUse;
protected ConsumerConfig<FaultHelloService> consumerConfig;
protected ConsumerConfig<FaultHelloService2> consumerConfig2;
protected ConsumerConfig<FaultHelloService> consumerConfigAnotherApp;
Expand All @@ -72,6 +73,15 @@ public void init() {
.setRegister(false)
.setApplication(providerAconfig);

// just for test
consumerConfigNotUse = new ConsumerConfig<FaultHelloService>()
.setInterfaceId(FaultHelloService.class.getName())
.setTimeout(500)
.setDirectUrl("127.0.0.1:12299")
.setRegister(false)
.setUniqueId("xxx")
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT);

ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setAppName(APP_NAME1);
consumerConfig = new ConsumerConfig<FaultHelloService>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest
}
if (invoker instanceof ProviderProxyInvoker) {
providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
context.setInterfaceConfig(providerConfig);
// 找到服务后,打印服务的appName
appName = providerConfig != null ? providerConfig.getAppName() : null;
}
Expand Down
Loading

0 comments on commit e5e6a67

Please sign in to comment.