From 227e06301c49be92dc301a78139ac82cb374f8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?= Date: Thu, 22 Dec 2016 17:44:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0TCP/UDP=E7=BC=93=E5=86=B2?= =?UTF-8?q?=E5=8C=BA=E5=A4=A7=E5=B0=8F=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/reference.conf | 14 +++++ .../mpush/client/gateway/GatewayClient.java | 12 +++++ .../client/gateway/GatewayUDPConnector.java | 5 +- .../mpush/core/server/ConnectionServer.java | 6 ++- .../com/mpush/core/server/GatewayServer.java | 6 ++- .../core/server/GatewayUDPConnector.java | 6 ++- .../mpush/netty/client/NettyTCPClient.java | 8 ++- .../netty/connection/NettyConnection.java | 4 +- .../mpush/test/push/PushClientTestMain2.java | 51 +++++++++++-------- .../main/java/com/mpush/tools/config/CC.java | 14 +++++ 10 files changed, 94 insertions(+), 32 deletions(-) diff --git a/conf/reference.conf b/conf/reference.conf index 3dad203f..a983a751 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -47,10 +47,24 @@ mp { gateway-server-net=udp //网关服务使用的网络类型tcp/udp gateway-server-multicast="239.239.239.88" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效 gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效 + + snd_buf { //tcp/udp 发送缓冲区大小 + connect-server=32k + gateway-server=0 + gateway-client=0 //0表示使用操作系统默认值 + } + + rcv_buf { //tcp/udp 接收缓冲区大小 + connect-server=32k + gateway-server=0 + gateway-client=0 //0表示使用操作系统默认值 + } + public-host-mapping { //本机局域网IP和公网IP的映射关系 //"10.0.10.156":"111.1.32.137" //"10.0.10.166":"111.1.33.138" } + traffic-shaping { //流量整形配置 gateway-client { enabled:false diff --git a/mpush-client/src/main/java/com/mpush/client/gateway/GatewayClient.java b/mpush-client/src/main/java/com/mpush/client/gateway/GatewayClient.java index cefc1f6f..ecb96af1 100644 --- a/mpush-client/src/main/java/com/mpush/client/gateway/GatewayClient.java +++ b/mpush-client/src/main/java/com/mpush/client/gateway/GatewayClient.java @@ -27,8 +27,13 @@ import com.mpush.client.gateway.handler.GatewayOKHandler; import com.mpush.common.MessageDispatcher; import com.mpush.netty.client.NettyTCPClient; +import com.mpush.tools.config.CC; +import com.mpush.tools.config.CC.mp.net.rcv_buf; +import com.mpush.tools.config.CC.mp.net.snd_buf; import com.mpush.tools.thread.NamedPoolThreadFactory; +import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler; @@ -89,4 +94,11 @@ protected void doStop(Listener listener) throws Throwable { } super.doStop(listener); } + + @Override + protected void initOptions(Bootstrap b) { + super.initOptions(b); + if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client); + if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client); + } } diff --git a/mpush-client/src/main/java/com/mpush/client/gateway/GatewayUDPConnector.java b/mpush-client/src/main/java/com/mpush/client/gateway/GatewayUDPConnector.java index a194af7d..1a68c632 100644 --- a/mpush-client/src/main/java/com/mpush/client/gateway/GatewayUDPConnector.java +++ b/mpush-client/src/main/java/com/mpush/client/gateway/GatewayUDPConnector.java @@ -29,6 +29,8 @@ import com.mpush.netty.udp.NettyUDPConnector; import com.mpush.tools.Utils; import com.mpush.tools.config.CC; +import com.mpush.tools.config.CC.mp.net.rcv_buf; +import com.mpush.tools.config.CC.mp.net.snd_buf; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; @@ -84,7 +86,8 @@ protected void initOptions(Bootstrap b) { super.initOptions(b); b.option(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true); b.option(ChannelOption.IP_MULTICAST_TTL, 255); - //b.option(ChannelOption.SO_SNDBUF, 32*1024); + if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client); + if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client); } @Override diff --git a/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java b/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java index 4260cf30..9108037d 100644 --- a/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java +++ b/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java @@ -28,6 +28,8 @@ import com.mpush.core.handler.*; import com.mpush.netty.server.NettyTCPServer; import com.mpush.tools.config.CC; +import com.mpush.tools.config.CC.mp.net.rcv_buf; +import com.mpush.tools.config.CC.mp.net.snd_buf; import com.mpush.tools.thread.NamedPoolThreadFactory; import com.mpush.tools.thread.ThreadNames; import com.mpush.tools.thread.pool.ThreadPoolManager; @@ -153,8 +155,8 @@ protected void initOptions(ServerBootstrap b) { * 在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF, * 需要根据推送消息的大小,合理设置,对于海量长连接,通常32K是个不错的选择。 */ - b.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); - b.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); + if (snd_buf.connect_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server); + if (rcv_buf.connect_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server); /** * 这个坑其实也不算坑,只是因为懒,该做的事情没做。一般来讲我们的业务如果比较小的时候我们用同步处理,等业务到一定规模的时候,一个优化手段就是异步化。 diff --git a/mpush-core/src/main/java/com/mpush/core/server/GatewayServer.java b/mpush-core/src/main/java/com/mpush/core/server/GatewayServer.java index 3c8e16c7..5e73f0eb 100644 --- a/mpush-core/src/main/java/com/mpush/core/server/GatewayServer.java +++ b/mpush-core/src/main/java/com/mpush/core/server/GatewayServer.java @@ -25,6 +25,8 @@ import com.mpush.core.handler.GatewayPushHandler; import com.mpush.netty.server.NettyTCPServer; import com.mpush.tools.config.CC; +import com.mpush.tools.config.CC.mp.net.rcv_buf; +import com.mpush.tools.config.CC.mp.net.snd_buf; import com.mpush.tools.thread.NamedPoolThreadFactory; import com.mpush.tools.thread.ThreadNames; import io.netty.bootstrap.ServerBootstrap; @@ -119,8 +121,8 @@ protected void initPipeline(ChannelPipeline pipeline) { @Override protected void initOptions(ServerBootstrap b) { super.initOptions(b); - //b.childOption(ChannelOption.SO_SNDBUF, 64 * 1024); - //b.childOption(ChannelOption.SO_RCVBUF, 64 * 1024); + if (snd_buf.gateway_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.gateway_server); + if (rcv_buf.gateway_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.gateway_server); /** * 这个坑其实也不算坑,只是因为懒,该做的事情没做。一般来讲我们的业务如果比较小的时候我们用同步处理,等业务到一定规模的时候,一个优化手段就是异步化。 * 异步化是提高吞吐量的一个很好的手段。但是,与异步相比,同步有天然的负反馈机制,也就是如果后端慢了,前面也会跟着慢起来,可以自动的调节。 diff --git a/mpush-core/src/main/java/com/mpush/core/server/GatewayUDPConnector.java b/mpush-core/src/main/java/com/mpush/core/server/GatewayUDPConnector.java index 1ca5f830..96feaae1 100644 --- a/mpush-core/src/main/java/com/mpush/core/server/GatewayUDPConnector.java +++ b/mpush-core/src/main/java/com/mpush/core/server/GatewayUDPConnector.java @@ -29,6 +29,8 @@ import com.mpush.netty.udp.NettyUDPConnector; import com.mpush.tools.Utils; import com.mpush.tools.config.CC; +import com.mpush.tools.config.CC.mp.net.rcv_buf; +import com.mpush.tools.config.CC.mp.net.snd_buf; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; @@ -79,8 +81,8 @@ protected void initOptions(Bootstrap b) { b.option(ChannelOption.IP_MULTICAST_TTL, 255);//选项IP_MULTICAST_TTL允许设置超时TTL,范围为0~255之间的任何值 //b.option(ChannelOption.IP_MULTICAST_IF, null);//选项IP_MULTICAST_IF用于设置组播的默认网络接口,会从给定的网络接口发送,另一个网络接口会忽略此数据,参数addr是希望多播输出接口的IP地址,使用INADDR_ANY地址回送到默认接口。 //b.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 1024 * 1024)); - //b.option(ChannelOption.SO_RCVBUF, 64 * 1024); - + if (snd_buf.gateway_server > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_server); + if (rcv_buf.gateway_server > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_server); } @Override diff --git a/mpush-netty/src/main/java/com/mpush/netty/client/NettyTCPClient.java b/mpush-netty/src/main/java/com/mpush/netty/client/NettyTCPClient.java index ee16175f..11d5ce97 100644 --- a/mpush-netty/src/main/java/com/mpush/netty/client/NettyTCPClient.java +++ b/mpush-netty/src/main/java/com/mpush/netty/client/NettyTCPClient.java @@ -29,6 +29,7 @@ import com.mpush.tools.thread.ThreadNames; import com.mpush.tools.thread.pool.ThreadPoolManager; import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -65,7 +66,6 @@ private void createClient(Listener listener, EventLoopGroup workerGroup, Class() { // (4) @Override @@ -73,7 +73,7 @@ public void initChannel(SocketChannel ch) throws Exception { initPipeline(ch.pipeline()); } }); - + initOptions(b); ChannelFuture future = b.connect(new InetSocketAddress(host, port)); future.addListener(f -> { if (f.isSuccess()) { @@ -152,6 +152,10 @@ protected void doStop(Listener listener) throws Throwable { listener.onSuccess(); } + protected void initOptions(Bootstrap b) { + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000); + } + public String getHost() { return host; } diff --git a/mpush-netty/src/main/java/com/mpush/netty/connection/NettyConnection.java b/mpush-netty/src/main/java/com/mpush/netty/connection/NettyConnection.java index a648473b..86edde22 100644 --- a/mpush-netty/src/main/java/com/mpush/netty/connection/NettyConnection.java +++ b/mpush-netty/src/main/java/com/mpush/netty/connection/NettyConnection.java @@ -99,11 +99,11 @@ public ChannelFuture send(Packet packet, final ChannelFutureListener listener) { //return channel.newPromise().setFailure(new RuntimeException("send data too busy")); return future.awaitUninterruptibly(); } else { - if (listener != null) { + /*if (listener != null) { channel.newPromise() .addListener(listener) .setFailure(new RuntimeException("connection is disconnected")); - } + }*/ return this.close(); } } diff --git a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain2.java b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain2.java index 8706d009..565df8d5 100644 --- a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain2.java +++ b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain2.java @@ -48,10 +48,12 @@ public void testPush() throws Exception { PushSender sender = PushSender.create(); sender.start().join(); Thread.sleep(1000); + + Statistics statistics = new Statistics(); - FlowControl flowControl = new GlobalFlowControl(3000, Integer.MAX_VALUE, 1000);// qps=1000 + FlowControl flowControl = new GlobalFlowControl(100000, Integer.MAX_VALUE, 1000);// qps=1000 - ScheduledExecutorService service = Executors.newScheduledThreadPool(2); + ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(4); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { System.out.println("time=" + LocalTime.now() + ", flowControl=" + flowControl.report() @@ -60,8 +62,25 @@ public void testPush() throws Exception { }, 1, 1, TimeUnit.SECONDS); for (int k = 0; k < 100; k++) { - for (int i = 0; i < 1000; i++) { - service.execute(new PushTask(sender, i, service, flowControl, statistics)); + for (int i = 0; i < 1; i++) { + + while (service.getQueue().size() > 1000) Thread.sleep(1); // 防止内存溢出 + + PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); + msg.setMsgId("msgId_" + i); + + PushContext context = PushContext.build(msg) + .setAckModel(AckModel.NO_ACK) + .setUserId("user-" + i) + .setBroadcast(false) + .setTimeout(60000) + .setCallback(new PushCallback() { + @Override + public void onResult(PushResult result) { + statistics.add(result.resultCode); + } + }); + service.execute(new PushTask(sender, context, service, flowControl, statistics)); } } @@ -70,14 +89,18 @@ public void testPush() throws Exception { private static class PushTask implements Runnable { PushSender sender; - private int i; FlowControl flowControl; Statistics statistics; ScheduledExecutorService executor; + PushContext context; - public PushTask(PushSender sender, int i, ScheduledExecutorService executor, FlowControl flowControl, Statistics statistics) { + public PushTask(PushSender sender, + PushContext context, + ScheduledExecutorService executor, + FlowControl flowControl, + Statistics statistics) { this.sender = sender; - this.i = i; + this.context = context; this.flowControl = flowControl; this.executor = executor; this.statistics = statistics; @@ -86,20 +109,6 @@ public PushTask(PushSender sender, int i, ScheduledExecutorService executor, Flo @Override public void run() { if (flowControl.checkQps()) { - PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); - msg.setMsgId("msgId_" + i); - - PushContext context = PushContext.build(msg) - .setAckModel(AckModel.NO_ACK) - .setUserId("user-" + i) - .setBroadcast(false) - .setTimeout(60000) - .setCallback(new PushCallback() { - @Override - public void onResult(PushResult result) { - statistics.add(result.resultCode); - } - }); FutureTask future = sender.send(context); } else { executor.schedule(this, flowControl.getRemaining(), TimeUnit.NANOSECONDS); diff --git a/mpush-tools/src/main/java/com/mpush/tools/config/CC.java b/mpush-tools/src/main/java/com/mpush/tools/config/CC.java index c9c0c14c..b7cdd85e 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/config/CC.java +++ b/mpush-tools/src/main/java/com/mpush/tools/config/CC.java @@ -109,6 +109,20 @@ static String getString(String localIp) { } } + interface snd_buf { + Config cfg = net.cfg.getObject("snd_buf").toConfig(); + int connect_server = (int) cfg.getMemorySize("connect-server").toBytes(); + int gateway_server = (int) cfg.getMemorySize("gateway-server").toBytes(); + int gateway_client = (int) cfg.getMemorySize("gateway-client").toBytes(); + } + + interface rcv_buf { + Config cfg = net.cfg.getObject("rcv_buf").toConfig(); + int connect_server = (int) cfg.getMemorySize("connect-server").toBytes(); + int gateway_server = (int) cfg.getMemorySize("gateway-server").toBytes(); + int gateway_client = (int) cfg.getMemorySize("gateway-client").toBytes(); + } + interface traffic_shaping { Config cfg = net.cfg.getObject("traffic-shaping").toConfig();