Skip to content

Commit

Permalink
服务端节点创建
Browse files Browse the repository at this point in the history
  • Loading branch information
wephone committed Jan 11, 2018
1 parent 4f095d5 commit 6ff44a6
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 3 deletions.
Binary file modified out/artifacts/MeiZhuoRPC_jar/MeiZhuoRPC.jar
Binary file not shown.
1 change: 1 addition & 0 deletions src/main/java/org/meizhuo/rpc/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
try {
ZooKeeper zooKeeper= new ZKConnect().clientConnect();
ZKClientService zkClientService=new ZKClientService(zooKeeper);
zkClientService.initZnode();
zkClientService.createClientService();
//获取提供者调用者ip及数量 并监听 即对所有服务开启平衡
//负载均衡类设置prototype作用域
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/meizhuo/rpc/core/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static void start() throws InterruptedException, IOException {
ZooKeeper zooKeeper= new ZKConnect().serverConnect();
ZKServerService zkServerService=new ZKServerService(zooKeeper);
try {
zkServerService.initZnode();
//创建所有提供者服务的znode
zkServerService.createServerService();
} catch (KeeperException e) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/meizhuo/rpc/server/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ public String getZooKeeperHost() {
}

public void setZooKeeperHost(String zooKeeperHost) {
zooKeeperHost = zooKeeperHost;
this.zooKeeperHost = zooKeeperHost;
}

public String getServerHost() {
return serverHost;
//获取时带上端口
return serverHost+":"+port;
}

public void setServerHost(String serverHost) {
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/org/meizhuo/rpc/zksupport/ZKTempZnodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,25 @@ public Stat exists(String path) throws KeeperException, InterruptedException {

//创建临时顺序节点
public void createTempSeqZnode(String path,String data) throws KeeperException, InterruptedException {
byte[] bytes=data.getBytes();
byte[] bytes=null;
if (data!=null){
bytes=data.getBytes();
}
zooKeeper.create(path,bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

//创建普通持久节点 已存在则不创建
public void createSimpleZnode(String path,String data) throws KeeperException, InterruptedException {
Stat stat=exists(path);
if (stat==null) {
byte[] bytes = null;
if (data != null) {
bytes = data.getBytes();
}
zooKeeper.create(path, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}

//获得某一服务的所有提供者ip
public List<String> getPathChildren(String path, Watcher watcher) throws KeeperException, InterruptedException {
List<String> children= zooKeeper.getChildren(path,watcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -85,6 +86,20 @@ public List<String> getServiceClients(String serviceName) throws KeeperException
// }
// }

//初始化根节点及服务消费者者节点 均为持久节点
public void initZnode() throws KeeperException, InterruptedException {
ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
String path=ZKConst.rootPath;
zkTempZnodes.createSimpleZnode(path,null);
path=path+ZKConst.servicePath;
zkTempZnodes.createSimpleZnode(path,null);
Set<String> set=RPC.getClientConfig().getServiceInterface();
for (String service:set){
zkTempZnodes.createSimpleZnode(path+"/"+service,null);
zkTempZnodes.createSimpleZnode(path+"/"+service+"/"+ZKConst.consumersPath,null);
}
}

/**
* 增加服务端某个服务IP的连接数
* 检测到zk异常 例如版本乐观锁报错时 循环尝试
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ public List<String> getAllServiceIP(String serviceName) throws KeeperException,
IPWatcher ipWatcher=new IPWatcher(zooKeeper);
return zkTempZnodes.getPathChildren(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.providersPath,ipWatcher);
}

//初始化根节点及服务提供者节点 均为持久节点
public void initZnode() throws KeeperException, InterruptedException {
ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
String path=ZKConst.rootPath;
String balancePath=ZKConst.rootPath;
zkTempZnodes.createSimpleZnode(path,null);
balancePath=balancePath+ZKConst.balancePath;
zkTempZnodes.createSimpleZnode(balancePath,null);
path=path+ZKConst.servicePath;
zkTempZnodes.createSimpleZnode(path,null);
Map<String,String> serverImplMap=RPC.getServerConfig().getServerImplMap();
for (Map.Entry<String,String> entry:serverImplMap.entrySet()){
zkTempZnodes.createSimpleZnode(balancePath+"/"+entry.getKey(),null);
zkTempZnodes.createSimpleZnode(path+"/"+entry.getKey(),null);
zkTempZnodes.createSimpleZnode(path+"/"+entry.getKey()+ZKConst.providersPath,null);
}
}
//
// //设置提供者数量并监听
// public void watchAllServerService() throws KeeperException, InterruptedException {
Expand Down

0 comments on commit 6ff44a6

Please sign in to comment.