Skip to content

Commit

Permalink
When callback timeout, throw timeout exp as sync (sofastack#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored and ujjboy committed Nov 30, 2018
1 parent 0f6f528 commit 4617bbe
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package com.alipay.sofa.rpc.message.bolt;

import com.alipay.remoting.rpc.exception.InvokeTimeoutException;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.AsyncRuntime;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand Down Expand Up @@ -136,8 +138,14 @@ public void onException(Throwable e) {
EventBus.post(new ClientEndInvokeEvent(request, null, e));
}

SofaRpcException sofaRpcException = new SofaRpcException(
RpcErrorType.SERVER_UNDECLARED_ERROR, e.getMessage(), e);
//judge is timeout or others
SofaRpcException sofaRpcException = null;
if (e instanceof InvokeTimeoutException) {
sofaRpcException = new SofaTimeOutException(e);
} else {
sofaRpcException = new SofaRpcException(
RpcErrorType.SERVER_UNDECLARED_ERROR, e.getMessage(), e);
}
callback.onSofaException(sofaRpcException, request.getMethodName(), request);
} finally {
Thread.currentThread().setContextClassLoader(cl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.RequestBase;
import com.alipay.sofa.rpc.filter.Filter;
Expand All @@ -30,6 +31,7 @@
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -38,31 +40,33 @@
import java.util.concurrent.TimeUnit;

/**
*
*
* @author <a href="mailto:[email protected]">GengZhang</a>
*/
public class AsyncCallbackTest extends ActivelyDestroyTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallbackTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallbackTest.class);

private ServerConfig serverConfig;
private ProviderConfig<HelloService> CProvider;
private ConsumerConfig<HelloService> BConsumer;

@Test
public void testAll() {

ServerConfig serverConfig2 = new ServerConfig()
serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false);

// C服务的服务端
ProviderConfig<HelloService> CProvider = new ProviderConfig<HelloService>()
CProvider = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl(1000))
.setServer(serverConfig2);
.setServer(serverConfig);
CProvider.export();

// B调C的客户端
Filter filter = new TestAsyncFilter();
ConsumerConfig<HelloService> BConsumer = new ConsumerConfig<HelloService>()
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(50000)
Expand Down Expand Up @@ -110,4 +114,151 @@ public void onSofaException(SofaRpcException sofaException, String methodName,

RpcInvokeContext.removeContext();
}

@Test
public void testTimeoutException() {

serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false);

// C服务的服务端
CProvider = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl(500))
.setServer(serverConfig);
CProvider.export();

// B调C的客户端
Filter filter = new TestAsyncFilter();
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(1)
.setFilterRef(Arrays.asList(filter))
// .setOnReturn() // 不设置 调用级别设置
.setDirectUrl("bolt://127.0.0.1:22222");
HelloService helloService = BConsumer.refer();

final CountDownLatch latch = new CountDownLatch(1);
final String[] ret = { null };

final boolean[] hasExp = { false };
RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
LOGGER.info("B get result: {}", appResponse);
latch.countDown();
}

@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
LOGGER.info("B get app exception: {}", throwable);
latch.countDown();
}

@Override
public void onSofaException(SofaRpcException sofaException, String methodName,
RequestBase request) {
LOGGER.info("B get sofa exception: {}", sofaException);

if (sofaException instanceof SofaTimeOutException) {
hasExp[0] = true;
}

latch.countDown();
}
});

String ret0 = helloService.sayHello("xxx", 22);
Assert.assertNull(ret0); // 第一次返回null

try {
latch.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
// 一定是一个超时异常
Assert.assertTrue(hasExp[0]);

RpcInvokeContext.removeContext();
}

@Test
public void testNoProviderException() {
//use bolt, so callback will throw connection closed exception
serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false)
.setProtocol("rest");

serverConfig.buildIfAbsent().start();

// B调C的客户端
Filter filter = new TestAsyncFilter();
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(1000)
.setFilterRef(Arrays.asList(filter))
// .setOnReturn() // 不设置 调用级别设置
.setDirectUrl("bolt://127.0.0.1:22222");
HelloService helloService = BConsumer.refer();

final CountDownLatch latch = new CountDownLatch(1);
final String[] ret = { null };

final boolean[] hasExp = { false };
RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
LOGGER.info("B get result: {}", appResponse);
latch.countDown();
}

@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
LOGGER.info("B get app exception: {}", throwable);
latch.countDown();
}

@Override
public void onSofaException(SofaRpcException sofaException, String methodName,
RequestBase request) {
LOGGER.info("B get sofa exception: {}", sofaException);

if ((sofaException instanceof SofaTimeOutException)) {
hasExp[0] = false;
} else {
hasExp[0] = true;
}

latch.countDown();
}
});

String ret0 = helloService.sayHello("xxx", 22);
Assert.assertNull(ret0); // 第一次返回null

try {
latch.await(1500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
// 一定是一个超时异常
Assert.assertTrue(hasExp[0]);

RpcInvokeContext.removeContext();
}

@After
public void after() {
if (CProvider != null) {
CProvider.unExport();
}
if (BConsumer != null) {
BConsumer.unRefer();
}
if (serverConfig != null) {
serverConfig.destroy();
}
}
}

0 comments on commit 4617bbe

Please sign in to comment.