Skip to content

Commit

Permalink
merge dev
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 23, 2016
2 parents 8289c91 + 4e19454 commit a3245a1
Show file tree
Hide file tree
Showing 32 changed files with 519 additions and 134 deletions.
16 changes: 15 additions & 1 deletion conf/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,24 @@ mp {
gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
ws-server-port=0 //websocket对外端口, 公网端口, 0表示禁用websocket
ws-path="/" //websocket path

snd_buf { //tcp/udp 发送缓冲区大小
connect-server=32k
gateway-server=0
gateway-client=0 //0表示使用操作系统默认值
}

rcv_buf { //tcp/udp 接收缓冲区大小
connect-server=32k
gateway-server=0
gateway-client=0 //0表示使用操作系统默认值
}

public-host-mapping { //本机局域网IP和公网IP的映射关系
//"10.0.10.156":"111.1.32.137"
//"10.0.10.166":"111.1.33.138"
}

traffic-shaping { //流量整形配置
gateway-client {
enabled:false
Expand Down Expand Up @@ -186,7 +200,7 @@ mp {
dump-stack=false //是否定时dump堆栈
dump-period=1m //多久监控一次
print-log=true //是否打印监控日志
profile-enabled=true //开启性能监控
profile-enabled=false //开启性能监控
profile-slowly-duration=10ms //耗时超过10ms打印日志
}

Expand Down
2 changes: 2 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public interface Constants {
byte[] EMPTY_BYTES = new byte[0];
String HTTP_HEAD_READ_TIMEOUT = "readTimeout";
String EMPTY_STRING = "";
String PARAM_PERSISTENT = "persistent";
String ANY_HOST = "0.0.0.0";
}
1 change: 1 addition & 0 deletions mpush-api/src/main/java/com/mpush/api/protocol/Packet.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public static void encodePacket(Packet packet, ByteBuf out) {
out.writeBytes(packet.body);
}
}
packet.body = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@

public final class Subscriber extends JedisPubSub {

private ListenerDispatcher dispatcher = ListenerDispatcher.I();

@Override
public void onMessage(String channel, String message) {
Logs.REDIS.info("onMessage:{},{}", channel, message);
dispatcher.onMessage(channel, message);
ListenerDispatcher.I().onMessage(channel, message);
super.onMessage(channel, message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import com.mpush.client.gateway.handler.GatewayOKHandler;
import com.mpush.common.MessageDispatcher;
import com.mpush.netty.client.NettyTCPClient;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;

Expand Down Expand Up @@ -89,4 +94,11 @@ protected void doStop(Listener listener) throws Throwable {
}
super.doStop(listener);
}

@Override
protected void initOptions(Bootstrap b) {
super.initOptions(b);
if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client);
if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.mpush.netty.udp.NettyUDPConnector;
import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -84,6 +86,8 @@ protected void initOptions(Bootstrap b) {
super.initOptions(b);
b.option(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
b.option(ChannelOption.IP_MULTICAST_TTL, 255);
if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client);
if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by ohun on 2015/12/28.
Expand All @@ -40,7 +41,7 @@
public abstract class BaseMessage implements Message {
private static final byte STATUS_DECODED = 1;
private static final byte STATUS_ENCODED = 2;
private static final LongAdder ID_SEQ = new LongAdder();
private static final AtomicInteger ID_SEQ = new AtomicInteger();
transient protected Packet packet;
transient protected Connection connection;
transient private byte status = 0;
Expand Down Expand Up @@ -101,6 +102,7 @@ private void decodeBinaryBody0() {
Profiler.enter("time cost on [body decode]");
decode(packet.body);
Profiler.release();
packet.body = null;// 释放内存
}

private void encodeBinaryBody0() {
Expand Down Expand Up @@ -198,8 +200,7 @@ public void close() {
}

protected static int genSessionId() {
ID_SEQ.increment();
return ID_SEQ.intValue();
return ID_SEQ.incrementAndGet();
}

public int getSessionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.JsonPacket;
import com.mpush.api.protocol.Packet;
import io.netty.channel.ChannelFutureListener;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -92,6 +93,12 @@ public PushMessage setContent(byte[] content) {



@Override
public void send(ChannelFutureListener listener) {
super.send(listener);
this.content = null;//释放内存
}

@Override
public String toString() {
return "PushMessage{" +
Expand Down
4 changes: 2 additions & 2 deletions mpush-core/src/main/java/com/mpush/core/ack/AckCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @author [email protected] (夜色)
*/
public interface AckCallback {
void onSuccess(AckContext context);
void onSuccess(AckTask context);

void onTimeout(AckContext context);
void onTimeout(AckTask context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,58 @@

package com.mpush.core.ack;

import com.mpush.api.Message;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

/**
* Created by ohun on 16/9/5.
*
* @author [email protected] (夜色)
*/
public final class AckContext implements Runnable {
private AckCallback callback;
public final class AckTask implements Runnable {
final int ackMessageId;
private final Message message;

private int sessionId;
private Future<?> future;
private AckCallback callback;
private Future<?> timeoutFuture;

public AckContext() {
public AckTask(Message message, int ackMessageId) {
this.message = message;
this.ackMessageId = ackMessageId;
}

public void setSessionId(int sessionId) {
this.sessionId = sessionId;
public static AckTask from(Message message, int ackMessageId) {
return new AckTask(message, ackMessageId);
}

public void setFuture(Future<?> future) {
this.future = future;
this.timeoutFuture = future;
}

public ScheduledExecutorService getExecutor() {
return message.getConnection().getChannel().eventLoop();
}

public AckContext setCallback(AckCallback callback) {
public AckTask setCallback(AckCallback callback) {
this.callback = callback;
return this;
}

private boolean tryDone() {
return future.cancel(true);
return timeoutFuture.cancel(true);
}

public void success() {
public void onResponse() {
if (tryDone()) {
callback.onSuccess(this);
callback = null;
}
}

public void timeout() {
AckContext context = AckMessageQueue.I.getAndRemove(sessionId);
public void onTimeout() {
AckTask context = AckTaskQueue.I.getAndRemove(ackMessageId);
if (context != null && tryDone()) {
callback.onTimeout(this);
callback = null;
Expand All @@ -69,13 +79,13 @@ public void timeout() {

@Override
public String toString() {
return "AckContext{" +
", sessionId=" + sessionId +
return "{" +
", ackMessageId=" + ackMessageId +
'}';
}

@Override
public void run() {
timeout();
onTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,31 @@
*
* @author [email protected] (夜色)
*/
public final class AckMessageQueue extends BaseService {
private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class);
public final class AckTaskQueue extends BaseService {
private final Logger logger = LoggerFactory.getLogger(AckTaskQueue.class);

private static final int DEFAULT_TIMEOUT = 3000;
public static final AckMessageQueue I = new AckMessageQueue();
public static final AckTaskQueue I = new AckTaskQueue();

private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, AckTask> queue = new ConcurrentHashMap<>();
private ScheduledExecutorService scheduledExecutor;

private AckMessageQueue() {
private AckTaskQueue() {
}

public void add(int sessionId, AckContext context, int timeout) {
queue.put(sessionId, context);
context.setSessionId(sessionId);
context.setFuture(scheduledExecutor.schedule(context,
public void add(AckTask task, int timeout) {
queue.put(task.ackMessageId, task);

//使用 task.getExecutor() 并没更快
task.setFuture(scheduledExecutor.schedule(task,
timeout > 0 ? timeout : DEFAULT_TIMEOUT,
TimeUnit.MILLISECONDS
));

logger.debug("one ack task add to queue, task={}, timeout={}", task, timeout);
}

public AckContext getAndRemove(int sessionId) {
public AckTask getAndRemove(int sessionId) {
return queue.remove(sessionId);
}

Expand Down
15 changes: 6 additions & 9 deletions mpush-core/src/main/java/com/mpush/core/handler/AckHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import com.mpush.api.protocol.Packet;
import com.mpush.common.handler.BaseMessageHandler;
import com.mpush.common.message.AckMessage;
import com.mpush.common.message.OkMessage;
import com.mpush.core.ack.AckContext;
import com.mpush.core.ack.AckMessageQueue;
import com.mpush.core.ack.AckTask;
import com.mpush.core.ack.AckTaskQueue;
import com.mpush.tools.log.Logs;

import static com.mpush.api.protocol.Command.OK;

/**
* Created by ohun on 16/9/5.
*
Expand All @@ -44,12 +41,12 @@ public AckMessage decode(Packet packet, Connection connection) {

@Override
public void handle(AckMessage message) {
AckContext context = AckMessageQueue.I.getAndRemove(message.getSessionId());
if (context == null) {
Logs.PUSH.info("receive client ack, but timeout message={}", message);
AckTask task = AckTaskQueue.I.getAndRemove(message.getSessionId());
if (task == null) {//ack 超时了
Logs.PUSH.info("receive client ack, but task timeout message={}", message);
return;
}

context.success();
task.onResponse();//成功收到客户的ACK响应
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -54,6 +55,7 @@ public final class BroadcastPushTask implements PushTask, ChannelFutureListener

private final Condition condition;

//使用Iterator, 记录任务遍历到的位置,因为有流控,一次任务可能会被分批发送,而且还有在推送过程中上/下线的用户
private final Iterator<Map.Entry<String, Map<Integer, LocalRouter>>> iterator;

public BroadcastPushTask(GatewayPushMessage message, FlowControl flowControl) {
Expand All @@ -67,12 +69,12 @@ public BroadcastPushTask(GatewayPushMessage message, FlowControl flowControl) {
public void run() {
flowControl.reset();
boolean done = broadcast();
if (done) {//done
if (done) {//done 广播结束
if (finishTasks.addAndGet(flowControl.total()) == 0) {
report();
}
} else {//没有结束,就延时进行下次任务 TODO 考虑优先级问题
PushCenter.I.delayTask(flowControl.getRemaining(), this);
PushCenter.I.delayTask(flowControl.getDelay(), this);
}
flowControl.end();
}
Expand All @@ -94,6 +96,7 @@ private boolean broadcast() {
.build(connection)
.setContent(message.content)
.send(this);
//4. 检测qps, 是否超过流控限制,如果超过则结束当前循环直接进入catch
if (!flowControl.checkQps()) {
throw new OverFlowException(false);
}
Expand Down Expand Up @@ -143,4 +146,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
report();
}
}

@Override
public ScheduledExecutorService getExecutor() {
return message.getConnection().getChannel().eventLoop();
}
}
Loading

0 comments on commit a3245a1

Please sign in to comment.