Skip to content

Commit

Permalink
并发优化,不兼容以前的客户端
Browse files Browse the repository at this point in the history
  • Loading branch information
ffay committed Nov 1, 2017
1 parent 44e38a6 commit d247ab0
Show file tree
Hide file tree
Showing 16 changed files with 438 additions and 252 deletions.
127 changes: 68 additions & 59 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,59 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.fengfei</groupId>
<artifactId>lanproxy</artifactId>
<packaging>pom</packaging>
<version>0.1</version>
<name>lanproxy</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<modules>
<module>proxy-common</module>
<module>proxy-protocol</module>
<module>proxy-server</module>
<module>proxy-client</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.fengfei</groupId>
<artifactId>lanproxy</artifactId>
<packaging>pom</packaging>
<version>0.1</version>
<name>lanproxy</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<modules>
<module>proxy-common</module>
<module>proxy-protocol</module>
<module>proxy-server</module>
<module>proxy-client</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>bat</nonFilteredFileExtension>
<nonFilteredFileExtension>sh</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.fengfei.lanproxy.client.listener.ProxyChannelBorrowListener;
import org.fengfei.lanproxy.common.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.util.AttributeKey;
Expand All @@ -24,32 +29,68 @@ public class ClientChannelMannager {

private static Logger logger = LoggerFactory.getLogger(ClientChannelMannager.class);

private static final AttributeKey<String> USER_ID = AttributeKey.newInstance("user_id");
private static final AttributeKey<Boolean> USER_CHANNEL_WRITEABLE = AttributeKey.newInstance("user_channel_writeable");

private static final AttributeKey<Boolean> USER_CHANNEL_WRITEABLE = AttributeKey
.newInstance("user_channel_writeable");
private static final AttributeKey<Boolean> CLIENT_CHANNEL_WRITEABLE = AttributeKey.newInstance("client_channel_writeable");

private static final AttributeKey<Boolean> CLIENT_CHANNEL_WRITEABLE = AttributeKey
.newInstance("client_channel_writeable");
private static final int MAX_POOL_SIZE = 100;

private static Map<String, Channel> realServerChannels = new ConcurrentHashMap<String, Channel>();

private static volatile Channel channel;
private static ConcurrentLinkedQueue<Channel> proxyChannelPool = new ConcurrentLinkedQueue<Channel>();

public static void setChannel(Channel channel) {
ClientChannelMannager.channel = channel;
private static volatile Channel cmdChannel;

private static Config config = Config.getInstance();

public static void borrowProxyChanel(Bootstrap bootstrap, final ProxyChannelBorrowListener borrowListener) {
Channel channel = proxyChannelPool.poll();
if (channel != null) {
borrowListener.success(channel);
return;
}

bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port")).addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
borrowListener.success(future.channel());
} else {
logger.warn("connect proxy server failed", future.cause());
borrowListener.error(future.cause());
}
}
});
}

public static Channel getChannel() {
return channel;
public static void returnProxyChanel(Channel proxyChanel) {
if (proxyChannelPool.size() > MAX_POOL_SIZE) {
proxyChanel.close();
} else {
proxyChannelPool.offer(proxyChanel);
logger.debug("return ProxyChanel to the pool, channel is {}, pool size is {} ", proxyChanel, proxyChannelPool.size());
}
}

public static void removeProxyChanel(Channel proxyChanel) {
proxyChannelPool.remove(proxyChanel);
}

public static void setCmdChannel(Channel cmdChannel) {
ClientChannelMannager.cmdChannel = cmdChannel;
}

public static Channel getCmdChannel() {
return cmdChannel;
}

public static void setRealServerChannelUserId(Channel realServerChannel, String userId) {
realServerChannel.attr(USER_ID).set(userId);
realServerChannel.attr(Constants.USER_ID).set(userId);
}

public static String getRealServerChannelUserId(Channel realServerChannel) {
return realServerChannel.attr(USER_ID).get();
return realServerChannel.attr(Constants.USER_ID).get();
}

public static Channel getRealServerChannel(String userId) {
Expand All @@ -65,8 +106,7 @@ public static Channel removeRealServerChannel(String userId) {
}

public static boolean isRealServerReadable(Channel realServerChannel) {
return realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get()
&& realServerChannel.attr(USER_CHANNEL_WRITEABLE).get();
return realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get();
}

public static void setRealServerChannelReadability(Channel realServerChannel, Boolean client, Boolean user) {
Expand All @@ -83,21 +123,22 @@ public static void setRealServerChannelReadability(Channel realServerChannel, Bo
realServerChannel.attr(USER_CHANNEL_WRITEABLE).set(user);
}

if (realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get()
&& realServerChannel.attr(USER_CHANNEL_WRITEABLE).get()) {
if (realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get()) {
realServerChannel.config().setOption(ChannelOption.AUTO_READ, true);
} else {
realServerChannel.config().setOption(ChannelOption.AUTO_READ, false);
}
}

public static void notifyChannelWritabilityChanged(Channel channel) {
logger.debug("channel writability changed, {}", channel.isWritable());
Iterator<Entry<String, Channel>> entryIte = realServerChannels.entrySet().iterator();
while (entryIte.hasNext()) {
setRealServerChannelReadability(entryIte.next().getValue(), channel.isWritable(), null);
}
}
// public static void notifyChannelWritabilityChanged(Channel channel) {
// logger.debug("channel writability changed, {}", channel.isWritable());
// Iterator<Entry<String, Channel>> entryIte =
// realServerChannels.entrySet().iterator();
// while (entryIte.hasNext()) {
// setRealServerChannelReadability(entryIte.next().getValue(),
// channel.isWritable(), null);
// }
// }

public static void clearRealServerChannels() {
logger.warn("channel closed, clear real server channels");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.fengfei.lanproxy.client;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public interface Constants {

public static final AttributeKey<Channel> NEXT_CHANNEL = AttributeKey.newInstance("nxt_channel");

public static final AttributeKey<String> USER_ID = AttributeKey.newInstance("user_id");

public static final AttributeKey<String> CLIENT_KEY = AttributeKey.newInstance("client_key");
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
ch.pipeline().addLast(new ProxyMessageEncoder());
ch.pipeline().addLast(new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0));
ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, ProxyClientContainer.this));
ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, bootstrap, ProxyClientContainer.this));
}
});
}
Expand All @@ -110,7 +110,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {

// 连接成功,向服务器发送客户端认证信息(clientKey)
ClientChannelMannager.setChannel(future.channel());
ClientChannelMannager.setCmdChannel(future.channel());
ProxyMessage proxyMessage = new ProxyMessage();
proxyMessage.setType(ProxyMessage.TYPE_AUTH);
proxyMessage.setUri(config.getStringValue("client.key"));
Expand Down
Loading

0 comments on commit d247ab0

Please sign in to comment.