Skip to content

Commit

Permalink
fix: 注册中心层模块的开发
Browse files Browse the repository at this point in the history
1.基于zookeeper作为注册中心进行了统一的访问接口封装与实现
2.统一将节点的更新后的相关操作通过事件的机制来实现代码解偶
3.将对于netty连接的管理操作统一封装在了ConnectionHandler类中
  • Loading branch information
shaogezhu committed Mar 1, 2023
1 parent d1be8cb commit daa8711
Show file tree
Hide file tree
Showing 23 changed files with 1,407 additions and 39 deletions.
22 changes: 22 additions & 0 deletions easy-rpc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<jboss-marshalling-serial.version>1.4.11.Final</jboss-marshalling-serial.version>
<slf4j-api.version>1.7.13</slf4j-api.version>
<javassist.version>3.21.0-GA</javassist.version>
<zookeeper.version>3.4.14</zookeeper.version>
<zkClient.version>0.7</zkClient.version>
<curator.version>2.12.0</curator.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down Expand Up @@ -64,6 +67,25 @@
<version>${jboss-marshalling-serial.version}</version>
</dependency>

<!--Zookeeper 组件-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkClient.version}</version>
</dependency>

<dependency>
<groupId>com.shaogezhu</groupId>
<artifactId>easy-rpc-interface</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
import com.shaogezhu.easy.rpc.core.common.RpcInvocation;
import com.shaogezhu.easy.rpc.core.common.RpcProtocol;
import com.shaogezhu.easy.rpc.core.common.config.ClientConfig;
import com.shaogezhu.easy.rpc.core.common.event.RpcListenerLoader;
import com.shaogezhu.easy.rpc.core.common.utils.CommonUtil;
import com.shaogezhu.easy.rpc.core.proxy.javassist.JavassistProxyFactory;
import com.shaogezhu.easy.rpc.core.proxy.jdk.JDKProxyFactory;
import com.shaogezhu.easy.rpc.core.registy.AbstractRegister;
import com.shaogezhu.easy.rpc.core.registy.URL;
import com.shaogezhu.easy.rpc.core.registy.zookeeper.ZookeeperRegister;
import com.shaogezhu.easy.rpc.interfaces.DataService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
Expand All @@ -18,6 +24,7 @@
import java.util.List;

import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.SEND_QUEUE;
import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.SUBSCRIBE_SERVICE_LIST;

/**
* @Author peng
Expand All @@ -27,6 +34,14 @@ public class Client {

private ClientConfig clientConfig;

private AbstractRegister abstractRegister;

private final Bootstrap bootstrap = new Bootstrap();

public Bootstrap getBootstrap() {
return bootstrap;
}

public ClientConfig getClientConfig() {
return clientConfig;
}
Expand All @@ -35,7 +50,7 @@ public void setClientConfig(ClientConfig clientConfig) {
this.clientConfig = clientConfig;
}

public RpcReference startClientApplication() throws InterruptedException {
public RpcReference initClientApplication() throws InterruptedException {
NioEventLoopGroup clientGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup);
Expand All @@ -50,20 +65,72 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
});

ChannelFuture channelFuture = bootstrap.connect(clientConfig.getServerAddr(), clientConfig.getPort()).sync();
System.out.println("========== Client start success ==========");
this.startClient(channelFuture);
//初始化连接器
ConnectionHandler.setBootstrap(bootstrap);

//初始化监听器
RpcListenerLoader rpcListenerLoader = new RpcListenerLoader();
rpcListenerLoader.init();

//初始化代理工厂
RpcReference rpcReference;
if ("javassist".equals(clientConfig.getProxyType())) {
rpcReference = new RpcReference(new JavassistProxyFactory());
} else {
rpcReference = new RpcReference(new JDKProxyFactory());
}

return rpcReference;
}

public void initClientConfig() {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setRegisterAddr("localhost:2181");
clientConfig.setApplicationName("easy-rpc-client");
clientConfig.setProxyType("JDK");
this.setClientConfig(clientConfig);
}


/**
* 启动服务之前需要预先订阅对应的dubbo服务
*/
public void doSubscribeService(Class<?> serviceBean) {
if (abstractRegister == null) {
abstractRegister = new ZookeeperRegister(clientConfig.getRegisterAddr());
}
URL url = new URL();
url.setApplicationName(clientConfig.getApplicationName());
url.setServiceName(serviceBean.getName());
url.addParameter("host", CommonUtil.getIpAddress());
abstractRegister.subscribe(url);
}

//注入代理工厂
return new RpcReference(new JDKProxyFactory());
// return new RpcReference(new JavassistProxyFactory());
/**
* 开始和各个provider建立连接
*/
public void doConnectServer() {
for (String providerServiceName : SUBSCRIBE_SERVICE_LIST) {
List<String> providerIps = abstractRegister.getProviderIps(providerServiceName);
for (String providerIp : providerIps) {
try {
ConnectionHandler.connect(providerServiceName, providerIp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
URL url = new URL();
url.setServiceName(providerServiceName);
//客户端在此新增一个订阅的功能
abstractRegister.doAfterSubscribe(url);
}
}

/**
* 开启发送线程,专门从事将数据包发送给服务端
*/
private void startClient(ChannelFuture channelFuture) {
Thread asyncSendJob = new Thread(new AsyncSendJob(channelFuture));
private void startClient() {
Thread asyncSendJob = new Thread(new AsyncSendJob());
asyncSendJob.start();
}

Expand All @@ -72,10 +139,7 @@ private void startClient(ChannelFuture channelFuture) {
*/
class AsyncSendJob implements Runnable {

private final ChannelFuture channelFuture;

public AsyncSendJob(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
public AsyncSendJob() {
}

@Override
Expand All @@ -87,6 +151,8 @@ public void run() {
String json = JSON.toJSONString(data);
//将RpcInvocation封装到RpcProtocol对象中,然后发送给服务端
RpcProtocol rpcProtocol = new RpcProtocol(json.getBytes());
//获取netty通道
ChannelFuture channelFuture = ConnectionHandler.getChannelFuture(data.getTargetServiceName());
//netty的通道负责发送数据给服务端
channelFuture.channel().writeAndFlush(rpcProtocol);
} catch (InterruptedException e) {
Expand All @@ -98,22 +164,28 @@ public void run() {

public static void main(String[] args) throws Throwable {
//初始化客户端
ClientConfig clientConfig = new ClientConfig();
clientConfig.setServerAddr("localhost");
clientConfig.setPort(8010);
Client client = new Client();
client.setClientConfig(clientConfig);
RpcReference rpcReference = client.startClientApplication();
DataService dataService = rpcReference.get(DataService.class);
client.initClientConfig();
RpcReference rpcReference = client.initClientApplication();

//订阅服务
client.doSubscribeService(DataService.class);
//建立连接
client.doConnectServer();
client.startClient();
System.out.println("========== Client start success ==========");

//生成代理对象
DataService dataService = rpcReference.get(DataService.class);
//调用远程方法
List<String> list = dataService.getList();
System.out.println(list);

for (int i = 100; i < 999; ++i){
Thread.sleep(1000);
String msg = i+":msg from client.";
dataService.sendData(msg);
String s = dataService.sendData(msg);
System.out.println(i+":"+s);
}
// dataService.testError();
// dataService.testErrorV2();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.shaogezhu.easy.rpc.core.client;

import com.shaogezhu.easy.rpc.core.common.ChannelFutureWrapper;
import com.shaogezhu.easy.rpc.core.common.utils.CommonUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.CONNECT_MAP;
import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.SERVER_ADDRESS;

/**
* @Author peng
* @Date 2023/2/28
* @description: 按照单一职责的设计原则,将与连接(建立、断开)有关的功能都统一封装在了一起.
*/
public class ConnectionHandler {

/**
* 核心的连接处理器
* 专门用于负责和服务端构建连接通信
*/
private static Bootstrap bootstrap;

public static void setBootstrap(Bootstrap bootstrap) {
ConnectionHandler.bootstrap = bootstrap;
}

/**
* 构建单个连接通道 元操作,既要处理连接,还要统一将连接进行内存存储管理
*
* @param providerIp
* @return
* @throws InterruptedException
*/
public static void connect(String providerServiceName, String providerIp) throws InterruptedException {
if (bootstrap == null) {
throw new RuntimeException("bootstrap can not be null");
}
//格式错误类型的信息
if(!providerIp.contains(":")){
return;
}
String[] providerAddress = providerIp.split(":");
String ip = providerAddress[0];
int port = Integer.parseInt(providerAddress[1]);
//到底这个channelFuture里面是什么
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
ChannelFutureWrapper channelFutureWrapper = new ChannelFutureWrapper();
channelFutureWrapper.setChannelFuture(channelFuture);
channelFutureWrapper.setHost(ip);
channelFutureWrapper.setPort(port);
SERVER_ADDRESS.add(providerIp);
List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.getOrDefault(providerServiceName, new ArrayList<>());
channelFutureWrappers.add(channelFutureWrapper);
CONNECT_MAP.put(providerServiceName, channelFutureWrappers);
}

/**
* 构建ChannelFuture
* @param ip
* @param port
* @return
* @throws InterruptedException
*/
public static ChannelFuture createChannelFuture(String ip,Integer port) throws InterruptedException {
return bootstrap.connect(ip, port).sync();
}

/**
* 断开连接
*
* @param providerServiceName
* @param providerIp
*/
public static void disConnect(String providerServiceName, String providerIp) {
SERVER_ADDRESS.remove(providerIp);
List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(providerServiceName);
if (CommonUtil.isNotEmptyList(channelFutureWrappers)) {
channelFutureWrappers.removeIf(channelFutureWrapper ->
providerIp.equals(channelFutureWrapper.getHost() + ":" + channelFutureWrapper.getPort()));
}
}

/**
* 默认走随机策略获取ChannelFuture
*
* @param providerServiceName
* @return
*/
public static ChannelFuture getChannelFuture(String providerServiceName) {
List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(providerServiceName);
if (CommonUtil.isEmptyList(channelFutureWrappers)) {
throw new RuntimeException("no provider exist for " + providerServiceName);
}
return channelFutureWrappers.get(new Random().nextInt(channelFutureWrappers.size())).getChannelFuture();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.shaogezhu.easy.rpc.core.common;

import io.netty.channel.ChannelFuture;

/**
* @Author peng
* @Date 2023/2/27
* @description: 自定义包装类,将netty建立好的ChannelFuture做了一些封装
*/
public class ChannelFutureWrapper {

private String host;

private Integer port;

private ChannelFuture channelFuture;

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}

public ChannelFuture getChannelFuture() {
return channelFuture;
}

public void setChannelFuture(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.shaogezhu.easy.rpc.core.common.cache;

import com.shaogezhu.easy.rpc.core.common.ChannelFutureWrapper;
import com.shaogezhu.easy.rpc.core.common.RpcInvocation;
import com.shaogezhu.easy.rpc.core.registy.URL;

import java.util.Map;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -17,5 +19,17 @@ public class CommonClientCache {
* 发送队列
*/
public static BlockingQueue<RpcInvocation> SEND_QUEUE = new ArrayBlockingQueue<>(100);
/**
* 保存处理结果<key:UUID,value:对象>
*/
public static Map<String,Object> RESP_MAP = new ConcurrentHashMap<>();

//provider名称 --> 该服务有哪些集群URL
public static List<String> SUBSCRIBE_SERVICE_LIST = new ArrayList<>();
public static Map<String, List<URL>> URL_MAP = new ConcurrentHashMap<>();
public static Set<String> SERVER_ADDRESS = new HashSet<>();

//每次进行远程调用的时候都是从这里面去选择服务提供者
public static Map<String, List<ChannelFutureWrapper>> CONNECT_MAP = new ConcurrentHashMap<>();

}
Loading

0 comments on commit daa8711

Please sign in to comment.