Skip to content

Commit

Permalink
refactor: 重构ZooKeeperRegistry,修改单测
Browse files Browse the repository at this point in the history
  • Loading branch information
zyseap committed Apr 27, 2016
1 parent 887b6c5 commit 83183d6
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.weibo.api.motan.registry.zookeeper;

enum ZkNodeType {
AVAILABLE_SERVER,
UNAVAILABLE_SERVER,
CLIENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ public ConcurrentHashMap<URL, ConcurrentHashMap<NotifyListener, IZkChildListener
@Override
protected void doRegister(URL url) {
try {
setUnavailable(url);
// 防止旧节点未正常注销
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()));
}
Expand All @@ -55,10 +58,8 @@ protected void doRegister(URL url) {
@Override
protected void doUnregister(URL url) {
try {
String availableNodePath = toServerNodePath(url, toAvailableServerTypePath(url));
deleteNode(availableNodePath);
String unavailableNodePath = toServerNodePath(url, toUnavailableServerTypePath(url));
deleteNode(unavailableNodePath);
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to unregister %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()));
}
Expand All @@ -84,16 +85,14 @@ public void handleChildChange(String parentPath, List<String> currentChilds) {
zkChildListener = childChangeListeners.get(notifyListener);
}

String clientNodePath = toClientNodePath(url);
// 防止旧节点未正常注销
deleteNode(clientNodePath);

createClientNode(url, toClientTypePath(url));
removeNode(url, ZkNodeType.CLIENT);
createNode(url, ZkNodeType.CLIENT);

// 获取当前可用server
String serverTypePath = toAvailableServerTypePath(url);
// 订阅server节点,并获取当前可用server
String serverTypePath = toNodeTypePath(url, ZkNodeType.AVAILABLE_SERVER);
List<String> currentChilds = zkClient.subscribeChildChanges(serverTypePath, zkChildListener);
LoggerUtil.info(String.format("[ZookeeperRegistry] subscribe: path=%s, info=%s", toServerNodePath(url, serverTypePath), url.toFullStr()));
LoggerUtil.info(String.format("[ZookeeperRegistry] subscribe: path=%s, info=%s", toNodePath(url, ZkNodeType.AVAILABLE_SERVER), url.toFullStr()));
notify(url, notifyListener, nodeChildsToUrls(serverTypePath, currentChilds));
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to subscribe %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()));
Expand All @@ -107,7 +106,7 @@ protected void doUnsubscribe(URL url, NotifyListener notifyListener) {
if (childChangeListeners != null) {
IZkChildListener zkChildListener = childChangeListeners.get(notifyListener);
if (zkChildListener != null) {
zkClient.unsubscribeChildChanges(toClientTypePath(url), zkChildListener);
zkClient.unsubscribeChildChanges(toNodeTypePath(url, ZkNodeType.CLIENT), zkChildListener);
childChangeListeners.remove(notifyListener);
}
}
Expand All @@ -119,8 +118,11 @@ protected void doUnsubscribe(URL url, NotifyListener notifyListener) {
@Override
protected List<URL> doDiscover(URL url) {
try {
String parentPath = toAvailableServerTypePath(url);
List<String> currentChilds = zkClient.getChildren(parentPath);
String parentPath = toNodeTypePath(url, ZkNodeType.AVAILABLE_SERVER);
List<String> currentChilds = new ArrayList<String>();
if (zkClient.exists(parentPath)) {
currentChilds = zkClient.getChildren(parentPath);
}
return nodeChildsToUrls(parentPath, currentChilds);
} catch (Throwable e) {
throw new MotanFrameworkException(String.format("Failed to discover %s from zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()));
Expand All @@ -131,38 +133,32 @@ protected List<URL> doDiscover(URL url) {
protected void doAvailable(URL url) {
if (url == null) {
for (URL u : getRegisteredServiceUrls()) {
setAvailable(u);
removeNode(u, ZkNodeType.AVAILABLE_SERVER);
removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
createNode(u, ZkNodeType.AVAILABLE_SERVER);
}
} else {
setAvailable(url);
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.AVAILABLE_SERVER);
}
}

@Override
protected void doUnavailable(URL url) {
if (url == null) {
for (URL u : getRegisteredServiceUrls()) {
setUnavailable(u);
removeNode(u, ZkNodeType.AVAILABLE_SERVER);
removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
createNode(u, ZkNodeType.UNAVAILABLE_SERVER);
}
} else {
setUnavailable(url);
removeNode(url, ZkNodeType.AVAILABLE_SERVER);
removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
}
}

private void setAvailable(URL url) {
doUnregister(url);
String serverTypePath = toAvailableServerTypePath(url);
createServerNode(url, serverTypePath);
LoggerUtil.info(String.format("[ZookeeperRegistry] set available: path=%s", toServerNodePath(url, serverTypePath)));
}

private void setUnavailable(URL url) {
doUnregister(url);
String serverTypePath = toUnavailableServerTypePath(url);
createServerNode(url, serverTypePath);
LoggerUtil.info(String.format("[ZookeeperRegistry] set unavailable: path=%s", toServerNodePath(url, serverTypePath)));
}

private List<URL> nodeChildsToUrls(String parentPath, List<String> currentChilds) {
List<URL> urls = new ArrayList<URL>();
for (String node : currentChilds) {
Expand All @@ -178,26 +174,6 @@ private List<URL> nodeChildsToUrls(String parentPath, List<String> currentChilds
return urls;
}

private void createServerNode(URL url, String serverTypePath) {
if (!zkClient.exists(serverTypePath)) {
zkClient.createPersistent(serverTypePath, true);
}
zkClient.createEphemeral(toServerNodePath(url, serverTypePath), url.toFullStr());
}

private void createClientNode(URL url, String clientTypePath) {
if (!zkClient.exists(clientTypePath)) {
zkClient.createPersistent(clientTypePath, true);
}
zkClient.createEphemeral(toClientNodePath(url), url.toFullStr());
}

private void deleteNode(String path) {
if (zkClient.exists(path)) {
zkClient.delete(path);
}
}

private String toGroupPath(URL url) {
return MotanConstants.ZOOKEEPER_REGISTRY_NAMESPACE + MotanConstants.PATH_SEPARATOR + url.getGroup();
}
Expand All @@ -206,27 +182,34 @@ private String toServicePath(URL url) {
return toGroupPath(url) + MotanConstants.PATH_SEPARATOR + url.getPath();
}

private String toNodeTypePath(URL url, String nodeType) {
return toServicePath(url) + MotanConstants.PATH_SEPARATOR + nodeType;
}

private String toServerNodePath(URL url, String ServerTypePath) {
return ServerTypePath + MotanConstants.PATH_SEPARATOR + url.getServerPortStr();
}

private String toClientNodePath(URL url) {
return toClientTypePath(url) + MotanConstants.PATH_SEPARATOR + url.getServerPortStr();
private String toNodeTypePath(URL url, ZkNodeType nodeType) {
String type;
if (nodeType == ZkNodeType.AVAILABLE_SERVER) {
type = "server";
} else if (nodeType == ZkNodeType.UNAVAILABLE_SERVER) {
type = "unavailbleServer";
} else {
type = "client";
}
return toServicePath(url) + MotanConstants.PATH_SEPARATOR + type;
}

private String toUnavailableServerTypePath(URL url) {
return toNodeTypePath(url, "unavailbleServer");
protected String toNodePath(URL url, ZkNodeType nodeType) {
return toNodeTypePath(url, nodeType) + MotanConstants.PATH_SEPARATOR + url.getServerPortStr();
}

private String toAvailableServerTypePath(URL url) {
return toNodeTypePath(url, "server");
private void createNode(URL url, ZkNodeType nodeType) {
String nodeTypePath = toNodeTypePath(url, nodeType);
if (!zkClient.exists(nodeTypePath)) {
zkClient.createPersistent(nodeTypePath, true);
}
zkClient.createEphemeral(toNodePath(url, nodeType), url.toFullStr());
}

private String toClientTypePath(URL url) {
return toNodeTypePath(url, "client");
private void removeNode(URL url, ZkNodeType nodeType) {
String nodePath = toNodePath(url, nodeType);
if (zkClient.exists(nodePath)) {
zkClient.delete(nodePath);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.weibo.api.motan.registry.zookeeper;

import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class EmbeddedZookeeper {
private static Properties properties = new Properties();

static {
InputStream in = EmbeddedZookeeper.class.getResourceAsStream("/zoo.cfg");
try {
properties.load(in);
} catch (IOException e) {
e.printStackTrace();
}
}

private ZooKeeperServerMain zookeeperServer;
private Thread t1;

public void start() throws IOException, QuorumPeerConfig.ConfigException {
Properties properties = new Properties();
InputStream in = EmbeddedZookeeper.class.getResourceAsStream("/zoo.cfg");
properties.load(in);

QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
quorumConfiguration.parseProperties(properties);
in.close();

zookeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);

t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
zookeeperServer.runFromConfig(configuration);
} catch (IOException e) {
}
}
});
t1.start();
}
}
Loading

0 comments on commit 83183d6

Please sign in to comment.