Skip to content

Commit

Permalink
remove redis
Browse files Browse the repository at this point in the history
  • Loading branch information
jin716 committed Jun 23, 2016
1 parent 4be36d0 commit 5b63eb8
Show file tree
Hide file tree
Showing 53 changed files with 2,562 additions and 529 deletions.
1 change: 1 addition & 0 deletions .idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,996 changes: 1,996 additions & 0 deletions .idea/workspace.xml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions a.data
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"jin*ling2asdfadfjlkasdjfzjkcv
1 change: 1 addition & 0 deletions draft
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
protoc ./Message.proto --java_out=./
28 changes: 28 additions & 0 deletions io/srcmain/resources/protobuf/Message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";
option java_package = "org.mchat.io.message";
option java_outer_classname = "MessageBuf";

message Message{

MessageType type=1;
int32 version=2;
int64 from = 3;
int64 to = 4;

enum MessageType{
TEXT = 0;
LOCATION = 1;
VOICE = 2;
IMAGE = 3;
ACK = 4;
ERROR = 5;
}


repeated bytes data=5;

int32 flag = 6;
int32 hash = 7;


}
22 changes: 22 additions & 0 deletions io/srctest/org/mchat/io/object/TestMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.mchat.io.object;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mchat.io.message.MessageBuf;

import java.io.*;

/**
* Created by jingli on 16/5/25.
*/
public class TestMessage {








}
11 changes: 11 additions & 0 deletions io/src/main/java/a.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import java.math.BigDecimal;

/**
* Created by jingli on 16/6/18.
*/
public class a {

public void main(){
BigDecimal d = null;
}
}
5 changes: 2 additions & 3 deletions io/src/main/java/org/mchat/io/ChatUtil.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.mchat.io;

import static org.mchat.io.redis.RedisKeyNaming.*;

/**
* Created by jingli on 16/5/26.
Expand All @@ -16,10 +15,10 @@ public class ChatUtil {
* @return
*/
public static String getChatChannel(String from, String to){
return CHAT_CHANNEL_HEADER + (from.compareTo(to) > 0 ? from+to : to + from);
return null;
}

public static String getRecieverChannel(String to){
return CHAT_RECIEVER_CHANNEL_HEADER + to;
return null;
}
}
11 changes: 11 additions & 0 deletions io/src/main/java/org/mchat/io/a.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.mchat.io;

/**
* Created by jingli on 16/6/22.
*/
public class a {

public static void main(String[] args){
System.out.println(Long.MAX_VALUE);
}
}
59 changes: 59 additions & 0 deletions io/src/main/java/org/mchat/io/chatServer/BigPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.mchat.io.chatServer;

import io.netty.channel.Channel;
import org.mchat.io.chatServer.message.Message;
import org.mchat.util.StringUtil;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by jingli on 16/6/22.
*/
public class BigPool {

final LinkedBlockingQueue<Message> messageQueue;
final ConcurrentHashMap<String,Channel> hostChannelMap;
final ConcurrentHashMap<Long,Channel> localUserChannelMap;
final ConcurrentHashMap<Long,String> remoteUserHostMap;



public BigPool(){
this.messageQueue = new LinkedBlockingQueue();
hostChannelMap = new ConcurrentHashMap<String, Channel>();
localUserChannelMap = new ConcurrentHashMap<Long, Channel>();
remoteUserHostMap = new ConcurrentHashMap<Long, String>();
}

public Channel getLocalUserChannel(Long user){
return localUserChannelMap.get(user);
}

public Channel getRemoteUserChannel(Long user){
String hostKey = remoteUserHostMap.get(user);
if(hostKey!=null)
return hostChannelMap.get(hostKey);
return null;
}

public Channel getBackUpChannel(){
return null;
}

public LinkedBlockingQueue<Message> getMessageQueue(){ return this.messageQueue;}


public Channel getChannel(String host,int port){
String key = StringUtil.hostKey(host, port);
return hostChannelMap.get(key);
}

public Channel getChannel(String host, String port) {
String key = StringUtil.hostKey(host, port);
return hostChannelMap.get(key);
}



}
61 changes: 4 additions & 57 deletions io/src/main/java/org/mchat/io/chatServer/ChatHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,30 @@

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.mchat.io.message.MessageBuf;
import org.mchat.io.message.MessageFlagCodeNaming;
import org.mchat.io.message.ServerMessageBuilder;
import org.mchat.io.redis.RedisService;
import static org.mchat.io.message.MessageFlagCodeNaming.*;
import org.mchat.io.chatServer.message.Message;
import static org.mchat.io.chatServer.message.MessageFlagCodeNaming.*;
/**
* Created by jingli on 16/5/26.
*/
public class ChatHandler extends ChannelHandlerAdapter {

RedisService service = new RedisService();
//RedisService service = new RedisService();

private static int RETRY_TIME = 3;
private static int RETRY_INTEVAL = 300;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if(msg instanceof MessageBuf.Message){
MessageBuf.Message message = (MessageBuf.Message) msg;
if(message.getType() == MessageBuf.Message.MessageType.ACK) {
//客户端确认上线消息
String channel = null; //TODO
service.subscribe(new ChatSubscribeListener(ctx), channel);
}
else send(ctx,message);//客户端发送信息
}

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
askClientOnline(ctx); //跟客户确认上线消息
}




private void send(ChannelHandlerContext ctx, MessageBuf.Message message){
Long reciever = 0L; //订阅者计数
int retry = 0 ; //重试计数
while (reciever == 0 && retry < RETRY_TIME ){//自动重试
reciever = service.speakTo(message);//发送到缓存
retry++;
try {
Thread.sleep(RETRY_INTEVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//重试也无法送达,返回服务错误给客户端
if(retry >= RETRY_TIME) {
tellClientServerError(ctx, message);
return;
}
//发送到缓存不会通知客户端,直到收到客户端确认回馈.
}

/**
* 跟客户确认上线消息
*/
private void askClientOnline(ChannelHandlerContext ctx){
MessageBuf.Message ask_online = ServerMessageBuilder
.buildServerAckMessage(SERVER_ASK_ACK);
ctx.writeAndFlush(ask_online);
}


/**
* 通知客户端,信息发送失败
*/
private void tellClientServerError(ChannelHandlerContext ctx, MessageBuf.Message message) {
System.out.println("重试过多:"+message.toString());
MessageBuf.Message error_reply = ServerMessageBuilder
.buildErrorMessage(message, SERVER_INNER_ERROR);
ctx.writeAndFlush(error_reply);
}




}

This file was deleted.

11 changes: 11 additions & 0 deletions io/src/main/java/org/mchat/io/chatServer/message/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.mchat.io.chatServer.message;

/**
* Created by jingli on 16/6/22.
*/
public interface Message {

public Long getFrom();
public Long getto();

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.mchat.io.message;
package org.mchat.io.chatServer.message;

/**
* Created by jingli on 16/5/28.
Expand Down
27 changes: 27 additions & 0 deletions io/src/main/java/org/mchat/io/chatServer/router/DefaultRouter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.mchat.io.chatServer.router;

import io.netty.channel.Channel;
import org.mchat.io.chatServer.message.Message;
import org.mchat.io.chatServer.BigPool;

/**
* Created by jingli on 16/6/23.
*/
public class DefaultRouter extends Router {

public DefaultRouter(BigPool pool) {
super(pool);
}

void forward(Message message){
long to = message.getto();
Channel channel = pool.getLocalUserChannel(to);
if(channel == null)
channel = pool.getRemoteUserChannel(to);
if(channel == null)
channel = pool.getBackUpChannel();
if(channel == null)
throw new RuntimeException("fetal error , no channel is available");
channel.writeAndFlush(message);
}
}
33 changes: 33 additions & 0 deletions io/src/main/java/org/mchat/io/chatServer/router/Router.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.mchat.io.chatServer.router;

import org.mchat.io.chatServer.message.Message;
import org.mchat.io.chatServer.BigPool;

import java.util.concurrent.Callable;

/**
* Created by jingli on 16/6/22.
*/
public abstract class Router implements Callable<java.lang.Boolean> {

final BigPool pool;
private boolean state;

public Router(BigPool pool){
this.pool = pool;
state = true;
}

@Override
public java.lang.Boolean call() throws Exception {
while(state){
Message message = pool.getMessageQueue().take();
if(message != null){
forward(message);
}
}
return true;
}

abstract void forward(Message message);
}
Loading

0 comments on commit 5b63eb8

Please sign in to comment.