Skip to content

Commit

Permalink
Fix address judge of registry. (sofastack#633)
Browse files Browse the repository at this point in the history
  • Loading branch information
zonghaishang committed Jul 1, 2019
1 parent f421322 commit 678d52e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ protected AllConnectConnectionHolder(ConsumerBootstrap consumerBootstrap) {
*/
protected ConcurrentMap<ProviderInfo, ClientTransport> retryConnections = new ConcurrentHashMap<ProviderInfo, ClientTransport>();

/**
* last address for registry pushed
*/
protected Set<ProviderInfo> lastAddresses = new HashSet<ProviderInfo>();

/**
* 客户端变化provider的锁
*/
Expand Down Expand Up @@ -391,6 +396,10 @@ public void updateAllProviders(List<ProviderGroup> providerGroups) {
}

protected void addNode(List<ProviderInfo> providerInfoList) {

//first update last all providers
lastAddresses.addAll(providerInfoList);

final String interfaceId = consumerConfig.getInterfaceId();
int providerSize = providerInfoList.size();
String appName = consumerConfig.getAppName();
Expand Down Expand Up @@ -424,7 +433,6 @@ protected void addNode(List<ProviderInfo> providerInfoList) {

/**
* 线程池建立长连接
*
*/
protected void initClientRunnable(ThreadPoolExecutor initPool, final CountDownLatch latch,
final ProviderInfo providerInfo) {
Expand Down Expand Up @@ -467,6 +475,10 @@ protected void initClientTransport(String interfaceId, ProviderInfo providerInfo
}

public void removeNode(List<ProviderInfo> providerInfos) {

//first update last all providers
lastAddresses.removeAll(providerInfos);

String interfaceId = consumerConfig.getInterfaceId();
String appName = consumerConfig.getAppName();
if (LOGGER.isInfoEnabled(appName)) {
Expand Down Expand Up @@ -566,9 +578,7 @@ public Set<ProviderInfo> currentProviderList() {
providerLock.lock();
try {
ConcurrentHashSet<ProviderInfo> providerInfos = new ConcurrentHashSet<ProviderInfo>();
providerInfos.addAll(aliveConnections.keySet());
providerInfos.addAll(subHealthConnections.keySet());
providerInfos.addAll(retryConnections.keySet());
providerInfos.addAll(lastAddresses);
return providerInfos;
} finally {
providerLock.unlock();
Expand Down Expand Up @@ -634,6 +644,7 @@ protected Map<ProviderInfo, ClientTransport> clearProviders() {
*/
@Override
public void closeAllClientTransports(DestroyHook destroyHook) {

// 清空所有列表,不让再调了
Map<ProviderInfo, ClientTransport> all = clearProviders();
if (destroyHook != null) {
Expand Down Expand Up @@ -746,6 +757,7 @@ public Map<String, Set<ProviderInfo>> currentProviderMap() {
tmp.put("subHealth", new HashSet<ProviderInfo>(subHealthConnections.keySet()));
tmp.put("retry", new HashSet<ProviderInfo>(retryConnections.keySet()));
tmp.put("uninitialized", new HashSet<ProviderInfo>(uninitializedConnections.keySet()));
tmp.put("all", new HashSet<ProviderInfo>(lastAddresses));
return tmp;
} finally {
providerLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
*/
package com.alipay.sofa.rpc.test.client;

import java.util.Set;

import com.alipay.sofa.rpc.client.AllConnectConnectionHolder;
import com.alipay.sofa.rpc.client.ClientProxyInvoker;
import com.alipay.sofa.rpc.client.Cluster;
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -122,4 +126,71 @@ public void getAvailableClientTransport2() throws Exception {
ProviderHelper.toProviderInfo("bolt://127.0.0.1:22224")));
consumerConfig.unRefer();
}

@Test
public void getAvailableClientTransport3() throws Exception {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setDirectUrl("bolt://127.0.0.1:22223,bolt://127.0.0.1:22224")
.setConnectionHolder("all")
.setRegister(false)
.setLazy(true)
.setTimeout(3000);
HelloService helloService = consumerConfig.refer();
ClientProxyInvoker invoker = (ClientProxyInvoker) ProxyFactory.getInvoker(helloService,
consumerConfig.getProxy());
Cluster cluster = invoker.getCluster();
Assert.assertTrue(cluster.getConnectionHolder() instanceof AllConnectConnectionHolder);
AllConnectConnectionHolder holder = (AllConnectConnectionHolder) cluster.getConnectionHolder();

ProviderGroup providerGroups = new ProviderGroup();
providerGroups.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22223"));
providerGroups.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22224"));
holder.updateProviders(providerGroups);
Set<ProviderInfo> last = holder.currentProviderList();
Assert.assertEquals(2, last.size());

ProviderGroup providerGroups2 = new ProviderGroup();
providerGroups2.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22223"));
providerGroups2.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22224"));
providerGroups2.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22225"));
holder.updateProviders(providerGroups2);
Set<ProviderInfo> current = holder.currentProviderList();

Assert.assertEquals(3, current.size());

consumerConfig.unRefer();
}

@Test
public void getAvailableClientTransport4() throws Exception {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setDirectUrl("bolt://127.0.0.1:22223,bolt://127.0.0.1:22224")
.setConnectionHolder("all")
.setRegister(false)
.setLazy(true)
.setTimeout(3000);
HelloService helloService = consumerConfig.refer();
ClientProxyInvoker invoker = (ClientProxyInvoker) ProxyFactory.getInvoker(helloService,
consumerConfig.getProxy());
Cluster cluster = invoker.getCluster();
Assert.assertTrue(cluster.getConnectionHolder() instanceof AllConnectConnectionHolder);
AllConnectConnectionHolder holder = (AllConnectConnectionHolder) cluster.getConnectionHolder();

ProviderGroup providerGroups = new ProviderGroup();
providerGroups.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22223"));
providerGroups.add(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22224"));
holder.updateProviders(providerGroups);
Set<ProviderInfo> last = holder.currentProviderList();
Assert.assertEquals(2, last.size());

ProviderGroup providerGroups2 = new ProviderGroup();
holder.updateProviders(providerGroups2);
Set<ProviderInfo> current = holder.currentProviderList();

Assert.assertEquals(0, current.size());

consumerConfig.unRefer();
}
}

0 comments on commit 678d52e

Please sign in to comment.