Skip to content

Commit

Permalink
Recover data after zookeeper registry reconnected. (sofastack#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy committed Oct 25, 2018
1 parent 6eab49b commit 88024be
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

import java.net.URLEncoder;
import java.util.Collections;
Expand Down Expand Up @@ -214,6 +217,33 @@ public synchronized void init() {
.retryPolicy(retryPolicy)
.defaultData(null)
.build();

zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {

if (LOGGER.isInfoEnabled()) {
LOGGER.info("reconnect to zookeeper,recover provider and consumer data");
}
if (newState == ConnectionState.RECONNECTED) {
recoverRegistryData();
}
}
});
}

//recover data when connect with zk again.

protected void recoverRegistryData() {

for (ProviderConfig providerConfig : providerUrls.keySet()) {
registerProviderUrls(providerConfig);
}

for (ConsumerConfig consumerConfig : consumerUrls.keySet()) {
subscribeConsumerUrls(consumerConfig);
}

}

@Override
Expand Down Expand Up @@ -278,43 +308,74 @@ public void register(ProviderConfig config) {
}
return;
}

//发布
if (config.isRegister()) {
// 注册服务端节点
try {
List<String> urls = ZookeeperRegistryHelper.convertProviderToUrls(config);
if (CommonUtils.isNotEmpty(urls)) {
String providerPath = buildProviderPath(rootPath, config);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName,
LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, providerPath));
}
for (String url : urls) {
url = URLEncoder.encode(url, "UTF-8");
String providerUrl = providerPath + CONTEXT_SEP + url;
registerProviderUrls(config);
}

if (config.isSubscribe()) {
// 订阅配置节点
if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
//订阅接口级配置
subscribeConfig(config, config.getConfigListener());
}
}
}

/***
* 注册 服务信息
* @param config
* @return
* @throws Exception
*/
protected void registerProviderUrls(ProviderConfig config) {
String appName = config.getAppName();

// 注册服务端节点
try {
// 避免重复计算
List<String> urls;
if (providerUrls.containsKey(config)) {
urls = providerUrls.get(config);
} else {
urls = ZookeeperRegistryHelper.convertProviderToUrls(config);
providerUrls.put(config, urls);
}
if (CommonUtils.isNotEmpty(urls)) {

String providerPath = buildProviderPath(rootPath, config);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName,
LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, providerPath));
}
for (String url : urls) {
url = URLEncoder.encode(url, "UTF-8");
String providerUrl = providerPath + CONTEXT_SEP + url;

try {
getAndCheckZkClient().create().creatingParentContainersIfNeeded()
.withMode(ephemeralNode ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT) // 是否永久节点
.forPath(providerUrl, config.isDynamic() ? PROVIDER_ONLINE : PROVIDER_OFFLINE); // 是否默认上下线
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, providerUrl));
}
}
providerUrls.put(config, urls);
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName,
LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, providerPath));
} catch (KeeperException.NodeExistsException nodeExistsException) {
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.warnWithApp(appName,
"provider has exists in zookeeper, provider=" + providerUrl);
}
}
}
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to register provider to zookeeperRegistry!", e);
}
}

if (config.isSubscribe()) {
// 订阅配置节点
if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
//订阅接口级配置
subscribeConfig(config, config.getConfigListener());
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName,
LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, providerPath));
}

}
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to register provider to zookeeperRegistry!", e);
}
}

Expand Down Expand Up @@ -474,20 +535,10 @@ public List<ProviderGroup> subscribe(final ConsumerConfig config) {
}
return null;
}
// 注册Consumer节点
if (config.isRegister()) {
try {
String consumerPath = buildConsumerPath(rootPath, config);
String url = ZookeeperRegistryHelper.convertConsumerToUrl(config);
String encodeUrl = URLEncoder.encode(url, "UTF-8");
getAndCheckZkClient().create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL) // Consumer临时节点
.forPath(consumerPath + CONTEXT_SEP + encodeUrl);
consumerUrls.put(config, url);
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to register consumer to zookeeperRegistry!", e);
}
}

//订阅如果有必要
subscribeConsumerUrls(config);

if (config.isSubscribe()) {
// 订阅配置
if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
Expand Down Expand Up @@ -555,6 +606,37 @@ public void childEvent(CuratorFramework client1, PathChildrenCacheEvent event) t
return null;
}

/***
* 订阅
* @param config
*/
protected void subscribeConsumerUrls(ConsumerConfig config) {
// 注册Consumer节点
String url = null;
if (config.isRegister()) {
try {
String consumerPath = buildConsumerPath(rootPath, config);
if (consumerUrls.containsKey(config)) {
url = consumerUrls.get(config);
} else {
url = ZookeeperRegistryHelper.convertConsumerToUrl(config);
consumerUrls.put(config, url);
}
String encodeUrl = URLEncoder.encode(url, "UTF-8");
getAndCheckZkClient().create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL) // Consumer临时节点
.forPath(consumerPath + CONTEXT_SEP + encodeUrl);

} catch (KeeperException.NodeExistsException nodeExistsException) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("consumer has exists in zookeeper, consumer=" + url);
}
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to register consumer to zookeeperRegistry!", e);
}
}
}

@Override
public void unSubscribe(ConsumerConfig config) {
// 反注册服务端节点
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
*/
public class ZookeeperServerRestartTest extends BaseZkTest {

/** Logger for ZookeeperServerRestartTest **/
/**
* Logger for ZookeeperServerRestartTest
**/
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServerRestartTest.class);

@Test
public void testAll() throws Exception {
final RegistryConfig registryConfig = new RegistryConfig().setProtocol("zookeeper")
.setAddress("127.0.0.1:2181");
.setAddress("127.0.0.1:2181").setConnectTimeout(100);
final ZookeeperRegistry registry = (ZookeeperRegistry) RegistryFactory
.getRegistry(registryConfig);
registry.start();
Expand Down Expand Up @@ -88,7 +90,9 @@ public void run() {
}
serverConfig2.getServer().start();
Registry registry1 = RegistryFactory.getRegistry(registryConfig);
// mock server restart and register provider
// mock server restart and register provider
// if we don't unRegistry,create will fail,beacuse data is not clean quickly.
registry1.unRegister(providerConfig);
registry1.register(providerConfig);
}
});
Expand Down

0 comments on commit 88024be

Please sign in to comment.