Skip to content

Commit

Permalink
整个系统的基本功能已经实现,完善一些细节
Browse files Browse the repository at this point in the history
  • Loading branch information
jrjsjtu committed Feb 6, 2018
1 parent ac65061 commit 528ab08
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 81 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>

<build>
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/EasyRaft/ServerIO/RaftKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ public void executeAfterCommit(String log) {

private static void processSayHello(final ChannelHandlerContext ctx,final String request){
leaderWatcher.add(ctx);
strCtxMap.put(request,ctx);
ctxStrMap.put(ctx,request);
//只有是leader才发
if(stateManager.isLeader()){
if(request.equals(currentLeader.address)){
Expand All @@ -213,11 +211,19 @@ private static void processSayHello(final ChannelHandlerContext ctx,final String
ctx.writeAndFlush(byteBuf);
return;
}
for(String tmp:aliveList){
ChannelHandlerContext tmpCTX = strCtxMap.get(tmp);
if(!leaderWatcher.contains(tmpCTX)){
ByteBuf byteBuf = getInfoWrapped(tmp,NotifyMemberDown);
currentLeader.ctx.writeAndFlush(byteBuf);
if(currentLeader.ctx == ctx){
for(String tmp:aliveList){
ChannelHandlerContext tmpCTX = strCtxMap.get(tmp);
//这里可能因为aliveList中加入了ctx,而ctxStrMap,strCtxMap没有来得及加入ctx导致错误的memberDown
if(!leaderWatcher.contains(tmpCTX)){
if(tmpCTX == null){
System.out.println("null tmpCtx");
}
System.out.println("tmp is " + tmp);
System.out.println("request is " + request);
ByteBuf byteBuf = getInfoWrapped(tmp,NotifyMemberDown);
currentLeader.ctx.writeAndFlush(byteBuf);
}
}
}
}
Expand Down Expand Up @@ -285,6 +291,9 @@ public void run() {
String[] info = request.split("\\|");
//ctxStrMap.put(ctx,info[1]);
aliveList.add(info[1]);
System.out.println("now join cluster named"+info[1]);
strCtxMap.put(info[1],ctx);
ctxStrMap.put(ctx,info[1]);
if (currentLeader.ctx!= null){
//通知leader有人上线
ByteBuf byteBuf =getInfoWrapped(info[1], NotifyMemberUp);
Expand Down
26 changes: 20 additions & 6 deletions src/main/java/EasyRaft/client/CtxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import EasyRaft.client.callBack.RaftCallBack;
import EasyRaft.requests.JoinClusterRequest;
import EasyRaft.requests.QuerySlotRequest;
import EasyRaft.requests.RaftRequest;
import EasyRaft.requests.SelectLeaderRequest;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -40,6 +41,10 @@ public CtxProxy(String hostIp,String info){

}

public CtxProxy(){
group = new NioEventLoopGroup(1);
}

public void addCtx(RaftClient ctx){
synchronized (successClient){
successClient.add(ctx);
Expand All @@ -64,21 +69,30 @@ public void tryToConnect(String ipAddress,int port) throws Exception{
final RaftCallBack raftCallBack = callBackClass.newInstance();
final RaftClient raftClient = new RaftClient(raftCallBack);
raftClients.add(raftClient);
raftClient.joinRaft(group,ipAddress,port,appendInfo,this);
raftClient.joinRaft(group,ipAddress,port,appendInfo,this,true);
//raftClient.electLeader(stringBuilder.toString());
//raftClient.joinCLuster(stringBuilder.toString());
}

public void electLeader(){
int epoch = RaftClient.getEpoch()+1;
public void connectWithoutHeartbeat(String ipAddress,int port) throws Exception{
final RaftClient raftClient = new RaftClient();
raftClients.add(raftClient);
raftClient.joinRaft(group,ipAddress,port,appendInfo,this,false);
//raftClient.electLeader(stringBuilder.toString());
//raftClient.joinCLuster(stringBuilder.toString());
}


public ArrayList<String> querySlot(){
int requestIdx = RaftClient.requestOrder.getAndIncrement();
SelectLeaderRequest selectLeaderRequest = new SelectLeaderRequest(requestIdx,epoch,appendInfo);
QuerySlotRequest querySlotRequest = new QuerySlotRequest(requestIdx);
synchronized (successClient){
for(RaftClient raftClient:successClient){
raftClient.sendRequest(selectLeaderRequest,requestIdx);
raftClient.sendRequest(querySlotRequest,requestIdx);
}
}
selectLeaderRequest.waitForResponse();
querySlotRequest.waitForResponse();
return querySlotRequest.getResult();
}

public void joinCluster(){
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/EasyRaft/client/RaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,26 @@ public void onMemberLeaveWhenLeader(String address) {
String ipAddress;
int port;
CtxProxy ctxProx;
boolean needHeartbeat;

public boolean getHeartbeat(){
return needHeartbeat;
}
public void joinRaft() throws Exception{
joinRaft(theGroup,ipAddress,port,leaderInfo,ctxProx);
joinRaft(theGroup,ipAddress,port,leaderInfo,ctxProx,needHeartbeat);
}
public void joinRaft(EventLoopGroup theGroup,String ipAddress, int port,String appendInfo, final CtxProxy ctxProxy) throws Exception{
public void joinRaft(EventLoopGroup theGroup,String ipAddress, int port,String appendInfo, final CtxProxy ctxProxy,final boolean needHeartbeat) throws Exception{
setLeaderInfo(appendInfo);
this.theGroup = theGroup;this.ipAddress = ipAddress;this.port = port;this.ctxProx = ctxProxy;
this.theGroup = theGroup;this.ipAddress = ipAddress;this.port = port;this.ctxProx = ctxProxy;this.needHeartbeat = needHeartbeat;
Bootstrap b = new Bootstrap();
b.group(theGroup).channel(NioSocketChannel.class).
remoteAddress(new InetSocketAddress(ipAddress, port)).
handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
localRaftClient.ch = ch;
ch.pipeline().addLast(new IdleStateHandler(0,0,1));
if(needHeartbeat){
ch.pipeline().addLast(new IdleStateHandler(0,0,1));
}
ch.pipeline().addLast(new RequestDecoder(localRaftClient,raftCallBack,ctxProxy));
}
}).option(ChannelOption.TCP_NODELAY,true);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/EasyRaft/client/RequestDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ private void processSetSlot(byte[] bytes){

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
raftClient.electLeader(0);
if (raftClient.getHeartbeat()){
raftClient.electLeader(0);
}
ctxProxy.addCtx(raftClient);
this.ctx = ctx;
}
Expand Down
17 changes: 3 additions & 14 deletions src/main/java/EasyRaft/worker/config.xml
Original file line number Diff line number Diff line change
@@ -1,15 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<hosts>
<host>
<ip>192.168.1.22</ip>
</host>
<clusterSize>
5
</clusterSize>
<leaderPort>
<port>30303</port>
</leaderPort>
<persistencyPath>

</persistencyPath>
</hosts>
<host>
<ip>127.0.0.1:50000</ip>
</host>
45 changes: 45 additions & 0 deletions src/main/java/KV/Client/ClientConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package KV.Client;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

/**
* Created by jrj on 18-2-6.
*/
public class ClientConfig {
ArrayList<String> ipPortList = new ArrayList<String>();
ClientConfig(String filePath) throws Exception{
SAXReader reader = new SAXReader();
Document document = reader.read(new File(filePath));
Element root = document.getRootElement();
initMap(root);
}

private void initMap(Element node){
Element element = node.element("host");
Iterator<Element> iterator = element.elementIterator();
while(iterator.hasNext()){
Element elementInside = iterator.next();
ipPortList.add(elementInside.getText());
}
}

public ArrayList<String> getIpPortList(){
return ipPortList;
}

public static void main(String[] args){
try {
ClientConfig clientConfig = new ClientConfig("/home/jrj/Desktop/idea-IU-163.12024.16/java_learn/EasyRaft/src/main/java/KV/Client/config.xml");
} catch (Exception e) {
e.printStackTrace();
}
}

}
50 changes: 35 additions & 15 deletions src/main/java/KV/Client/KVClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package KV.Client;

import EasyRaft.client.CtxProxy;
import EasyRaft.client.RaftClient;
import EasyRaft.client.callBack.RaftCallBack;

Expand All @@ -11,10 +12,32 @@
* Created by jrj on 17-12-24.
*/
public class KVClient {
RaftClient raftClient;
CtxProxy ctxProxy;
KVChannel kvChannel;
AtomicInteger atomicInteger = new AtomicInteger(0);
KVClient(){
KVClient(ClientConfig clientConfig){
try {
ctxProxy = new CtxProxy();
ArrayList<String> raftCluster = clientConfig.getIpPortList();
for(String tmp:raftCluster){
String[] info = tmp.split(":");
ctxProxy.connectWithoutHeartbeat(info[0],Integer.parseInt(info[1]));
}
ArrayList<String> arrayList = ctxProxy.querySlot();

try {
kvChannel = new KVChannel(2);
for(String tmp:arrayList){
String[] info = tmp.split(":");
kvChannel.connectServer(info[0],Integer.parseInt(info[1]),Integer.parseInt(info[2]));
}
kvChannel.waitForConnection();
} catch (Exception e) {
e.printStackTrace();
}

} catch (Exception e) {
e.printStackTrace();
}
/*
raftClient = new RaftClient();
raftClient.joinRaft();
Expand All @@ -40,25 +63,22 @@ public String get(String key) throws Exception{
return kvChannel.get(key);
}
public static void main(String[] args){
KVClient kvClient = new KVClient();
long start = System.currentTimeMillis();

ClientConfig clientConfig = null;
try {
kvClient.put("aaa1","ccc");
System.out.println(kvClient.get("aaa1"));
clientConfig = new ClientConfig("/home/jrj/Desktop/idea-IU-163.12024.16/java_learn/EasyRaft/src/main/java/KV/Client/config.xml");
} catch (Exception e) {
e.printStackTrace();
}
/*

KVClient kvClient = new KVClient(clientConfig);
long start = System.currentTimeMillis();
try {
for (long i=0;i<150000;i++){
kvClient.put("aaa","bbb");
kvClient.put("aaa1","bbb");
}
}catch (Exception e){
kvClient.put("aaa1","ccc");
System.out.println(kvClient.get("aaa1"));
} catch (Exception e) {
e.printStackTrace();
}
*/

System.out.println("finish uses " + (System.currentTimeMillis() -start));
}
}
7 changes: 7 additions & 0 deletions src/main/java/KV/Client/config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<hosts>
<host>
<ip>127.0.0.1:50000</ip>
<ip>127.0.0.1:50001</ip>
<ip>127.0.0.1:50002</ip>
</host>
</hosts>
38 changes: 16 additions & 22 deletions src/main/java/KV/KVDatabase/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,31 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
}
public static void main(String[] args){
int serverPort = 30000;
int shard = 0;

String appendInfo = serverPort + ":" + shard;
CtxProxy ctxProxy = new CtxProxy("127.0.0.1", appendInfo);
ctxProxy.setCallBackClass(KVServerCallBack.class);

ServerConfig serverConfig = null;
try {
ctxProxy.tryToConnect("127.0.0.1",50000);
serverConfig = new ServerConfig("//home/jrj/Desktop/idea-IU-163.12024.16/java_learn/EasyRaft/src/main/java/KV/KVDatabase/config.xml");
} catch (Exception e) {
System.out.println("127.0.0.1:50000 failed");
e.printStackTrace();
}

try {
ctxProxy.tryToConnect("127.0.0.1",50001);
} catch (Exception e) {
System.out.println("127.0.0.1:50001 failed");
}
String appendInfo = serverConfig.getserverPort() + ":" + serverConfig.getshard();
CtxProxy ctxProxy = new CtxProxy("127.0.0.1", appendInfo);
ctxProxy.setCallBackClass(KVServerCallBack.class);
ArrayList<String> strings = serverConfig.getIpPortList();

try {
ctxProxy.tryToConnect("127.0.0.1",50002);
} catch (Exception e) {
System.out.println("127.0.0.1:50002 failed");
for(String tmp:strings){
String[] infos = tmp.split(":");
try {
ctxProxy.tryToConnect(infos[0],Integer.parseInt(infos[1]));
} catch (Exception e) {
System.out.println(infos + " connect failed");
}
}
System.out.println("stuck");
//ctxProxy.electLeader();
//ctxProxy.joinCluster();

Server kvServer;

try{
kvServer = new Server(serverPort+shard);
kvServer = new Server(serverConfig.getserverPort());
}catch (Exception e){
System.out.println("?????");
e.printStackTrace();
Expand Down
Loading

0 comments on commit 528ab08

Please sign in to comment.