Skip to content

Commit

Permalink
KV服务器与客户端
Browse files Browse the repository at this point in the history
  • Loading branch information
jrjsjtu committed Jan 11, 2018
1 parent bfdb398 commit 7ebe6f3
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 26 deletions.
74 changes: 50 additions & 24 deletions src/main/java/EasyRaft/ServerIO/RaftKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import sun.nio.ch.Net;

import java.util.ArrayList;

Expand All @@ -14,15 +15,15 @@
public class RaftKeeper {
private static StateManager stateManager;


public static void setStateManager(StateManager stateManager0){
stateManager = stateManager0;
}
private static final char LeaderSelection = '0';
private static final char RegisterWatcher = '1';
private static final char AllocateSlot = '2';
private static final char RegisterMember = '3';
private static final char AppendLog = '4';
public static final char LeaderSelection = '0';
public static final char RegisterWatcher = '1';
public static final char AllocateSlot = '2';
public static final char RegisterMember = '3';
public static final char AppendLog = '4';
public static final char LeaveCluster = '5';

private static ArrayList<String> memberList;
private static ArrayList<ChannelHandlerContext> wathcerList;
Expand Down Expand Up @@ -53,7 +54,26 @@ public static String processRequest(ChannelHandlerContext ctx,ByteBuf byteBuf){
return null;
}

public static void processRegisterWatcher(ChannelHandlerContext ctx,String entry){
public static String processRequest(char c,ChannelHandlerContext ctx){
String address = ctx.channel().remoteAddress().toString();
switch (c){
case LeaveCluster:
/*
if(memberList.contains(address)){
memberList.remove(address);
System.out.println("remove memberList " + address);
}
*/
if(wathcerList.contains(ctx)){
wathcerList.remove(ctx);
System.out.println("remove ctx " + address);
}
break;
}
return null;
}

private static void processRegisterWatcher(ChannelHandlerContext ctx,String entry){
synchronized (wathcerList){
wathcerList.add(ctx);
ByteBuf byteBuf = Unpooled.buffer();
Expand All @@ -72,7 +92,7 @@ public static void processRegisterWatcher(ChannelHandlerContext ctx,String entry
}
}

public static void processRegisterMember(final ChannelHandlerContext ctx){
private static void processRegisterMember(final ChannelHandlerContext ctx){
String entryId = "add:" + ctx.pipeline().channel().remoteAddress().toString();
Runnable callBack = new Runnable() {
public void run() {
Expand Down Expand Up @@ -101,27 +121,33 @@ public void run() {
stateManager.commit(entryId,callBack);
}

public static void processLeaderInsertLog(ChannelHandlerContext ctx,String entry){
private static void processLeaderInsertLog(ChannelHandlerContext ctx,String entry){
stateManager.commit(entry,ctx);
}

public ByteBuf processRegisterWatcher(ChannelHandlerContext ctx){
return null;
}

public ByteBuf processAllocateSlot(ChannelHandlerContext ctx){
return null;
public static void initCheckThread(){
checkLeaderThread.start();
}


public static void main(String[] args){
try {
StateManager stateManager = new StateManager();
RaftKeeper.setStateManager(stateManager);
//raftKeeper.processLeaderSelection("test log");
} catch (Exception e) {
e.printStackTrace();
}

public static void setLeaderPort(int leaderPort1){
leaderPort = leaderPort1;
}
private static int leaderPort = -1;
private static Thread checkLeaderThread = new Thread(new Runnable() {
public void run() {
while(true){
try {
if (stateManager.isLeader() && leaderPort != -1){
//默认情况下,会阻塞在这个NetworkIO的初始化方法上.所以不用担心多次重复地初始化客户端.
//但要是出现端口没有及时释放或者,leader角色在集群中震荡,NetworkIO的初始化会报错,这时候就让程序等一秒.
NetworkIO networkIO = new NetworkIO(leaderPort);
}
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"checkLeaderThread");
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String address = ctx.channel().remoteAddress().toString();
RaftKeeper.processRequest(RaftKeeper.LeaveCluster,ctx);
}
}
4 changes: 3 additions & 1 deletion src/main/java/EasyRaft/worker/MainWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
public class MainWorker{
public static void main(String[] args){
try {
XMLReader xmlReader = new XMLReader("/home/jrj/Desktop/idea-IU-163.12024.16/java_learn/EasyRaft/src/main/java/EasyRaft/worker/config.xml");
StateManager stateManager = new StateManager();
RaftKeeper.setLeaderPort(xmlReader.getLeaderPort());
RaftKeeper.setStateManager(stateManager);
new NetworkIO(30304);
RaftKeeper.initCheckThread();
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/EasyRaft/worker/XMLReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package EasyRaft.worker;

import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

/**
* Created by jrj on 18-1-11.
*/
public class XMLReader {
private int leaderPort;
XMLReader(String path){
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();

try {
DocumentBuilder db = dbf.newDocumentBuilder();
Document document = db.parse(path);
NodeList hostList = document.getElementsByTagName("port");

//遍历每一个book节点
for (int i = 0; i < hostList.getLength(); i++) {
Node host = hostList.item(i);
leaderPort = Integer.parseInt(host.getFirstChild().getNodeValue());
}
} catch (Exception e) {
e.printStackTrace();
}
}

public int getLeaderPort(){
return leaderPort;
}
public static void main(String[] args){
new XMLReader("/home/jrj/Desktop/idea-IU-163.12024.16/java_learn/EasyRaft/src/main/java/EasyRaft/worker/config.xml");
}
}

12 changes: 12 additions & 0 deletions src/main/java/EasyRaft/worker/config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<hosts>
<host>
<ip>192.168.1.22</ip>
</host>
<clusterSize>
5
</clusterSize>
<leaderPort>
<port>30303</port>
</leaderPort>
</hosts>

0 comments on commit 7ebe6f3

Please sign in to comment.