From 88024be37a2e71c3dd277962ff45dad0c7d64510 Mon Sep 17 00:00:00 2001 From: Geng Zhang Date: Wed, 24 Oct 2018 10:35:10 +0800 Subject: [PATCH] Recover data after zookeeper registry reconnected. (#337) --- .../rpc/registry/zk/ZookeeperRegistry.java | 162 +++++++++++++----- .../zk/ZookeeperServerRestartTest.java | 10 +- 2 files changed, 129 insertions(+), 43 deletions(-) diff --git a/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistry.java b/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistry.java index 3791e6897..4f06b394b 100644 --- a/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistry.java +++ b/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistry.java @@ -40,8 +40,11 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import java.net.URLEncoder; import java.util.Collections; @@ -214,6 +217,33 @@ public synchronized void init() { .retryPolicy(retryPolicy) .defaultData(null) .build(); + + zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + + if (LOGGER.isInfoEnabled()) { + LOGGER.info("reconnect to zookeeper,recover provider and consumer data"); + } + if (newState == ConnectionState.RECONNECTED) { + recoverRegistryData(); + } + } + }); + } + + //recover data when connect with zk again. + + protected void recoverRegistryData() { + + for (ProviderConfig providerConfig : providerUrls.keySet()) { + registerProviderUrls(providerConfig); + } + + for (ConsumerConfig consumerConfig : consumerUrls.keySet()) { + subscribeConsumerUrls(consumerConfig); + } + } @Override @@ -278,43 +308,74 @@ public void register(ProviderConfig config) { } return; } + + //发布 if (config.isRegister()) { - // 注册服务端节点 - try { - List urls = ZookeeperRegistryHelper.convertProviderToUrls(config); - if (CommonUtils.isNotEmpty(urls)) { - String providerPath = buildProviderPath(rootPath, config); - if (LOGGER.isInfoEnabled(appName)) { - LOGGER.infoWithApp(appName, - LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, providerPath)); - } - for (String url : urls) { - url = URLEncoder.encode(url, "UTF-8"); - String providerUrl = providerPath + CONTEXT_SEP + url; + registerProviderUrls(config); + } + + if (config.isSubscribe()) { + // 订阅配置节点 + if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) { + //订阅接口级配置 + subscribeConfig(config, config.getConfigListener()); + } + } + } + + /*** + * 注册 服务信息 + * @param config + * @return + * @throws Exception + */ + protected void registerProviderUrls(ProviderConfig config) { + String appName = config.getAppName(); + + // 注册服务端节点 + try { + // 避免重复计算 + List urls; + if (providerUrls.containsKey(config)) { + urls = providerUrls.get(config); + } else { + urls = ZookeeperRegistryHelper.convertProviderToUrls(config); + providerUrls.put(config, urls); + } + if (CommonUtils.isNotEmpty(urls)) { + + String providerPath = buildProviderPath(rootPath, config); + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, + LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, providerPath)); + } + for (String url : urls) { + url = URLEncoder.encode(url, "UTF-8"); + String providerUrl = providerPath + CONTEXT_SEP + url; + + try { getAndCheckZkClient().create().creatingParentContainersIfNeeded() .withMode(ephemeralNode ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT) // 是否永久节点 .forPath(providerUrl, config.isDynamic() ? PROVIDER_ONLINE : PROVIDER_OFFLINE); // 是否默认上下线 if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, providerUrl)); } - } - providerUrls.put(config, urls); - if (LOGGER.isInfoEnabled(appName)) { - LOGGER.infoWithApp(appName, - LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, providerPath)); + } catch (KeeperException.NodeExistsException nodeExistsException) { + if (LOGGER.isWarnEnabled(appName)) { + LOGGER.warnWithApp(appName, + "provider has exists in zookeeper, provider=" + providerUrl); + } } } - } catch (Exception e) { - throw new SofaRpcRuntimeException("Failed to register provider to zookeeperRegistry!", e); - } - } - if (config.isSubscribe()) { - // 订阅配置节点 - if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) { - //订阅接口级配置 - subscribeConfig(config, config.getConfigListener()); + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, + LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, providerPath)); + } + } + } catch (Exception e) { + throw new SofaRpcRuntimeException("Failed to register provider to zookeeperRegistry!", e); } } @@ -474,20 +535,10 @@ public List subscribe(final ConsumerConfig config) { } return null; } - // 注册Consumer节点 - if (config.isRegister()) { - try { - String consumerPath = buildConsumerPath(rootPath, config); - String url = ZookeeperRegistryHelper.convertConsumerToUrl(config); - String encodeUrl = URLEncoder.encode(url, "UTF-8"); - getAndCheckZkClient().create().creatingParentContainersIfNeeded() - .withMode(CreateMode.EPHEMERAL) // Consumer临时节点 - .forPath(consumerPath + CONTEXT_SEP + encodeUrl); - consumerUrls.put(config, url); - } catch (Exception e) { - throw new SofaRpcRuntimeException("Failed to register consumer to zookeeperRegistry!", e); - } - } + + //订阅如果有必要 + subscribeConsumerUrls(config); + if (config.isSubscribe()) { // 订阅配置 if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) { @@ -555,6 +606,37 @@ public void childEvent(CuratorFramework client1, PathChildrenCacheEvent event) t return null; } + /*** + * 订阅 + * @param config + */ + protected void subscribeConsumerUrls(ConsumerConfig config) { + // 注册Consumer节点 + String url = null; + if (config.isRegister()) { + try { + String consumerPath = buildConsumerPath(rootPath, config); + if (consumerUrls.containsKey(config)) { + url = consumerUrls.get(config); + } else { + url = ZookeeperRegistryHelper.convertConsumerToUrl(config); + consumerUrls.put(config, url); + } + String encodeUrl = URLEncoder.encode(url, "UTF-8"); + getAndCheckZkClient().create().creatingParentContainersIfNeeded() + .withMode(CreateMode.EPHEMERAL) // Consumer临时节点 + .forPath(consumerPath + CONTEXT_SEP + encodeUrl); + + } catch (KeeperException.NodeExistsException nodeExistsException) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("consumer has exists in zookeeper, consumer=" + url); + } + } catch (Exception e) { + throw new SofaRpcRuntimeException("Failed to register consumer to zookeeperRegistry!", e); + } + } + } + @Override public void unSubscribe(ConsumerConfig config) { // 反注册服务端节点 diff --git a/test/test-integration-3rd/src/test/java/com/alipay/sofa/rpc/registry/zk/ZookeeperServerRestartTest.java b/test/test-integration-3rd/src/test/java/com/alipay/sofa/rpc/registry/zk/ZookeeperServerRestartTest.java index 4405876e6..8a8e0e213 100644 --- a/test/test-integration-3rd/src/test/java/com/alipay/sofa/rpc/registry/zk/ZookeeperServerRestartTest.java +++ b/test/test-integration-3rd/src/test/java/com/alipay/sofa/rpc/registry/zk/ZookeeperServerRestartTest.java @@ -37,13 +37,15 @@ */ public class ZookeeperServerRestartTest extends BaseZkTest { - /** Logger for ZookeeperServerRestartTest **/ + /** + * Logger for ZookeeperServerRestartTest + **/ private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServerRestartTest.class); @Test public void testAll() throws Exception { final RegistryConfig registryConfig = new RegistryConfig().setProtocol("zookeeper") - .setAddress("127.0.0.1:2181"); + .setAddress("127.0.0.1:2181").setConnectTimeout(100); final ZookeeperRegistry registry = (ZookeeperRegistry) RegistryFactory .getRegistry(registryConfig); registry.start(); @@ -88,7 +90,9 @@ public void run() { } serverConfig2.getServer().start(); Registry registry1 = RegistryFactory.getRegistry(registryConfig); - // mock server restart and register provider + // mock server restart and register provider + // if we don't unRegistry,create will fail,beacuse data is not clean quickly. + registry1.unRegister(providerConfig); registry1.register(providerConfig); } });