Skip to content

Commit

Permalink
Retry logic lose exeception bugfix (sofastack#1044)
Browse files Browse the repository at this point in the history
* LocalDateTime test

* fix retry leads to lose exeception bug and sometime first time to call exeception bug

* code format

* Remove irrelevant code.

* retry select provider logic modify and add related test

* Remove irrelevant code.

* if all provider retry,then stop

* Retry until the number of attempts is exhausted

Co-authored-by: liujianjun.ljj <[email protected]>
  • Loading branch information
EvenLjj and liujianjun.ljj authored May 27, 2021
1 parent 77a178c commit 651b407
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ protected void checkClusterState() {
@Override
public void addProvider(ProviderGroup providerGroup) {
// 包装了各个组件的操作
addressHolder.addProvider(providerGroup);
connectionHolder.addProvider(providerGroup);
addressHolder.addProvider(providerGroup);
if (EventBus.isEnable(ProviderInfoAddEvent.class)) {
ProviderInfoAddEvent event = new ProviderInfoAddEvent(consumerConfig, providerGroup);
EventBus.post(event);
Expand Down Expand Up @@ -420,8 +420,13 @@ protected ProviderInfo select(SofaRequest message, List<ProviderInfo> invokedPro
} else {
originalProviderInfos = new ArrayList<>(providerInfos);
}
if (CommonUtils.isNotEmpty(invokedProviderInfos) && providerInfos.size() > invokedProviderInfos.size()) { // 总数大于已调用数
providerInfos.removeAll(invokedProviderInfos);// 已经调用异常的本次不再重试
if (CommonUtils.isNotEmpty(invokedProviderInfos)) {
// 已经调用异常的本次不再重试
providerInfos.removeAll(invokedProviderInfos);
// If all providers have retried once, then select by loadBalancer without filter.
if(CommonUtils.isEmpty(providerInfos)){
providerInfos = originalProviderInfos;
}
}

String targetIP = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
String methodName = request.getMethodName();
int retries = consumerConfig.getMethodRetries(methodName);
int time = 0;
SofaRpcException throwable = null;// 异常日志
// 异常日志
SofaRpcException throwable = null;
List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
do {

ProviderInfo providerInfo = select(request, invokedProviderInfos);
ProviderInfo providerInfo = null;
try {
providerInfo = select(request, invokedProviderInfos);
SofaResponse response = filterChain(providerInfo, request);
if (response != null) {
if (throwable != null) {
Expand All @@ -87,7 +88,12 @@ public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
throwable = e;
time++;
} else {
throw e;
// throw the exception that caused the retry
if (throwable != null) {
throw throwable;
} else {
throw e;
}
}
} catch (Exception e) { // 其它异常不重试
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
Expand All @@ -100,7 +106,9 @@ public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
time + 1); // 重试次数
}
}
invokedProviderInfos.add(providerInfo);
if (providerInfo != null) {
invokedProviderInfos.add(providerInfo);
}
} while (time <= retries);

throw throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.core.exception.SofaRouteException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
Expand Down Expand Up @@ -89,7 +90,7 @@ public String sayHello(String name, int age) {
} catch (Exception ignore) {
}
}
Assert.assertEquals(count1, 2);
Assert.assertEquals(2, count1);

ConsumerConfig<HelloService> consumerConfig2 = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
Expand Down Expand Up @@ -421,6 +422,44 @@ public void testRpcDirectInvokeFromContextNotAllowed() {
}
}

@Test(expected = SofaTimeOutException.class)
public void testRetryLogicThrowException() {
//one provider and retry twice
ServerConfig serverConfig = new ServerConfig()
.setStopTimeout(0)
.setPort(22222)
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setQueues(100).setCoreThreads(5).setMaxThreads(5);

// 发布一个服务,每个请求要执行2秒
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl() {
@Override
public String sayHello(String name, int age) {
try {
Thread.sleep(2000);
} catch (Exception ignore) {
}
LOGGER.info("xxxxxxxxxxxxxxxxx" + age);
return "hello " + name + " from server! age: " + age;
}
})
.setServer(serverConfig)
.setRegister(false);
providerConfig.export();

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setDirectUrl("bolt://127.0.0.1:22222")
.setCluster("failover")
.setTimeout(1000)
.setRetries(2) // 失败后自动重试2次
.setRegister(false);
final HelloService helloService = consumerConfig.refer();
helloService.sayHello("xxx", 20);
}

@After
public void stopServer() {
RpcRuntimeContext.destroy();
Expand Down

0 comments on commit 651b407

Please sign in to comment.