Add TCP/UDP buffer size configuration
夜色 committed Dec 22, 2016
1 parent aed0f53 commit 227e063
Showing 10 changed files with 94 additions and 32 deletions.
14 changes: 14 additions & 0 deletions conf/reference.conf
@@ -47,10 +47,24 @@ mp {
gateway-server-net=udp //network type used by the gateway service: tcp/udp
gateway-server-multicast="239.239.239.88" //239.0.0.0~239.255.255.255 is the administratively scoped multicast range, valid only within a local scope
gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255 is the administratively scoped multicast range, valid only within a local scope

snd_buf { //TCP/UDP send buffer size
connect-server=32k
gateway-server=0
gateway-client=0 //0 means use the OS default
}

rcv_buf { //TCP/UDP receive buffer size
connect-server=32k
gateway-server=0
gateway-client=0 //0 means use the OS default
}
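
These sizes use HOCON's memory-size syntax, so 32k resolves to 32 × 1024 bytes. A minimal sketch of how such a value becomes the byte count that Netty receives, using the same Typesafe Config getMemorySize call that CC.java (below) uses; the BufSizeDemo class name is ours for illustration:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class BufSizeDemo {
    public static void main(String[] args) {
        // Parse a fragment shaped like the snd_buf block above.
        Config cfg = ConfigFactory.parseString("snd_buf { connect-server=32k }");
        // getMemorySize understands suffixes such as k/m/g; toBytes() returns a long.
        long bytes = cfg.getMemorySize("snd_buf.connect-server").toBytes();
        System.out.println(bytes); // prints 32768
    }
}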

public-host-mapping { //maps this host's LAN IP to its public IP
//"10.0.10.156":"111.1.32.137"
//"10.0.10.166":"111.1.33.138"
}

traffic-shaping { //traffic shaping configuration
gateway-client {
enabled:false
@@ -27,8 +27,13 @@
import com.mpush.client.gateway.handler.GatewayOKHandler;
import com.mpush.common.MessageDispatcher;
import com.mpush.netty.client.NettyTCPClient;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;

@@ -89,4 +94,11 @@ protected void doStop(Listener listener) throws Throwable {
}
super.doStop(listener);
}

@Override
protected void initOptions(Bootstrap b) {
super.initOptions(b);
if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client);
if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client);
}
}
@@ -29,6 +29,8 @@
import com.mpush.netty.udp.NettyUDPConnector;
import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
@@ -84,7 +86,8 @@ protected void initOptions(Bootstrap b) {
super.initOptions(b);
b.option(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
b.option(ChannelOption.IP_MULTICAST_TTL, 255);
//b.option(ChannelOption.SO_SNDBUF, 32*1024);
if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client);
if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client);
}

@Override
@@ -28,6 +28,8 @@
import com.mpush.core.handler.*;
import com.mpush.netty.server.NettyTCPServer;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.ThreadNames;
import com.mpush.tools.thread.pool.ThreadPoolManager;
@@ -153,8 +155,8 @@ protected void initOptions(ServerBootstrap b) {
* In Netty these map to ChannelOption.SO_SNDBUF and ChannelOption.SO_RCVBUF.
* Size them according to the pushed message size; for massive numbers of long-lived connections, 32K is usually a good choice.
*/
b.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
b.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
if (snd_buf.connect_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server);
if (rcv_buf.connect_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server);

/**
* This pitfall isn't really a pitfall; it's just that, out of laziness, work that should have been done wasn't. Generally we handle requests synchronously while the business is small; once it reaches a certain scale, one optimization is to go asynchronous.
@@ -25,6 +25,8 @@
import com.mpush.core.handler.GatewayPushHandler;
import com.mpush.netty.server.NettyTCPServer;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.ThreadNames;
import io.netty.bootstrap.ServerBootstrap;
@@ -119,8 +121,8 @@ protected void initPipeline(ChannelPipeline pipeline) {
@Override
protected void initOptions(ServerBootstrap b) {
super.initOptions(b);
//b.childOption(ChannelOption.SO_SNDBUF, 64 * 1024);
//b.childOption(ChannelOption.SO_RCVBUF, 64 * 1024);
if (snd_buf.gateway_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.gateway_server);
if (rcv_buf.gateway_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.gateway_server);
/**
* This pitfall isn't really a pitfall; it's just that, out of laziness, work that should have been done wasn't. Generally we handle requests synchronously while the business is small; once it reaches a certain scale, one optimization is to go asynchronous.
* Going asynchronous is a good way to raise throughput. But unlike async, sync has a natural negative-feedback mechanism: if the backend slows down, the frontend slows with it, so the system regulates itself automatically.
@@ -29,6 +29,8 @@
import com.mpush.netty.udp.NettyUDPConnector;
import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;
import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
@@ -79,8 +81,8 @@ protected void initOptions(Bootstrap b) {
b.option(ChannelOption.IP_MULTICAST_TTL, 255);//IP_MULTICAST_TTL sets the multicast TTL; any value in the range 0~255 is allowed
//b.option(ChannelOption.IP_MULTICAST_IF, null);//IP_MULTICAST_IF selects the default outgoing interface for multicast: packets are sent from the given interface and other interfaces ignore the data; addr is the IP address of the desired outgoing interface, and INADDR_ANY falls back to the default interface.
//b.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 1024 * 1024));
//b.option(ChannelOption.SO_RCVBUF, 64 * 1024);

if (snd_buf.gateway_server > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_server);
if (rcv_buf.gateway_server > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_server);
}

@Override
@@ -29,6 +29,7 @@
import com.mpush.tools.thread.ThreadNames;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -65,15 +66,14 @@ private void createClient(Listener listener, EventLoopGroup workerGroup, Class<?
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000)
.channel(clazz);
b.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
initPipeline(ch.pipeline());
}
});

initOptions(b);
ChannelFuture future = b.connect(new InetSocketAddress(host, port));
future.addListener(f -> {
if (f.isSuccess()) {
@@ -152,6 +152,10 @@ protected void doStop(Listener listener) throws Throwable {
listener.onSuccess();
}

protected void initOptions(Bootstrap b) {
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
}
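
The initOptions hook extracted here gives subclasses a single place to adjust socket options before connect; the 4-second connect timeout moves out of createClient into the default implementation. A sketch of a subclass using the hook, assuming a (host, port) superclass constructor and that initPipeline is the only other member a concrete client must supply (TunedTCPClient is a hypothetical name; GatewayClient in this commit does the same with the configured buffer sizes):

import com.mpush.netty.client.NettyTCPClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;

public class TunedTCPClient extends NettyTCPClient {
    public TunedTCPClient(String host, int port) {
        super(host, port); // assumed superclass constructor signature
    }

    @Override
    protected void initPipeline(ChannelPipeline pipeline) {
        // codecs and handlers would be registered here
    }

    @Override
    protected void initOptions(Bootstrap b) {
        super.initOptions(b); // keep the default 4s connect timeout
        // Set kernel socket buffers explicitly; leaving them unset keeps the OS defaults.
        b.option(ChannelOption.SO_SNDBUF, 32 * 1024);
        b.option(ChannelOption.SO_RCVBUF, 32 * 1024);
    }
}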

public String getHost() {
return host;
}
@@ -99,11 +99,11 @@ public ChannelFuture send(Packet packet, final ChannelFutureListener listener) {
//return channel.newPromise().setFailure(new RuntimeException("send data too busy"));
return future.awaitUninterruptibly();
} else {
if (listener != null) {
/*if (listener != null) {
channel.newPromise()
.addListener(listener)
.setFailure(new RuntimeException("connection is disconnected"));
}
}*/
return this.close();
}
}
@@ -48,10 +48,12 @@ public void testPush() throws Exception {
PushSender sender = PushSender.create();
sender.start().join();
Thread.sleep(1000);


Statistics statistics = new Statistics();
FlowControl flowControl = new GlobalFlowControl(3000, Integer.MAX_VALUE, 1000);// qps=1000
FlowControl flowControl = new GlobalFlowControl(100000, Integer.MAX_VALUE, 1000);// global QPS flow control

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(4);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("time=" + LocalTime.now()
+ ", flowControl=" + flowControl.report()
@@ -60,8 +62,25 @@ public void testPush() throws Exception {
}, 1, 1, TimeUnit.SECONDS);

for (int k = 0; k < 100; k++) {
for (int i = 0; i < 1000; i++) {
service.execute(new PushTask(sender, i, service, flowControl, statistics));
for (int i = 0; i < 1; i++) {

while (service.getQueue().size() > 1000) Thread.sleep(1); // bound the work queue to avoid running out of memory

PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this is a first push.");
msg.setMsgId("msgId_" + i);

PushContext context = PushContext.build(msg)
.setAckModel(AckModel.NO_ACK)
.setUserId("user-" + i)
.setBroadcast(false)
.setTimeout(60000)
.setCallback(new PushCallback() {
@Override
public void onResult(PushResult result) {
statistics.add(result.resultCode);
}
});
service.execute(new PushTask(sender, context, service, flowControl, statistics));
}
}

@@ -70,14 +89,18 @@

private static class PushTask implements Runnable {
PushSender sender;
private int i;
FlowControl flowControl;
Statistics statistics;
ScheduledExecutorService executor;
PushContext context;

public PushTask(PushSender sender, int i, ScheduledExecutorService executor, FlowControl flowControl, Statistics statistics) {
public PushTask(PushSender sender,
PushContext context,
ScheduledExecutorService executor,
FlowControl flowControl,
Statistics statistics) {
this.sender = sender;
this.i = i;
this.context = context;
this.flowControl = flowControl;
this.executor = executor;
this.statistics = statistics;
@@ -86,20 +109,6 @@ public PushTask(PushSender sender, int i, ScheduledExecutorService executor, Flo
@Override
public void run() {
if (flowControl.checkQps()) {
PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push.");
msg.setMsgId("msgId_" + i);

PushContext context = PushContext.build(msg)
.setAckModel(AckModel.NO_ACK)
.setUserId("user-" + i)
.setBroadcast(false)
.setTimeout(60000)
.setCallback(new PushCallback() {
@Override
public void onResult(PushResult result) {
statistics.add(result.resultCode);
}
});
FutureTask<PushResult> future = sender.send(context);
} else {
executor.schedule(this, flowControl.getRemaining(), TimeUnit.NANOSECONDS);
14 changes: 14 additions & 0 deletions mpush-tools/src/main/java/com/mpush/tools/config/CC.java
@@ -109,6 +109,20 @@ static String getString(String localIp) {
}
}

interface snd_buf {
Config cfg = net.cfg.getObject("snd_buf").toConfig();
int connect_server = (int) cfg.getMemorySize("connect-server").toBytes();
int gateway_server = (int) cfg.getMemorySize("gateway-server").toBytes();
int gateway_client = (int) cfg.getMemorySize("gateway-client").toBytes();
}

interface rcv_buf {
Config cfg = net.cfg.getObject("rcv_buf").toConfig();
int connect_server = (int) cfg.getMemorySize("connect-server").toBytes();
int gateway_server = (int) cfg.getMemorySize("gateway-server").toBytes();
int gateway_client = (int) cfg.getMemorySize("gateway-client").toBytes();
}
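
A zero value means the OS default is kept; the consumers in this commit only set the option when the configured size is positive. For illustration, a hypothetical helper showing that consuming pattern (BufferOptions is ours; the real servers inline this in their initOptions overrides):

import com.mpush.tools.config.CC.mp.net.rcv_buf;
import com.mpush.tools.config.CC.mp.net.snd_buf;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public final class BufferOptions {
    // Apply configured kernel buffer sizes to accepted child sockets; 0 keeps the OS default.
    public static void apply(ServerBootstrap b) {
        if (snd_buf.connect_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server);
        if (rcv_buf.connect_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server);
    }
}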

interface traffic_shaping {
Config cfg = net.cfg.getObject("traffic-shaping").toConfig();

