Skip to content

Commit

Permalink
[ISSUE#8099] Fast failure for distro sync task and verify task if clu…
Browse files Browse the repository at this point in the history
…ster disconnect. (alibaba#8693)

* Fast failure for distro sync task and verify task if cluster disconnect.

* Fix UnnecessaryStubbingException
  • Loading branch information
KomachiSion authored Jul 7, 2022
1 parent da34fbd commit 5aabba1
Show file tree
Hide file tree
Showing 4 changed files with 571 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,18 @@ public void onEvent(MembersChangeEvent event) {
Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage());
}
}

/**
* Check whether client for member is running.
*
* @param member member
* @return {@code true} if target client is connected, otherwise {@code false}
*/
public boolean isRunning(Member member) {
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
if (null == client) {
return false;
}
return client.isRunning();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,37 @@
package com.alibaba.nacos.core.cluster.remote;

import com.alibaba.nacos.api.ability.ServerAbilities;
import com.alibaba.nacos.api.remote.ability.ServerRemoteAbility;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.ability.ServerRemoteAbility;
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

/**
* {@link ClusterRpcClientProxy} unit test.
*
Expand All @@ -50,30 +59,44 @@
public class ClusterRpcClientProxyTest {

@InjectMocks
private ClusterRpcClientProxy clusterRpcClientProxy;
private ClusterRpcClientProxy clusterRpcClientProxy;

@Mock
private ServerMemberManager serverMemberManager;

@Mock
private RpcClient client;

private Member member;

@Before
public void setUp() {
public void setUp() throws NacosException {
EnvUtil.setEnvironment(new MockEnvironment());
Member member = new Member();
member = new Member();
member.setIp("1.1.1.1");
ServerAbilities serverAbilities = new ServerAbilities();
ServerRemoteAbility remoteAbility = new ServerRemoteAbility();
remoteAbility.setSupportRemoteConnection(true);
serverAbilities.setRemoteAbility(remoteAbility);
member.setAbilities(serverAbilities);
Mockito.when(serverMemberManager.allMembersWithoutSelf()).thenReturn(Collections.singletonList(member));

when(serverMemberManager.allMembersWithoutSelf()).thenReturn(Collections.singletonList(member));
clusterRpcClientProxy.init();
Map<String, RpcClient> clientMap = (Map<String, RpcClient>) ReflectionTestUtils
.getField(RpcClientFactory.class, "CLIENT_MAP");
clientMap.remove("Cluster-" + member.getAddress()).shutdown();
clientMap.put("Cluster-" + member.getAddress(), client);
when(client.getConnectionType()).thenReturn(ConnectionType.GRPC);
}

@AfterClass
public static void tearDown() throws NacosException {
Map<String, RpcClient> clientMap = (Map<String, RpcClient>) ReflectionTestUtils
.getField(RpcClientFactory.class, "CLIENT_MAP");
clientMap.remove("Cluster-1.1.1.1:-1").shutdown();
}

@Test
public void testSendRequest() {
Member member = new Member();
member.setIp("1.1.1.1");
try {
Response response = clusterRpcClientProxy.sendRequest(member, new HealthCheckRequest());
} catch (NacosException e) {
Expand All @@ -91,26 +114,23 @@ public void testAsyncRequest() {
public Executor getExecutor() {
return null;
}

@Override
public long getTimeout() {
return 0;
}

@Override
public void onResponse(Response response) {

}

@Override
public void onException(Throwable e) {
Assert.assertTrue(e instanceof NacosException);
}
};

Member member = new Member();
member.setIp("1.1.1.1");

try {
clusterRpcClientProxy.asyncRequest(member, new HealthCheckRequest(), requestCallBack);
} catch (NacosException e) {
Expand All @@ -136,4 +156,22 @@ public void testOnEvent() {
Assert.fail(e.getMessage());
}
}

@Test
public void testIsRunningForClientConnected() {
when(client.isRunning()).thenReturn(true);
assertTrue(clusterRpcClientProxy.isRunning(member));
}

@Test
public void testIsRunningForClientNotConnected() {
assertFalse(clusterRpcClientProxy.isRunning(member));
}

@Test
public void testIsRunningForNonExist() {
Member member = new Member();
member.setIp("11.11.11.11");
assertFalse(clusterRpcClientProxy.isRunning(member));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public boolean syncData(DistroData data, String targetServer) {
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
Loggers.DISTRO
.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
data.getDistroKey());
return false;
}
try {
Expand All @@ -91,6 +93,13 @@ public void syncData(DistroData data, String targetServer, DistroCallback callba
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO
.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
data.getDistroKey());
callback.onFailed(null);
return;
}
try {
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
Expand All @@ -108,7 +117,9 @@ public boolean syncVerifyData(DistroData verifyData, String targetServer) {
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
Loggers.DISTRO
.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
verifyData.getDistroKey());
return false;
}
try {
Expand All @@ -128,6 +139,13 @@ public void syncVerifyData(DistroData verifyData, String targetServer, DistroCal
}
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO
.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
verifyData.getDistroKey());
callback.onFailed(null);
return;
}
try {
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
Expand Down Expand Up @@ -174,8 +192,8 @@ public DistroData getDatumSnapshot(String targetServer) {
DistroDataRequest request = new DistroDataRequest();
request.setDataOperation(DataOperation.SNAPSHOT);
try {
Response response = clusterRpcClientProxy.sendRequest(member, request,
DistroConfig.getInstance().getLoadDataTimeoutMillis());
Response response = clusterRpcClientProxy
.sendRequest(member, request, DistroConfig.getInstance().getLoadDataTimeoutMillis());
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
Expand All @@ -193,7 +211,7 @@ private boolean isNoExistTarget(String target) {
}

private boolean checkTargetServerStatusUnhealthy(Member member) {
return null == member || !NodeState.UP.equals(member.getState());
return null == member || !NodeState.UP.equals(member.getState()) || !clusterRpcClientProxy.isRunning(member);
}

private boolean checkResponse(Response response) {
Expand Down Expand Up @@ -245,7 +263,7 @@ private class DistroVerifyCallbackWrapper implements RequestCallBack<Response> {
private final String clientId;

private final DistroCallback distroCallback;

private final Member member;

private DistroVerifyCallbackWrapper(String targetServer, String clientId, DistroCallback distroCallback,
Expand Down
Loading

0 comments on commit 5aabba1

Please sign in to comment.