Skip to content

Commit

Permalink
feat: 序列化层的开发
Browse files Browse the repository at this point in the history
添加了常见得序列化技术(hessian2、kryo、jdk、fastjson)
  • Loading branch information
shaogezhu committed Mar 4, 2023
1 parent d9e3675 commit b869fb4
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 12 deletions.
16 changes: 16 additions & 0 deletions easy-rpc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<zookeeper.version>3.4.14</zookeeper.version>
<zkClient.version>0.7</zkClient.version>
<curator.version>2.12.0</curator.version>
<hessian.version>4.0.62</hessian.version>
<kryo.version>3.0.3</kryo.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
Expand All @@ -46,6 +48,20 @@
<version>${slf4j-api.version}</version>
</dependency>

<!--kryo序列化工具-->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>${kryo.version}</version>
</dependency>

<!--hessian序列化工具-->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>${hessian.version}</version>
</dependency>

<!--fastjson序列化工具-->
<dependency>
<groupId>com.alibaba</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import com.shaogezhu.easy.rpc.core.registy.zookeeper.ZookeeperRegister;
import com.shaogezhu.easy.rpc.core.router.RandomRouterImpl;
import com.shaogezhu.easy.rpc.core.router.RotateRouterImpl;
import com.shaogezhu.easy.rpc.core.serialize.fastjson.FastJsonSerializeFactory;
import com.shaogezhu.easy.rpc.core.serialize.hessian.HessianSerializeFactory;
import com.shaogezhu.easy.rpc.core.serialize.jdk.JdkSerializeFactory;
import com.shaogezhu.easy.rpc.core.serialize.kryo.KryoSerializeFactory;
import com.shaogezhu.easy.rpc.interfaces.DataService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -75,6 +79,25 @@ protected void initChannel(SocketChannel ch) throws Exception {
RpcListenerLoader rpcListenerLoader = new RpcListenerLoader();
rpcListenerLoader.init();

//初始化序列化器
String clientSerialize = clientConfig.getClientSerialize();
switch (clientSerialize) {
case JDK_SERIALIZE_TYPE:
CLIENT_SERIALIZE_FACTORY = new JdkSerializeFactory();
break;
case FAST_JSON_SERIALIZE_TYPE:
CLIENT_SERIALIZE_FACTORY = new FastJsonSerializeFactory();
break;
case HESSIAN2_SERIALIZE_TYPE:
CLIENT_SERIALIZE_FACTORY = new HessianSerializeFactory();
break;
case KRYO_SERIALIZE_TYPE:
CLIENT_SERIALIZE_FACTORY = new KryoSerializeFactory();
break;
default:
throw new RuntimeException("no match serialize type for" + clientSerialize);
}

//初始化代理工厂
RpcReference rpcReference;
if (JAVASSIST_PROXY_TYPE.equals(clientConfig.getProxyType())) {
Expand All @@ -92,6 +115,7 @@ public void initClientConfig() {
clientConfig.setApplicationName("easy-rpc-client");
clientConfig.setProxyType("JDK");
clientConfig.setRouterStrategy("random");
clientConfig.setClientSerialize("kryo");
this.setClientConfig(clientConfig);
}

Expand Down Expand Up @@ -162,9 +186,10 @@ public void run() {
try {
//阻塞模式
RpcInvocation data = SEND_QUEUE.take();
String json = JSON.toJSONString(data);
//进行序列化
byte[] serialize = CLIENT_SERIALIZE_FACTORY.serialize(data);
//将RpcInvocation封装到RpcProtocol对象中,然后发送给服务端
RpcProtocol rpcProtocol = new RpcProtocol(json.getBytes());
RpcProtocol rpcProtocol = new RpcProtocol(serialize);
//获取netty通道
ChannelFuture channelFuture = ConnectionHandler.getChannelFuture(data.getTargetServiceName());
//netty的通道负责发送数据给服务端
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.shaogezhu.easy.rpc.core.client;

import com.alibaba.fastjson.JSON;
import com.shaogezhu.easy.rpc.core.common.RpcInvocation;
import com.shaogezhu.easy.rpc.core.common.RpcProtocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.CLIENT_SERIALIZE_FACTORY;
import static com.shaogezhu.easy.rpc.core.common.cache.CommonClientCache.RESP_MAP;

/**
Expand All @@ -18,9 +18,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcProtocol rpcProtocol = (RpcProtocol) msg;
byte[] reqContent = rpcProtocol.getContent();
String json = new String(reqContent,0,reqContent.length);
RpcInvocation rpcInvocation = JSON.parseObject(json, RpcInvocation.class);
RpcInvocation rpcInvocation = CLIENT_SERIALIZE_FACTORY.deserialize(rpcProtocol.getContent(), RpcInvocation.class);
//通过之前发送的uuid来注入匹配的响应数值
if(!RESP_MAP.containsKey(rpcInvocation.getUuid())){
throw new IllegalArgumentException("server response is error!");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package com.shaogezhu.easy.rpc.core.common;

import java.io.Serializable;
import java.util.Arrays;

/**
* @Author peng
* @Date 2023/2/24
* @description:
*/
public class RpcInvocation {
public class RpcInvocation implements Serializable {

private static final long serialVersionUID = 2951293262547830249L;

/**
* 请求的目标方法, 例如sendData
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.shaogezhu.easy.rpc.core.common.RpcInvocation;
import com.shaogezhu.easy.rpc.core.registy.URL;
import com.shaogezhu.easy.rpc.core.router.Router;
import com.shaogezhu.easy.rpc.core.serialize.SerializeFactory;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -38,6 +39,8 @@ public class CommonClientCache {
//每次进行远程调用的时候都是从这里面去选择服务提供者
public static Map<String, ChannelFutureWrapper[]> SERVICE_ROUTER_MAP = new ConcurrentHashMap<>();
public static ChannelFuturePollingRef CHANNEL_FUTURE_POLLING_REF = new ChannelFuturePollingRef();

//路由组件
public static Router ROUTER;
//客户端序列化工厂
public static SerializeFactory CLIENT_SERIALIZE_FACTORY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.shaogezhu.easy.rpc.core.registy.RegistryService;
import com.shaogezhu.easy.rpc.core.registy.URL;
import com.shaogezhu.easy.rpc.core.serialize.SerializeFactory;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -26,4 +27,8 @@ public class CommonServerCache {
* 注册中心:用于服务端 服务的注册url和下线
*/
public static RegistryService REGISTRY_SERVICE;
/**
* 服务端序列化工厂
*/
public static SerializeFactory SERVER_SERIALIZE_FACTORY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public class ClientConfig {
*/
private String routerStrategy;

/**
* 客户端序列化方式 example: hessian2,kryo,jdk,fastjson
*/
private String clientSerialize;

public String getRegisterAddr() {
return registerAddr;
}
Expand Down Expand Up @@ -52,4 +57,12 @@ public String getRouterStrategy() {
public void setRouterStrategy(String routerStrategy) {
this.routerStrategy = routerStrategy;
}

public String getClientSerialize() {
return clientSerialize;
}

public void setClientSerialize(String clientSerialize) {
this.clientSerialize = clientSerialize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public class ServerConfig {

private String applicationName;

/**
* 服务端序列化方式 example: hessian2,kryo,jdk,fastjson
*/
private String serverSerialize;

public Integer getPort() {
return port;
}
Expand All @@ -36,4 +41,12 @@ public String getApplicationName() {
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}

public String getServerSerialize() {
return serverSerialize;
}

public void setServerSerialize(String serverSerialize) {
this.serverSerialize = serverSerialize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ public class RpcConstants {
public static final String RANDOM_ROUTER_TYPE = "random";

public static final String ROTATE_ROUTER_TYPE = "rotate";

public static final String JDK_SERIALIZE_TYPE = "jdk";

public static final String FAST_JSON_SERIALIZE_TYPE = "fastJson";

public static final String HESSIAN2_SERIALIZE_TYPE = "hessian2";

public static final String KRYO_SERIALIZE_TYPE = "kryo";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.shaogezhu.easy.rpc.core.serialize;

/**
* @Author peng
* @Date 2023/3/4
* @description: 序列化的接口
*/
public interface SerializeFactory {


/**
* 序列化
*
* @param t
* @param <T>
* @return
*/
<T> byte[] serialize(T t);

/**
* 反序列化
*
* @param data
* @param clazz
* @param <T>
* @return
*/
<T> T deserialize(byte[] data, Class<T> clazz);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.shaogezhu.easy.rpc.core.serialize.fastjson;

import com.alibaba.fastjson.JSON;
import com.shaogezhu.easy.rpc.core.serialize.SerializeFactory;

/**
* @Author peng
* @Date 2023/3/4
* @description: FastJson序列化工厂
*/
public class FastJsonSerializeFactory implements SerializeFactory {

@Override
public <T> byte[] serialize(T t) {
String jsonStr = JSON.toJSONString(t);
return jsonStr.getBytes();
}

@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
return JSON.parseObject(new String(data),clazz);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.shaogezhu.easy.rpc.core.serialize.hessian;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import com.shaogezhu.easy.rpc.core.serialize.SerializeFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
* @Author peng
* @Date 2023/3/4
* @description: Hessian序列化工厂
*/
public class HessianSerializeFactory implements SerializeFactory {

@Override
public <T> byte[] serialize(T t) {
byte[] data = null;
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(os);
output.writeObject(t);
output.getBytesOutputStream().flush();
output.completeMessage();
output.close();
data = os.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
return data;
}

@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
if (data == null) {
return null;
}
Object result = null;
try {
ByteArrayInputStream is = new ByteArrayInputStream(data);
Hessian2Input input = new Hessian2Input(is);
result = input.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
return (T) result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.shaogezhu.easy.rpc.core.serialize.jdk;

import com.shaogezhu.easy.rpc.core.serialize.SerializeFactory;

import java.io.*;

/**
* @Author peng
* @Date 2023/3/4
* @description: JDK序列化工厂
*/
public class JdkSerializeFactory implements SerializeFactory {


@Override
public <T> byte[] serialize(T t) {
byte[] data = null;
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
ObjectOutputStream output = new ObjectOutputStream(os);
output.writeObject(t);
output.flush();
output.close();
data = os.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
return data;
}

@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
ByteArrayInputStream is = new ByteArrayInputStream(data);
try {
ObjectInputStream input = new ObjectInputStream(is);
Object result = input.readObject();
return ((T) result);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit b869fb4

Please sign in to comment.