Skip to content

Commit

Permalink
压测代码优化,压测心跳优化
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 8, 2016
1 parent f0584d8 commit cad727b
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public interface Connection {

boolean isConnected();

boolean heartbeatTimeout();
boolean isReadTimeout();

boolean isWriteTimeout();

void updateLastReadTime();

long getLastReadTime();
void updateLastWriteTime();

Channel getChannel();

Expand Down
2 changes: 1 addition & 1 deletion mpush-boot/src/main/resources/mpush.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mp.log-level=${log.level}
mp.min-heartbeat=${min.hb}
mp.core.min-heartbeat=${min.hb}
mp.security.private-key="${rsa.privateKey}"
mp.security.public-key="${rsa.publicKey}"
mp.zk.server-address="127.0.0.1:2181"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,13 @@ public <T> void publish(String channel, T message) {
call(jedis -> ((MultiKeyCommands) jedis).publish(channel, message instanceof String ? (String) message : Jsons.toJson(message)));
}

public void subscribe(final JedisPubSub pubsub, final String... channels) {
ThreadPoolManager.I.newThread(Arrays.toString(channels),
public void subscribe(final JedisPubSub pubsub, final String channel) {
ThreadPoolManager.I.newThread(channel,
() -> call(jedis -> {
if (jedis instanceof MultiKeyCommands) {
Arrays.stream(channels).forEach(channel -> ((MultiKeyCommands) jedis).subscribe(pubsub, channel));
((MultiKeyCommands) jedis).subscribe(pubsub, channel);
} else if (jedis instanceof MultiKeyJedisClusterCommands) {
Arrays.stream(channels).forEach(channel -> ((MultiKeyJedisClusterCommands) jedis).subscribe(pubsub, channel));
((MultiKeyJedisClusterCommands) jedis).subscribe(pubsub, channel);
}
})
).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
* Created by ohun on 2015/12/19.
*
Expand All @@ -57,6 +60,7 @@ public final class ConnClientChannelHandler extends ChannelInboundHandlerAdapter
private final Connection connection = new NettyConnection();
private ClientConfig clientConfig;
private boolean perfTest;
private int hbTimeoutTimes;

public ConnClientChannelHandler() {
perfTest = true;
Expand All @@ -82,7 +86,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
HandshakeOkMessage message = new HandshakeOkMessage(packet, connection);
byte[] sessionKey = CipherBox.I.mixKey(clientConfig.getClientKey(), message.serverKey);
connection.getSessionContext().changeCipher(new AesCipher(sessionKey, clientConfig.getIv()));
startHeartBeat(message.heartbeat);
connection.getSessionContext().setHeartbeat(message.heartbeat);
startHeartBeat(message.heartbeat - 1000);
LOGGER.info(">>> handshake success, clientConfig={}, connectedNum={}", clientConfig, connectedNum);
bindUser(clientConfig);
if (!perfTest) {
Expand All @@ -97,7 +102,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
connection.getSessionContext().changeCipher(new AesCipher(key, iv));

FastConnectOkMessage message = new FastConnectOkMessage(packet, connection);
startHeartBeat(message.heartbeat);
connection.getSessionContext().setHeartbeat(message.heartbeat);
startHeartBeat(message.heartbeat - 1000);
bindUser(clientConfig);
LOGGER.info(">>> fast connect success, clientConfig={}, connectedNum={}", clientConfig, connectedNum);
} else if (command == Command.KICK) {
Expand Down Expand Up @@ -150,8 +156,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int clientNum = STATISTICS.clientNum.incrementAndGet();
LOGGER.info("client connect channel={}, clientNum={}", ctx.channel(), clientNum);
if (clientConfig == null) {
while (clientConfig == null) {
clientConfig = ctx.channel().attr(CONFIG_KEY).getAndRemove();
if (clientConfig == null) TimeUnit.SECONDS.sleep(1);
}
connection.init(ctx.channel(), true);
if (perfTest) {
Expand Down Expand Up @@ -212,6 +219,7 @@ private void bindUser(ClientConfig client) {
message.userId = client.getUserId();
message.tags = "test";
message.send();
connection.getSessionContext().setUserId(client.getUserId());
LOGGER.debug("<<< send bind user message={}", message);
}

Expand Down Expand Up @@ -247,22 +255,34 @@ private void startHeartBeat(final int heartbeat) throws Exception {
HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
final TimerTask self = this;
final Channel channel = connection.getChannel();
if (channel.isActive()) {
ChannelFuture channelFuture = channel.writeAndFlush(Packet.getHBPacket());
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.debug("<<< send heartbeat ping... " + channel.remoteAddress().toString());
} else {
LOGGER.warn("client send msg hb false:" + channel.remoteAddress().toString(), future.cause());
}
HASHED_WHEEL_TIMER.newTimeout(self, heartbeat, TimeUnit.MILLISECONDS);
});
} else {
LOGGER.error("connection was closed, connection={}", connection);
if (connection.isConnected() && healthCheck()) {
HASHED_WHEEL_TIMER.newTimeout(this, heartbeat, TimeUnit.MILLISECONDS);
}
}
}, heartbeat, TimeUnit.MILLISECONDS);
}

private boolean healthCheck() {

if (connection.isReadTimeout()) {
hbTimeoutTimes++;
LOGGER.warn("heartbeat timeout times={}, client={}", hbTimeoutTimes, connection);
} else {
hbTimeoutTimes = 0;
}

if (hbTimeoutTimes >= 2) {
LOGGER.warn("heartbeat timeout times={} over limit={}, client={}", hbTimeoutTimes, 2, connection);
hbTimeoutTimes = 0;
connection.close();
return false;
}

if (connection.isWriteTimeout()) {
LOGGER.info("<<< send heartbeat ping...");
connection.send(Packet.HB_PACKET);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ public void run(Timeout timeout) throws Exception {
Logs.HB.info("connection was disconnected, heartbeat timeout times={}, connection={}", timeoutTimes, connection);
return;
}
if (connection.heartbeatTimeout()) {
if (connection.isReadTimeout()) {
if (++timeoutTimes > CC.mp.core.max_hb_timeout_times) {
connection.close();
Logs.HB.info("client heartbeat timeout times={}, do close connection={}", timeoutTimes, connection);
Logs.HB.warn("client heartbeat timeout times={}, do close connection={}", timeoutTimes, connection);
return;
} else {
Logs.HB.info("client heartbeat timeout times={}, connection={}", timeoutTimes, connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,18 @@ public boolean isConnected() {
}

@Override
public boolean heartbeatTimeout() {
long between = System.currentTimeMillis() - lastReadTime;
return context.heartbeat > 0 && between > context.heartbeat;
public boolean isReadTimeout() {
return System.currentTimeMillis() - lastReadTime > context.heartbeat + 1000;
}

@Override
public void updateLastReadTime() {
lastReadTime = System.currentTimeMillis();
public boolean isWriteTimeout() {
return System.currentTimeMillis() - lastWriteTime > context.heartbeat - 1000;
}

@Override
public long getLastReadTime() {
return lastReadTime;
public void updateLastReadTime() {
lastReadTime = System.currentTimeMillis();
}

@Override
Expand All @@ -137,11 +136,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}

@Override
public void updateLastWriteTime() {
lastWriteTime = System.currentTimeMillis();
}


@Override
public String toString() {
return "NettyConnection [context=" + context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.mpush.client.connect.ConnClientChannelHandler.CONFIG_KEY;

public final class ConnClientBoot extends BaseService {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnClientBoot.class);
Expand Down Expand Up @@ -105,9 +105,10 @@ public List<ZKServerNode> getServers() {

public ChannelFuture connect(String host, int port, ClientConfig clientConfig) {
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.channel() != null) future.channel().attr(CONFIG_KEY).set(clientConfig);
future.addListener(f -> {
if (f.isSuccess()) {
future.channel().attr(ConnClientChannelHandler.CONFIG_KEY).set(clientConfig);
future.channel().attr(CONFIG_KEY).set(clientConfig);
LOGGER.info("start netty client success, host={}, port={}", host, port);
} else {
LOGGER.error("start netty client failure, host={}, port={}", host, port, f.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.mpush.tools.log.Logs;
import com.mpush.zk.node.ZKServerNode;
import io.netty.channel.ChannelFuture;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.Test;

Expand All @@ -37,7 +36,7 @@
public class ConnClientTestMain {

public static void main(String[] args) throws Exception {
int count = 5000, printDelay = 1;
int count = 10, printDelay = 1;
boolean sync = true;
if (args.length > 0) {
count = NumberUtils.toInt(args[0], count);
Expand Down Expand Up @@ -70,9 +69,9 @@ private static void testConnClient(int count, int printDelay, boolean sync) thro
return;
}

Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(() -> System.err.println(ConnClientChannelHandler.STATISTICS)
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
() -> System.err.println(ConnClientChannelHandler.STATISTICS)
, 3, printDelay, TimeUnit.SECONDS
);

Expand Down
2 changes: 1 addition & 1 deletion mpush-tools/src/main/java/com/mpush/tools/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static String getInetAddress(boolean getLocal) {
}
}
}
LOGGER.warn("getInetAddress is null");
LOGGER.debug("getInetAddress is null");
return getLocal ? "127.0.0.1" : null;
} catch (Throwable e) {
LOGGER.error("getInetAddress exception", e);
Expand Down

0 comments on commit cad727b

Please sign in to comment.