From 5aabba13c9fe71beafcf0e1f12fd71f8c9d826e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E7=BF=8A=20SionYang?= Date: Thu, 7 Jul 2022 10:41:23 +0800 Subject: [PATCH] [ISSUE#8099] Fast failure for distro sync task and verify task if cluster disconnect. (#8693) * Fast failure for distro sync task and verify task if cluster disconnect. * Fix UnnecessaryStubbingException --- .../cluster/remote/ClusterRpcClientProxy.java | 14 + .../remote/ClusterRpcClientProxyTest.java | 70 ++- .../distro/v2/DistroClientTransportAgent.java | 30 +- .../v2/DistroClientTransportAgentTest.java | 479 ++++++++++++++++++ 4 files changed, 571 insertions(+), 22 deletions(-) create mode 100644 naming/src/test/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgentTest.java diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java index 2f81765d4df..b0febd7a9e2 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java @@ -226,4 +226,18 @@ public void onEvent(MembersChangeEvent event) { Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage()); } } + + /** + * Check whether client for member is running. + * + * @param member member + * @return {@code true} if target client is connected, otherwise {@code false} + */ + public boolean isRunning(Member member) { + RpcClient client = RpcClientFactory.getClient(memberClientKey(member)); + if (null == client) { + return false; + } + return client.isRunning(); + } } diff --git a/core/src/test/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxyTest.java b/core/src/test/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxyTest.java index fef612c3b44..ffa6167e5ba 100644 --- a/core/src/test/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxyTest.java +++ b/core/src/test/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxyTest.java @@ -18,28 +18,37 @@ package com.alibaba.nacos.core.cluster.remote; import com.alibaba.nacos.api.ability.ServerAbilities; -import com.alibaba.nacos.api.remote.ability.ServerRemoteAbility; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.ability.ServerRemoteAbility; import com.alibaba.nacos.api.remote.request.HealthCheckRequest; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.RpcClientFactory; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.MembersChangeEvent; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.sys.env.EnvUtil; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.mock.env.MockEnvironment; +import org.springframework.test.util.ReflectionTestUtils; import java.util.Collections; +import java.util.Map; import java.util.concurrent.Executor; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + /** * {@link ClusterRpcClientProxy} unit test. * @@ -50,30 +59,44 @@ public class ClusterRpcClientProxyTest { @InjectMocks - private ClusterRpcClientProxy clusterRpcClientProxy; + private ClusterRpcClientProxy clusterRpcClientProxy; @Mock private ServerMemberManager serverMemberManager; + @Mock + private RpcClient client; + + private Member member; + @Before - public void setUp() { + public void setUp() throws NacosException { EnvUtil.setEnvironment(new MockEnvironment()); - Member member = new Member(); + member = new Member(); member.setIp("1.1.1.1"); ServerAbilities serverAbilities = new ServerAbilities(); ServerRemoteAbility remoteAbility = new ServerRemoteAbility(); remoteAbility.setSupportRemoteConnection(true); serverAbilities.setRemoteAbility(remoteAbility); member.setAbilities(serverAbilities); - Mockito.when(serverMemberManager.allMembersWithoutSelf()).thenReturn(Collections.singletonList(member)); - + when(serverMemberManager.allMembersWithoutSelf()).thenReturn(Collections.singletonList(member)); clusterRpcClientProxy.init(); + Map clientMap = (Map) ReflectionTestUtils + .getField(RpcClientFactory.class, "CLIENT_MAP"); + clientMap.remove("Cluster-" + member.getAddress()).shutdown(); + clientMap.put("Cluster-" + member.getAddress(), client); + when(client.getConnectionType()).thenReturn(ConnectionType.GRPC); + } + + @AfterClass + public static void tearDown() throws NacosException { + Map clientMap = (Map) ReflectionTestUtils + .getField(RpcClientFactory.class, "CLIENT_MAP"); + clientMap.remove("Cluster-1.1.1.1:-1").shutdown(); } @Test public void testSendRequest() { - Member member = new Member(); - member.setIp("1.1.1.1"); try { Response response = clusterRpcClientProxy.sendRequest(member, new HealthCheckRequest()); } catch (NacosException e) { @@ -91,26 +114,23 @@ public void testAsyncRequest() { public Executor getExecutor() { return null; } - + @Override public long getTimeout() { return 0; } - + @Override public void onResponse(Response response) { - + } - + @Override public void onException(Throwable e) { Assert.assertTrue(e instanceof NacosException); } }; - Member member = new Member(); - member.setIp("1.1.1.1"); - try { clusterRpcClientProxy.asyncRequest(member, new HealthCheckRequest(), requestCallBack); } catch (NacosException e) { @@ -136,4 +156,22 @@ public void testOnEvent() { Assert.fail(e.getMessage()); } } + + @Test + public void testIsRunningForClientConnected() { + when(client.isRunning()).thenReturn(true); + assertTrue(clusterRpcClientProxy.isRunning(member)); + } + + @Test + public void testIsRunningForClientNotConnected() { + assertFalse(clusterRpcClientProxy.isRunning(member)); + } + + @Test + public void testIsRunningForNonExist() { + Member member = new Member(); + member.setIp("11.11.11.11"); + assertFalse(clusterRpcClientProxy.isRunning(member)); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgent.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgent.java index 6c20ad2248e..828db09b127 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgent.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgent.java @@ -71,7 +71,9 @@ public boolean syncData(DistroData data, String targetServer) { DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); if (checkTargetServerStatusUnhealthy(member)) { - Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer); + Loggers.DISTRO + .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer, + data.getDistroKey()); return false; } try { @@ -91,6 +93,13 @@ public void syncData(DistroData data, String targetServer, DistroCallback callba } DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); + if (checkTargetServerStatusUnhealthy(member)) { + Loggers.DISTRO + .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer, + data.getDistroKey()); + callback.onFailed(null); + return; + } try { clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member)); } catch (NacosException nacosException) { @@ -108,7 +117,9 @@ public boolean syncVerifyData(DistroData verifyData, String targetServer) { DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY); Member member = memberManager.find(targetServer); if (checkTargetServerStatusUnhealthy(member)) { - Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer); + Loggers.DISTRO + .warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer, + verifyData.getDistroKey()); return false; } try { @@ -128,6 +139,13 @@ public void syncVerifyData(DistroData verifyData, String targetServer, DistroCal } DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY); Member member = memberManager.find(targetServer); + if (checkTargetServerStatusUnhealthy(member)) { + Loggers.DISTRO + .warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer, + verifyData.getDistroKey()); + callback.onFailed(null); + return; + } try { DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer, verifyData.getDistroKey().getResourceKey(), callback, member); @@ -174,8 +192,8 @@ public DistroData getDatumSnapshot(String targetServer) { DistroDataRequest request = new DistroDataRequest(); request.setDataOperation(DataOperation.SNAPSHOT); try { - Response response = clusterRpcClientProxy.sendRequest(member, request, - DistroConfig.getInstance().getLoadDataTimeoutMillis()); + Response response = clusterRpcClientProxy + .sendRequest(member, request, DistroConfig.getInstance().getLoadDataTimeoutMillis()); if (checkResponse(response)) { return ((DistroDataResponse) response).getDistroData(); } else { @@ -193,7 +211,7 @@ private boolean isNoExistTarget(String target) { } private boolean checkTargetServerStatusUnhealthy(Member member) { - return null == member || !NodeState.UP.equals(member.getState()); + return null == member || !NodeState.UP.equals(member.getState()) || !clusterRpcClientProxy.isRunning(member); } private boolean checkResponse(Response response) { @@ -245,7 +263,7 @@ private class DistroVerifyCallbackWrapper implements RequestCallBack { private final String clientId; private final DistroCallback distroCallback; - + private final Member member; private DistroVerifyCallbackWrapper(String targetServer, String clientId, DistroCallback distroCallback, diff --git a/naming/src/test/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgentTest.java b/naming/src/test/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgentTest.java new file mode 100644 index 00000000000..6c724bcf7e1 --- /dev/null +++ b/naming/src/test/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/v2/DistroClientTransportAgentTest.java @@ -0,0 +1,479 @@ +/* + * Copyright 1999-2021 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.consistency.ephemeral.distro.v2; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.response.ResponseCode; +import com.alibaba.nacos.core.cluster.Member; +import com.alibaba.nacos.core.cluster.NodeState; +import com.alibaba.nacos.core.cluster.ServerMemberManager; +import com.alibaba.nacos.core.cluster.remote.ClusterRpcClientProxy; +import com.alibaba.nacos.core.distributed.distro.component.DistroCallback; +import com.alibaba.nacos.core.distributed.distro.entity.DistroData; +import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; +import com.alibaba.nacos.core.distributed.distro.exception.DistroException; +import com.alibaba.nacos.core.remote.control.TpsMonitorManager; +import com.alibaba.nacos.naming.cluster.remote.response.DistroDataResponse; +import com.alibaba.nacos.sys.env.EnvUtil; +import com.alibaba.nacos.sys.utils.ApplicationUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.mock.env.MockEnvironment; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DistroClientTransportAgentTest { + + @Mock + ClusterRpcClientProxy clusterRpcClientProxy; + + @Mock + ServerMemberManager memberManager; + + @Mock + TpsMonitorManager tpsMonitorManager; + + @Mock + ConfigurableApplicationContext context; + + @Mock + DistroCallback distroCallback; + + @InjectMocks + DistroClientTransportAgent transportAgent; + + Member member; + + Response response; + + @Before + public void setUp() throws Exception { + when(context.getBean(TpsMonitorManager.class)).thenReturn(tpsMonitorManager); + ApplicationUtils.injectContext(context); + EnvUtil.setEnvironment(new MockEnvironment()); + member = new Member(); + member.setIp("1.1.1.1"); + member.setPort(8848); + response = new DistroDataResponse(); + when(memberManager.find(member.getAddress())).thenReturn(member); + when(memberManager.getSelf()).thenReturn(member); + when(clusterRpcClientProxy.sendRequest(eq(member), any())).thenReturn(response); + doAnswer(invocationOnMock -> { + RequestCallBack callback = invocationOnMock.getArgument(2); + callback.onResponse(response); + return null; + }).when(clusterRpcClientProxy).asyncRequest(eq(member), any(), any()); + // When run all project, the TpsNamingMonitor will be init by other unit test, will throw UnnecessaryStubbingException. + ApplicationUtils.getBean(TpsMonitorManager.class); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testSupportCallbackTransport() { + assertTrue(transportAgent.supportCallbackTransport()); + } + + @Test + public void testSyncDataForMemberNonExist() throws NacosException { + assertTrue(transportAgent.syncData(new DistroData(), member.getAddress())); + verify(memberManager, never()).find(member.getAddress()); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncDataForMemberUnhealthy() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + assertFalse(transportAgent.syncData(new DistroData(), member.getAddress())); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncDataForMemberDisconnect() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + assertFalse(transportAgent.syncData(new DistroData(), member.getAddress())); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncDataFailure() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertFalse(transportAgent.syncData(new DistroData(), member.getAddress())); + } + + @Test + public void testSyncDataException() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.sendRequest(eq(member), any())).thenThrow(new NacosException()); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertFalse(transportAgent.syncData(new DistroData(), member.getAddress())); + } + + @Test + public void testSyncDataSuccess() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertTrue(transportAgent.syncData(new DistroData(), member.getAddress())); + } + + @Test + public void testSyncDataWithCallbackForMemberNonExist() throws NacosException { + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onSuccess(); + verify(memberManager, never()).find(member.getAddress()); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncDataWithCallbackForMemberUnhealthy() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncDataWithCallbackForMemberDisconnect() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncDataWithCallbackFailure() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + } + + @Test + public void testSyncDataWithCallbackException() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + doThrow(new NacosException()).when(clusterRpcClientProxy).asyncRequest(eq(member), any(), any()); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onFailed(any(NacosException.class)); + } + + @Test + public void testSyncDataWithCallbackException2() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + doAnswer(invocationOnMock -> { + RequestCallBack callback = invocationOnMock.getArgument(2); + callback.onException(new NacosException()); + return null; + }).when(clusterRpcClientProxy).asyncRequest(eq(member), any(), any()); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onFailed(any(NacosException.class)); + } + + @Test + public void testSyncDataWithCallbackSuccess() throws NacosException { + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + transportAgent.syncData(new DistroData(), member.getAddress(), distroCallback); + verify(distroCallback).onSuccess(); + } + + @Test + public void testSyncVerifyDataForMemberNonExist() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + assertTrue(transportAgent.syncVerifyData(verifyData, member.getAddress())); + verify(memberManager, never()).find(member.getAddress()); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncVerifyDataForMemberUnhealthy() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + assertFalse(transportAgent.syncVerifyData(verifyData, member.getAddress())); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncVerifyDataForMemberDisconnect() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + assertFalse(transportAgent.syncVerifyData(verifyData, member.getAddress())); + verify(clusterRpcClientProxy, never()).sendRequest(any(Member.class), any()); + } + + @Test + public void testSyncVerifyDataFailure() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertFalse(transportAgent.syncVerifyData(verifyData, member.getAddress())); + } + + @Test + public void testSyncVerifyDataException() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.sendRequest(eq(member), any())).thenThrow(new NacosException()); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertFalse(transportAgent.syncVerifyData(verifyData, member.getAddress())); + } + + @Test + public void testSyncVerifyDataSuccess() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + assertTrue(transportAgent.syncVerifyData(verifyData, member.getAddress())); + } + + @Test + public void testSyncVerifyDataWithCallbackForMemberNonExist() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onSuccess(); + verify(memberManager, never()).find(member.getAddress()); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncVerifyDataWithCallbackForMemberUnhealthy() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncVerifyDataWithCallbackForMemberDisconnect() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + verify(clusterRpcClientProxy, never()).asyncRequest(any(Member.class), any(), any()); + } + + @Test + public void testSyncVerifyDataWithCallbackFailure() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onFailed(null); + } + + @Test + public void testSyncVerifyDataWithCallbackException() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + doThrow(new NacosException()).when(clusterRpcClientProxy).asyncRequest(eq(member), any(), any()); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onFailed(any(NacosException.class)); + } + + @Test + public void testSyncVerifyDataWithCallbackException2() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + doAnswer(invocationOnMock -> { + RequestCallBack callback = invocationOnMock.getArgument(2); + callback.onException(new NacosException()); + return null; + }).when(clusterRpcClientProxy).asyncRequest(eq(member), any(), any()); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onFailed(any(NacosException.class)); + } + + @Test + public void testSyncVerifyDataWithCallbackSuccess() throws NacosException { + DistroData verifyData = new DistroData(); + verifyData.setDistroKey(new DistroKey()); + when(memberManager.hasMember(member.getAddress())).thenReturn(true); + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + transportAgent.syncVerifyData(verifyData, member.getAddress(), distroCallback); + verify(distroCallback).onSuccess(); + } + + @Test(expected = DistroException.class) + public void testGetDataForMemberNonExist() { + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDataForMemberUnhealthy() { + when(memberManager.find(member.getAddress())).thenReturn(member); + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDataForMemberDisconnect() { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDataException() throws NacosException { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + when(clusterRpcClientProxy.sendRequest(eq(member), any())).thenThrow(new NacosException()); + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDataFailure() { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test + public void testGetDataSuccess() { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + transportAgent.getData(new DistroKey(), member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDatumSnapshotForMemberNonExist() { + transportAgent.getDatumSnapshot(member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDatumSnapshotForMemberUnhealthy() { + when(memberManager.find(member.getAddress())).thenReturn(member); + transportAgent.getDatumSnapshot(member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDatumSnapshotForMemberDisconnect() { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + transportAgent.getDatumSnapshot(member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDatumSnapshotException() throws NacosException { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + when(clusterRpcClientProxy.sendRequest(eq(member), any(), any(Long.class))).thenThrow(new NacosException()); + transportAgent.getDatumSnapshot(member.getAddress()); + } + + @Test(expected = DistroException.class) + public void testGetDatumSnapshotFailure() throws NacosException { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + when(clusterRpcClientProxy.sendRequest(eq(member), any(), any(Long.class))).thenReturn(response); + response.setErrorInfo(ResponseCode.FAIL.getCode(), "TEST"); + transportAgent.getDatumSnapshot(member.getAddress()); + } + + @Test + public void testGetDatumSnapshotSuccess() throws NacosException { + when(memberManager.find(member.getAddress())).thenReturn(member); + member.setState(NodeState.UP); + when(clusterRpcClientProxy.isRunning(member)).thenReturn(true); + when(clusterRpcClientProxy.sendRequest(eq(member), any(), any(Long.class))).thenReturn(response); + transportAgent.getDatumSnapshot(member.getAddress()); + } +}