Skip to content

Commit

Permalink
消费者端启动时平衡所有服务
Browse files Browse the repository at this point in the history
  • Loading branch information
wephone committed Jan 11, 2018
1 parent 5aa9580 commit d577bfe
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 120 deletions.
16 changes: 15 additions & 1 deletion src/main/java/org/meizhuo/rpc/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.meizhuo.rpc.core.RPC;
import org.meizhuo.rpc.zksupport.LoadBalance.LoadBalance;
import org.meizhuo.rpc.zksupport.LoadBalance.MinConnectRandom;
import org.meizhuo.rpc.zksupport.ZKConnect;
import org.meizhuo.rpc.zksupport.service.ZKClientService;
import org.springframework.beans.BeansException;
Expand All @@ -25,6 +27,7 @@ public class ClientConfig implements ApplicationContextAware {
private long overtime;
//远程调用接口全类名集合 用于启动时向zookeeper注册提供者服务
private Set<String> serviceInterface;
private LoadBalance loadBalance;

public String getZooKeeperHost() {
return zooKeeperHost;
Expand All @@ -50,6 +53,14 @@ public void setServiceInterface(Set serviceInterface) {
this.serviceInterface = serviceInterface;
}

public LoadBalance getLoadBalance() {
return loadBalance;
}

public void setLoadBalance(LoadBalance loadBalance) {
this.loadBalance = loadBalance;
}

/**
* 加载Spring配置文件时,如果Spring配置文件中所定义的Bean类
* 如果该类实现了ApplicationContextAware接口
Expand All @@ -65,7 +76,10 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
ZooKeeper zooKeeper= new ZKConnect().clientConnect();
ZKClientService zkClientService=new ZKClientService(zooKeeper);
zkClientService.createClientService();
//TODO 获取提供者调用者ip及数量 并监听 即对所有服务开启平衡
//获取提供者调用者ip及数量 并监听 即对所有服务开启平衡
//负载均衡类设置prototype作用域
LoadBalance loadBalance=RPC.getClientConfig().getLoadBalance();
loadBalance.balanceAll(zooKeeper);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/meizhuo/rpc/client/IPChannelInfo.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.meizhuo.rpc.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -12,16 +13,16 @@
*/
public class IPChannelInfo {

private ChannelHandlerContext channelHandlerContext;
private Channel channel;
//保证多线程修改时引用计数正确
private AtomicInteger serviceQuoteNum;

public ChannelHandlerContext getChannelHandlerContext() {
return channelHandlerContext;
public Channel getChannel() {
return channel;
}

public void setChannelHandlerContext(ChannelHandlerContext channelHandlerContext) {
this.channelHandlerContext = channelHandlerContext;
public void setChannel(Channel channel) {
this.channel = channel;
}

public Integer getServiceQuoteNum() {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/meizhuo/rpc/client/RPCProxyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
// Lock lock = new ReentrantLock();
// Condition condition=lock.newCondition();
// System.out.println("Invoke Method Thread:"+Thread.currentThread().getName());
RPCRequestNet.requestLockMap.put(request.getRequestID(),request);
RPCRequestNet.getInstance().requestLockMap.put(request.getRequestID(),request);
// lock.lock();//获取锁
RPCRequestNet.connect().send(request);
RPCRequestNet.getInstance().send(request);
//调用用结束后移除对应的condition映射关系
RPCRequestNet.requestLockMap.remove(request.getRequestID());
RPCRequestNet.getInstance().requestLockMap.remove(request.getRequestID());
// lock.unlock();
return request.getResult();//目标方法的返回结果
}
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/org/meizhuo/rpc/client/RPCRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
*/
public class RPCRequestHandler extends ChannelHandlerAdapter {

public static ChannelHandlerContext channelCtx;
// public static ChannelHandlerContext channelCtx;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// System.out.println("ChannelActive Thread:"+Thread.currentThread().getName());
channelCtx=ctx;
//需要在lock和unlock的包裹下 否则报出IllegalMonitorStateException
RPCRequestNet.getInstance().connectlock.lock();
RPCRequestNet.getInstance().connectCondition.signalAll();
RPCRequestNet.getInstance().connectlock.unlock();
}
// @Override
// public void channelActive(ChannelHandlerContext ctx) throws Exception {
//// System.out.println("ChannelActive Thread:"+Thread.currentThread().getName());
// channelCtx=ctx;
// //需要在lock和unlock的包裹下 否则报出IllegalMonitorStateException
// RPCRequestNet.getInstance().connectlock.lock();
// RPCRequestNet.getInstance().connectCondition.signalAll();
// RPCRequestNet.getInstance().connectlock.unlock();
// }

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Expand Down
97 changes: 52 additions & 45 deletions src/main/java/org/meizhuo/rpc/client/RPCRequestNet.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
*/
public class RPCRequestNet {

public Map requestLockMap=new ConcurrentHashMap<String,RPCRequest>();//全局map 每个请求对应的锁 用于同步等待每个异步的RPC请求
public Lock connectlock=new ReentrantLock();//阻塞等待连接成功的锁
public Condition connectCondition=connectlock.newCondition();
//全局map 每个请求对应的锁 用于同步等待每个异步的RPC请求
public Map requestLockMap=new ConcurrentHashMap<String,RPCRequest>();
//每个IP对应一个锁 防止重复连接一个IP多次
public Map<String,Lock> connectlock=new ConcurrentHashMap<String,Lock>();
//服务名称 映射 服务信息类
public ConcurrentHashMap<String,ServiceInfo> serviceNameInfoMap;
//IP地址 映射 对应的NIO Channel及其引用次数
Expand All @@ -40,38 +41,46 @@ public class RPCRequestNet {
private static RPCRequestNet instance;

private RPCRequestNet() {
//TODO 后续改为根据配置选择负载均衡策略
loadBalance=new MinConnectRandom();
loadBalance=RPC.getClientConfig().getLoadBalance();
}

//负载均衡获取对应IP 端口后发起连接
private void connect(String ip){
String[] IPArr=ip.split(":");
String host=IPArr[0];
Integer port=Integer.valueOf(IPArr[2]);
//netty线程组
EventLoopGroup group=new NioEventLoopGroup();
//启动辅助类 用于配置各种参数
Bootstrap b=new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(2048));//以换行符分包 防止粘包半包 2048为最大长度 到达最大长度没出现换行符则抛出异常
socketChannel.pipeline().addLast(new StringDecoder());//将接收到的对象转为字符串
//添加相应回调处理和编解码器
socketChannel.pipeline().addLast(new RPCRequestHandler());
}
});
try {
//TODO 从自定义标签配置中读取参数 启动网络连接
ChannelFuture f=b.connect(host,port).sync();
// f.channel().closeFuture().sync();//会造成阻塞构造方法
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
private Channel connect(String ip){
Channel channel=IPChannelMap.get(ip).getChannel();
if (channel==null){
//尚未有channel则新连接 每个IP加单独的锁 防止重复连接
Lock lock=new ReentrantLock();
connectlock.putIfAbsent(ip,lock);
connectlock.get(ip).lock();
if (IPChannelMap.get(ip).getChannel()==null){
String[] IPArr=ip.split(":");
String host=IPArr[0];
Integer port=Integer.valueOf(IPArr[1]);
//netty线程组
EventLoopGroup group=new NioEventLoopGroup();
//启动辅助类 用于配置各种参数
Bootstrap b=new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(2048));//以换行符分包 防止粘包半包 2048为最大长度 到达最大长度没出现换行符则抛出异常
socketChannel.pipeline().addLast(new StringDecoder());//将接收到的对象转为字符串
//添加相应回调处理和编解码器
socketChannel.pipeline().addLast(new RPCRequestHandler());
}
});
try {
//同步等待连接成功
ChannelFuture f=b.connect(host,port).sync();
channel=f.channel();
//将连接成功的通道放入Map
IPChannelMap.get(ip).setChannel(channel);
// f.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture channelFuture) throws Exception {
/*
* sync内部会在NIO线程wait阻塞 导致这个接口回调时阻塞和唤醒可能都在同一线程造成死锁
* 不论是用户直接关闭或者eventLoop的轮询状态关闭,都会在eventLoop的线程内完成notify动作,所以不要在IO线程内调用future对象的sync或者await方法
Expand All @@ -80,12 +89,15 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
所有执行channel.closeFuture().sync()就是执行的CloseFuture的sync方法,从上面的解释可以知道,这步是会将当前线程阻塞在CloseFuture上
*/
// f.channel().closeFuture().sync();//应用程序会一直等待,直到channel关闭 这句会造成死锁异常BlockingOperationException
// }
// });
} catch (InterruptedException e) {
e.printStackTrace();
}
});

} catch (InterruptedException e) {
e.printStackTrace();
}
connectlock.get(ip).unlock();
}
return channel;
}

//单例模式 避免重复连接 构造方法中进行连接操作
Expand All @@ -102,16 +114,11 @@ public static RPCRequestNet getInstance(){

//向实现端发送请求
public void send(RPCRequest request){
String serviceName=request.getClassName();
String ip=loadBalance.chooseIP(serviceName);
Channel channel=connect(ip);
// System.out.println("Send RPC Thread:"+Thread.currentThread().getName());
try {
//判断连接是否已完成 只在连接启动时会产生阻塞
if (RPCRequestHandler.channelCtx==null){
connectlock.lock();
//挂起等待连接成功
System.out.println("正在等待连接实现端");
connectCondition.await();
connectlock.unlock();
}
//编解码对象为json 发送请求
String requestJson= null;
try {
Expand All @@ -120,7 +127,7 @@ public void send(RPCRequest request){
e.printStackTrace();
}
ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);
channel.writeAndFlush(requestBuf);
System.out.println("调用"+request.getRequestID()+"已发送");
//挂起等待实现端处理完毕返回 TODO 后续配置超时时间
synchronized (request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
public interface LoadBalance {

// //平衡连接消费者端的所有服务 仅在启动时使用
// void balanceAll();
//平衡连接消费者端的所有服务 仅在启动时使用
void balanceAll(ZooKeeper zooKeeper);

/**
* 负载均衡的连接平衡操作
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package org.meizhuo.rpc.zksupport.LoadBalance;

import io.netty.channel.ChannelHandlerContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.meizhuo.rpc.client.IPChannelInfo;
import org.meizhuo.rpc.client.RPCRequest;
import org.meizhuo.rpc.client.RPCRequestNet;
import org.meizhuo.rpc.zksupport.ZKTempZnodes;
import org.meizhuo.rpc.core.RPC;
import org.meizhuo.rpc.zksupport.service.ServiceInfo;
import org.meizhuo.rpc.zksupport.service.ZKClientService;
import org.meizhuo.rpc.zksupport.service.ZKServerService;
Expand All @@ -28,13 +25,33 @@
*/
public class MinConnectRandom implements LoadBalance{

@Override
public void balanceAll(ZooKeeper zookeeper) {
Set<String> allServices= RPC.getClientConfig().getServiceInterface();
ZKClientService zkClientService=new ZKClientService(zookeeper);
ZKServerService zkServerService=new ZKServerService(zookeeper);
try {
for (String service : allServices) {
List<String> clientZnodes = zkClientService.getServiceClients(service);
balance(zookeeper, service, clientZnodes, ZnodeType.consumer);
List<String> serverZnodes = zkServerService.getAllServiceIP(service);
balance(zookeeper, service, serverZnodes, ZnodeType.provider);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

@Override
public void balance(ZooKeeper zooKeeper, String serviceName, List<String> znodes, ZnodeType type) {
final String service=serviceName;
Runnable runnable=new Runnable() {
@Override
public void run() {
ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
System.out.println(serviceName+"正在平衡...已加写锁");
//不存在则新建一个锁键值对 存在则不操作
BalanceThreadPool.serviceLockMap.putIfAbsent(service,readWriteLock);
//上写锁
Expand Down Expand Up @@ -78,6 +95,7 @@ public void run() {
public String chooseIP(String serviceName) {
//获取serviceInfo上读锁
BalanceThreadPool.serviceLockMap.get(serviceName).readLock().lock();
System.out.println(serviceName+"正在选择IP...已加读锁");
ConcurrentSkipListSet<String> IPSet=RPCRequestNet.getInstance().serviceNameInfoMap.get(serviceName)
.getConnectIPSet();
//释放读锁
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ public void createClientService() throws KeeperException, InterruptedException {
ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
Set<String> services= RPC.getClientConfig().getServiceInterface();
for (String serviceName:services){
//创建消费者服务节点 单纯放个1作为数据就行了
zkTempZnodes.createTempSeqZnode(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath+ZKConst.consumerSeqNodePath,"1");
//创建消费者服务节点 不放数据
zkTempZnodes.createTempSeqZnode(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath+ZKConst.consumerSeqNodePath,null);
}
}

public int getClientServiceNum(String serviceName) throws KeeperException, InterruptedException {
//获得这个服务所有的客户端 包含监听的注册
public List<String> getServiceClients(String serviceName) throws KeeperException, InterruptedException {
ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
IPWatcher ipWatcher=new IPWatcher(zooKeeper);
List<String> children=zkTempZnodes.getPathChildren(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath,ipWatcher);
return children.size();
ConsumerWatcher consumerWatcher=new ConsumerWatcher(zooKeeper);
List<String> children=zkTempZnodes.getPathChildren(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath,consumerWatcher);
return children;
}

// private List<String> getAllClients(String serviceName) throws KeeperException, InterruptedException {
Expand All @@ -61,25 +62,25 @@ public int getClientServiceNum(String serviceName) throws KeeperException, Inter
// }

//获得并监听所有服务的信息 包括调用者和提供者
public void getWatchAllServiceInfo() throws KeeperException, InterruptedException {
Set<String> serviceSet=RPC.getClientConfig().getServiceInterface();
for (String serviceName:serviceSet){
System.out.println("MeiZhuoRPC get service:"+serviceName);
ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
IPWatcher providerWatcher=new IPWatcher(zooKeeper);
List<String> providerIP=zkTempZnodes.getPathChildren(
ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.providersPath,
providerWatcher);
ConsumerWatcher consumerWatcher=new ConsumerWatcher();
List<String> consumerNodes=zkTempZnodes.getPathChildren(
ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath,
consumerWatcher);
ServiceInfo serviceInfo=new ServiceInfo();
serviceInfo.setClientCount(providerIP.size());
serviceInfo.setServerCount(consumerNodes.size());

}
}
// public void getWatchAllServiceInfo() throws KeeperException, InterruptedException {
// Set<String> serviceSet=RPC.getClientConfig().getServiceInterface();
// for (String serviceName:serviceSet){
// System.out.println("MeiZhuoRPC get service:"+serviceName);
// ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
// IPWatcher providerWatcher=new IPWatcher(zooKeeper);
// List<String> providerIP=zkTempZnodes.getPathChildren(
// ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.providersPath,
// providerWatcher);
// ConsumerWatcher consumerWatcher=new ConsumerWatcher();
// List<String> consumerNodes=zkTempZnodes.getPathChildren(
// ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.consumersPath,
// consumerWatcher);
// ServiceInfo serviceInfo=new ServiceInfo();
// serviceInfo.setClientCount(providerIP.size());
// serviceInfo.setServerCount(consumerNodes.size());
//
// }
// }

/**
* 增加服务端某个服务IP的连接数
Expand Down
Loading

0 comments on commit d577bfe

Please sign in to comment.