Skip to content

Commit

Permalink
KV服务器与客户端
Browse files Browse the repository at this point in the history
  • Loading branch information
jrjsjtu committed Dec 25, 2017
1 parent 21eeac5 commit 2907d13
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 2 deletions.
25 changes: 25 additions & 0 deletions src/main/java/Client/Handler/ResponseHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package Client.Handler;

import Client.KVChannel;
import KVDatabase.KVHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
* Created by jrj on 17-12-25.
*/
public class ResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
long ackIndex = buf.readLong();
//System.out.println(ackIndex);
KVChannel.awaitClient(ackIndex);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
KVChannel.setChannelHandlerContext(ctx);
}
}
79 changes: 79 additions & 0 deletions src/main/java/Client/KVChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package Client;

import Client.Handler.ResponseHandler;
import KVDatabase.RequestDecoder;
import KVDatabase.RequestHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;
import java.util.HashMap;

/**
* Created by jrj on 17-12-25.
*/
public class KVChannel implements KVProtocol{

//这里的index是客户端分配的每一条消息的id,ack返回之后根据ack中的long找到线程然后唤醒
//这边的String其实起到的就是lock的作用,然客户线程wait在lock上,然后在netty线程中用await唤醒
private static ChannelHandlerContext channelHandlerContext;
private static HashMap<Long,String> indexThreadMap;
//这里的index是一致性hash之后的index
private static HashMap<Integer,ChannelHandlerContext> indexContextMap;

static{
indexThreadMap = new HashMap<Long, String>();
indexContextMap = new HashMap<Integer, ChannelHandlerContext>();
}
EventLoopGroup group;
KVChannel() throws Exception{
Bootstrap b = new Bootstrap();
group = new NioEventLoopGroup(1);
b.group(group).channel(NioSocketChannel.class).
remoteAddress(new InetSocketAddress("127.0.0.1", 30303)).
handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDecoder());
ch.pipeline().addLast(new ResponseHandler());
}
});
b.connect().sync();
}

public static void awaitClient(long index){
String key;
synchronized (indexThreadMap){
key = indexThreadMap.get(index);
}
synchronized (key){
key.notify();
}
}

public static void setChannelHandlerContext(ChannelHandlerContext ctx){
channelHandlerContext = ctx;
}

public void put(long requestIndex, String key, String value) throws Exception {
int payLoadSize = key.length() + value.length() + 8 + 8;
ByteBuf byteBuf = Unpooled.buffer(payLoadSize+4);
byteBuf.writeInt(payLoadSize).writeLong(requestIndex);
byteBuf.writeInt(key.length()).writeBytes(key.getBytes());
byteBuf.writeInt(value.length()).writeBytes(value.getBytes());
synchronized (channelHandlerContext){
indexThreadMap.put(requestIndex,key);
}
synchronized (key){
channelHandlerContext.writeAndFlush(byteBuf);
key.wait();
}
}
}
10 changes: 10 additions & 0 deletions src/main/java/Client/KVProtocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package Client;

import java.io.InputStream;

/**
* Created by jrj on 17-12-25.
*/
public interface KVProtocol {
void put(long requestIndex,String key,String value) throws Exception;
}
7 changes: 7 additions & 0 deletions src/main/java/Client/ShardingManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package Client;

/**
* Created by jrj on 17-12-25.
*/
public class ShardingManager {
}
23 changes: 23 additions & 0 deletions src/main/java/Client/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package Client;

/**
* Created by jrj on 17-12-25.
*/
public class Utils {
public static int byteArrayToInt(byte[] b) {
return b[3] & 0xFF |
(b[2] & 0xFF) << 8 |
(b[1] & 0xFF) << 16 |
(b[0] & 0xFF) << 24;
}

public static byte[] intToByteArray(int a) {
return new byte[] {
(byte) ((a >> 24) & 0xFF),
(byte) ((a >> 16) & 0xFF),
(byte) ((a >> 8) & 0xFF),
(byte) (a & 0xFF)
};
}

}
14 changes: 14 additions & 0 deletions src/main/java/KVDatabase/KVHashMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package KVDatabase;

import javax.print.attribute.HashAttributeSet;
import java.util.HashMap;

/**
* Created by jrj on 17-12-24.
*/
public class KVHashMap {
public static HashMap<String,String> hashMap;
static{
hashMap = new HashMap<String, String>();
}
}
48 changes: 48 additions & 0 deletions src/main/java/KVDatabase/RequestDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package KVDatabase;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.ByteBuffer;

/**
* Created by jrj on 17-12-24.
*/
public class RequestDecoder extends ChannelInboundHandlerAdapter {
ByteBuf header,payLoad;
boolean readHeaderCompleted;
int payLoadSize;
public RequestDecoder(){
readHeaderCompleted = false;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
if (header==null){
header = Unpooled.buffer(4);
}
if (header.writableBytes()>0){
buf.readBytes(header);
//这里是当writeable>0 的情况下writeable = 0,一次request有且仅有一次发生
if (header.writableBytes()==0){
payLoadSize = header.readInt();
if (payLoadSize<0){
throw new Exception("invalid message size");
}else{
payLoad = Unpooled.buffer(payLoadSize);
}
}
}
if (payLoad != null && payLoad.writableBytes()>0){
buf.readBytes(payLoad);
if (payLoad.writableBytes() == 0){
ctx.fireChannelRead(payLoad);
header.resetWriterIndex();
header.resetReaderIndex();
}
}
}
}
32 changes: 32 additions & 0 deletions src/main/java/KVDatabase/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package KVDatabase;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
* Created by jrj on 17-12-24.
*/
public class RequestHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
long index = buf.readLong();
int keySize = buf.readInt();
byte[] keyBytes = new byte[keySize];
buf.readBytes(keyBytes);

int valueSize = buf.readInt();
byte[] valueBytes = new byte[valueSize];
buf.readBytes(valueBytes);

KVHashMap.hashMap.put(new String(keyBytes),new String(valueBytes));
ByteBuf byteBuf = Unpooled.buffer(12);
byteBuf.writeInt(8);
byteBuf.writeLong(index);

ctx.writeAndFlush(byteBuf);
}
}
38 changes: 38 additions & 0 deletions src/main/java/KVDatabase/Server.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package KVDatabase;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* Created by jrj on 17-12-23.
*/
public class Server {
public Server(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,64).
childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDecoder());
ch.pipeline().addLast(new RequestHandler());
}
});

try {
b.bind(30303).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
Server server = new Server();
}
}
7 changes: 5 additions & 2 deletions src/main/java/state/Leader.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ public void processAppendRpcResult(RspList rspList){
}

private void checkMatchIndexAndCommitForMajority(long remoteIndex){
for(;lastApplied<=remoteIndex;lastApplied++){
System.out.println("now execute " + lastApplied);
for(long i = lastApplied+1;i<=remoteIndex;i++){
System.out.println("now execute " + i);
}
if (remoteIndex>lastApplied){
lastApplied = remoteIndex;
}
}

Expand Down

0 comments on commit 2907d13

Please sign in to comment.