Skip to content

Commit

Permalink
1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kwsc98 committed Aug 18, 2022
1 parent c3de1dd commit 3797041
Show file tree
Hide file tree
Showing 39 changed files with 119 additions and 151 deletions.
6 changes: 2 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ plugins {
id 'io.spring.dependency-management' version '1.0.12.RELEASE'
}

group = 'pres.krpc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
compileOnly {
extendsFrom annotationProcessor
Expand All @@ -25,6 +21,8 @@ allprojects {

subprojects {
group = 'pres.krpc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'
apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
Expand Down
2 changes: 0 additions & 2 deletions krpc-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version '1.2-SNAPSHOT'

dependencies {
implementation 'org.apache.curator:curator-x-discovery:5.3.0'
implementation 'io.netty:netty-all:4.1.79.Final'
Expand Down
3 changes: 1 addition & 2 deletions krpc-core/src/main/java/pers/krpc/core/InterfaceContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* krpc
* krpc服务接口Context
* 2022/7/25 15:38
*
* @author wangsicheng
* @since
**/
public class InterfaceContext {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import java.util.List;

/**
* krpc
* krpc服务接口详细Context
* 2022/7/27 15:20
*
* @author wangsicheng
* @since
**/

public class InterfaceContextDetails {
Expand All @@ -25,12 +24,13 @@ public class InterfaceContextDetails {
@Getter
protected Role role;

protected Object object;
@Getter
protected List<Provider> providerList;
@Getter
protected List<Customer> customerList;

protected Object object;

public Object getObject() {
return this.object;
}
Expand All @@ -49,14 +49,12 @@ public InterfaceContextDetails setObject(Object object) {
return this;
}

public InterfaceContextDetails setProviderList(List<Provider> providerList) {
public void setProviderList(List<Provider> providerList) {
this.providerList = providerList;
return this;
}

public InterfaceContextDetails setCustomerList(List<Customer> customerList) {
public void setCustomerList(List<Customer> customerList) {
this.customerList = customerList;
return this;
}

public InterfaceContextDetails setRole(Role role) {
Expand Down
3 changes: 1 addition & 2 deletions krpc-core/src/main/java/pers/krpc/core/InterfaceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
import lombok.Data;

/**
* krpc
* krpc接口信息
* 2022/7/28 14:12
*
* @author wangsicheng
* @since
**/
@Data
public class InterfaceInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import pers.krpc.core.registry.RegistryBuilderFactory;

/**
* krpc
* krpc构建工厂
* 2022/7/28 15:32
*
* @author lanhaifeng
* @since
**/
public class KrpcBuilderFactory {

Expand Down
2 changes: 2 additions & 0 deletions krpc-core/src/main/java/pers/krpc/core/protocol/KrpcMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import lombok.Getter;
import lombok.ToString;

import java.lang.reflect.Method;
import java.util.UUID;
Expand All @@ -14,6 +15,7 @@
* @since
**/
@Getter
@ToString
public class KrpcMsg {

private String uniqueIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import pers.krpc.core.protocol.netty.NettyApplicationContext;
import pers.krpc.core.role.Provider;
import pers.krpc.core.role.ServerInfo;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -23,11 +21,10 @@
import java.util.concurrent.TimeUnit;

/**
* krpc
* krpc请求代理方法
* 2022/8/12 14:57
*
* @author wangsicheng
* @since
**/
@Slf4j
public class NettyInvokerProxy implements InvocationHandler {
Expand All @@ -45,7 +42,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
InterfaceInfo interfaceInfo = interfaceContextDetails.getInterfaceInfo();
KrpcMsg krpcMsg = KrpcMsg.build(interfaceInfo.getInterfaceClass(), method, args, interfaceInfo.getVersion());
Promise<Object> promise = new DefaultPromise<>(new DefaultEventLoop());
NettyApplicationContext.getMSG_CACHE().put(krpcMsg.getUniqueIdentifier(), promise);
NettyApplicationContext.MSG_CACHE.put(krpcMsg.getUniqueIdentifier(), promise);
getChannel(interfaceContextDetails.getProviderList()).writeAndFlush(krpcMsg);
return promise.get(Long.parseLong(interfaceInfo.getTimeout()), TimeUnit.MILLISECONDS);
}
Expand All @@ -55,6 +52,7 @@ private Channel getChannel(List<Provider> providerList) {
log.info("该接口未找到生产者");
throw new RuntimeException();
}
//简单的负载均衡算法
int index = new Random().nextInt(providerList.size());
Provider provider = providerList.get(index);
Channel channel = channelMap.get(provider.toChannelKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@

import com.fasterxml.jackson.databind.json.JsonMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.internal.ObjectUtil;
import pers.krpc.core.protocol.KrpcMsg;

import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;

/**
* krpc解码器
* @author kwsc98
*/
public class KrpcDecoder extends MessageToMessageDecoder<ByteBuf> {

private static final JsonMapper JSON_MAPPER;
Expand All @@ -30,15 +31,15 @@ public class KrpcDecoder extends MessageToMessageDecoder<ByteBuf> {
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String msg = in.toString(CHARSET);
KrpcMsg krpcMsg = JSON_MAPPER.readValue(msg, KrpcMsg.class);
Class<?> clazz = Class.forName(krpcMsg.getClassName());
Method method = clazz.getMethod(krpcMsg.getMethodName(), krpcMsg.getParameterTypes());
Object[] objects = krpcMsg.getParams();
Class<?>[] parameterTypes = krpcMsg.getParameterTypes();
if (Objects.nonNull(objects)) {
for (int i = 0; i < objects.length; i++) {
objects[i] = JSON_MAPPER.convertValue(objects[i], parameterTypes[i]);
}
}
Class<?> clazz = Class.forName(krpcMsg.getClassName());
Method method = clazz.getMethod(krpcMsg.getMethodName(), krpcMsg.getParameterTypes());
krpcMsg.setObject(JSON_MAPPER.convertValue(krpcMsg.getObject(), method.getReturnType()));
out.add(krpcMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import java.nio.charset.Charset;
import java.util.List;

/**
* krpc编码器
* @author kwsc98
*/
public class KrpcEncoder extends MessageToMessageEncoder<KrpcMsg> {

private static final JsonMapper JSON_MAPPER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,47 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.netty.channel.Channel;
import io.netty.util.concurrent.Promise;
import lombok.Getter;
import pers.krpc.core.KrpcApplicationContext;


import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* krpc
* krpcNettyNettyApplicationContext
* 2022/8/12 15:37
*
* @author wangsicheng
**/
public class NettyApplicationContext {

/**
* netty客户端
**/
private final static NettyClient NETTY_CLIENT;

/**
* netty服务端
**/
private NettyService nettyService;


@Getter
private static final Cache<String, Promise<Object>> MSG_CACHE = Caffeine.newBuilder().expireAfterAccess(1000L, TimeUnit.MILLISECONDS).build();
/**
* 采用共享变量的方式,进行异步回调处理
**/
public static final Cache<String, Promise<Object>> MSG_CACHE = Caffeine.newBuilder().expireAfterAccess(10000L, TimeUnit.MILLISECONDS).build();


static {
NETTY_CLIENT = new NettyClient();
}

/**
* 通过客户端获取Channel
**/
public static Channel getChannel(String host, int port) {
return NETTY_CLIENT.getChannel(host, port);
}

/**
* 初始化服务端
**/
public void initService(int port, KrpcApplicationContext krpcApplicationContext) {
//防止初始化多个netty服务
NettyService tempNettyService = this.nettyService;
this.nettyService = new NettyService(port,krpcApplicationContext);
if (Objects.nonNull(tempNettyService)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package pers.krpc.core.protocol.netty;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -25,15 +23,14 @@


/**
* 聊天室客户端
* Netty客户端
* @author kwsc98
*/
@Slf4j
public final class NettyClient {


private final Bootstrap bootstrap;


NettyClient() {
EventLoopGroup group = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@

/**
* 客户端处理器
*
* @author kwsc98
*/
@Sharable
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<KrpcMsg> {


@Override
protected void channelRead0(ChannelHandlerContext ctx, KrpcMsg krpcMsg) throws Exception {
log.info("接收到消息" + krpcMsg);
Promise<Object> promise = NettyApplicationContext.getMSG_CACHE().getIfPresent(krpcMsg.getUniqueIdentifier());
protected void channelRead0(ChannelHandlerContext ctx, KrpcMsg krpcMsg) {
log.debug("客户端获取响应:[{}]",krpcMsg);
//从共享变量中获取Promise
Promise<Object> promise = NettyApplicationContext.MSG_CACHE.getIfPresent(krpcMsg.getUniqueIdentifier());
if (Objects.nonNull(promise)) {
promise.setSuccess(krpcMsg.getObject());
}
log.info("client accepted channel: {}", ctx.channel());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pers.krpc.core.protocol.codec.KrpcEncoder;

/**
* Netty协议初始化
* @author kwsc98
*/
public class NettyInitializer<T> extends ChannelInitializer<SocketChannel> {
Expand All @@ -42,10 +43,9 @@ public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加行分割器
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// 添加String Decoder和String Encoder,用来进行字符串的转换
pipeline.addLast(new KrpcDecoder());
pipeline.addLast(new KrpcEncoder());
// 最后添加真正的处理器
// 添加处理器
pipeline.addLast(this.channelInboundHandler);
}
}
Loading

0 comments on commit 3797041

Please sign in to comment.