Skip to content

Commit

Permalink
Support uniqueId in zookeeper registry. (sofastack#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy authored Apr 17, 2018
1 parent 69581ee commit a289c7b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public final class ProviderInfoAttrs {
* 静态配置key:appName
*/
public static final String ATTR_APP_NAME = RpcConstants.CONFIG_KEY_APP_NAME;
/**
* 静态配置key:uniqueId
*/
public static final String ATTR_UNIQUEID = RpcConstants.CONFIG_KEY_UNIQUEID;
/**
* 静态配置key:source 来源
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ public void register(ProviderConfig config) {
getAndCheckZkClient().create().creatingParentContainersIfNeeded()
.withMode(ephemeralNode ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT) // 是否永久节点
.forPath(providerUrl, config.isDynamic() ? PROVIDER_ONLINE : PROVIDER_OFFLINE); // 是否默认上下线
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, providerUrl));
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, providerUrl));
}
}
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName,
Expand Down Expand Up @@ -439,8 +441,8 @@ public void childEvent(CuratorFramework client1, PathChildrenCacheEvent event) t
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
List<ProviderInfo> providerInfos = ZookeeperRegistryHelper.convertUrlsToProviders(
providerPath, pathChildrenCache.getCurrentData());

return Collections.singletonList(new ProviderGroup().addAll(providerInfos));
List<ProviderInfo> matchProviders = ZookeeperRegistryHelper.matchProviderInfos(config, providerInfos);
return Collections.singletonList(new ProviderGroup().addAll(matchProviders));
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to subscribe provider from zookeeperRegistry!", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alipay.sofa.rpc.common.Version;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -181,4 +182,17 @@ static String buildConsumerPath(String rootPath, AbstractInterfaceConfig config)
static String buildConfigPath(String rootPath, AbstractInterfaceConfig config) {
return rootPath + "sofa-rpc/" + config.getInterfaceId() + "/configs";
}

static List<ProviderInfo> matchProviderInfos(ConsumerConfig consumerConfig, List<ProviderInfo> providerInfos) {
String protocol = consumerConfig.getProtocol();
List<ProviderInfo> result = new ArrayList<ProviderInfo>();
for (ProviderInfo providerInfo : providerInfos) {
if (providerInfo.getProtocolType().equalsIgnoreCase(protocol)
&& StringUtils.equals(consumerConfig.getUniqueId(),
providerInfo.getAttr(ProviderInfoAttrs.ATTR_UNIQUEID))) {
result.add(providerInfo);
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public static void setUp() {
.setSubscribe(true)
.setAddress("127.0.0.1:2181")
.setRegister(true);
//.setConnectTimeout(5000)
//.setHeartbeatPeriod(60000)
//.setReconnectPeriod(15000)
//.setBatch(true)
//.setBatchSize(10);

registry = (ZookeeperRegistry) RegistryFactory.getRegistry(registryConfig);
registry.init();
Expand Down Expand Up @@ -117,6 +112,23 @@ public void testAll() throws Exception {
Map<String, ProviderInfo> ps = providerInfoListener.getData();
Assert.assertTrue(ps.size() == 1);

// 订阅 错误的uniqueId
ConsumerConfig<?> consumerNoUniqueId = new ConsumerConfig();
consumerNoUniqueId.setInterfaceId("com.alipay.xxx.TestService")
.setApplication(new ApplicationConfig().setAppName("test-server"))
.setProxy("javassist")
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
latch = new CountDownLatch(1);
providerInfoListener.setCountDownLatch(latch);
consumerNoUniqueId.setProviderInfoListener(providerInfoListener);
all = registry.subscribe(consumerNoUniqueId);
providerInfoListener.updateAllProviders(all);
ps = providerInfoListener.getData();
Assert.assertTrue(ps.size() == 0);

// 反注册
latch = new CountDownLatch(1);
providerInfoListener.setCountDownLatch(latch);
Expand Down Expand Up @@ -154,12 +166,9 @@ public void testAll() throws Exception {

Map<String, ProviderInfo> ps2 = providerInfoListener2.getData();
Assert.assertTrue(ps2.size() == 2);
// Assert.assertTrue(registry.subscribers.size() == 2);

// 取消订阅者1
registry.unSubscribe(consumer);
// Assert.assertFalse(callback.providerInfoListeners.contains(consumer));
// Assert.assertTrue(registry.subscribers.size() == 2);

// 批量反注册,判断订阅者2的数据
latch = new CountDownLatch(2);
Expand All @@ -170,14 +179,12 @@ public void testAll() throws Exception {

latch.await(timeoutPerSub * 2, TimeUnit.MILLISECONDS);
Assert.assertTrue(ps2.size() == 0);
// Assert.assertTrue(registry.subscribers.size() == 2); // 1个服务 订阅服务列表和服务配置 2个dataId

// 批量取消订阅
List<ConsumerConfig> consumerConfigList = new ArrayList<ConsumerConfig>();
consumerConfigList.add(consumer2);
registry.batchUnSubscribe(consumerConfigList);

// Assert.assertTrue(registry.subscribers.size() == 0);
}

private static class MockProviderInfoListener implements ProviderInfoListener {
Expand Down

0 comments on commit a289c7b

Please sign in to comment.