Skip to content

Commit

Permalink
抽象代码优化,删除空实现的抽象方法
Browse files Browse the repository at this point in the history
  • Loading branch information
Lihuanghe committed Mar 22, 2023
1 parent b85d346 commit 6b2f431
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 231 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/zx/sms/codec/cmpp/wap/UniqueLongMsgId.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class UniqueLongMsgId implements Serializable {

}

public UniqueLongMsgId(EndpointEntity entity, LongSMSMessage lmsg) {
UniqueLongMsgId(EndpointEntity entity, LongSMSMessage lmsg) {
this(entity, null, lmsg,DefaultSequenceNumberUtil.getSequenceNo(), false);
}

Expand All @@ -117,7 +117,7 @@ public UniqueLongMsgId(UniqueLongMsgId id, Channel ch) {
this.pkseq = id.getPkseq();
}

public UniqueLongMsgId(UniqueLongMsgId id, LongMessageFrame frame) {
UniqueLongMsgId(UniqueLongMsgId id, LongMessageFrame frame) {
this.id = id.getId();
this.entityId = id.getEntityId();
this.channelId = id.getChannelId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ protected void decode(ChannelHandlerContext ctx, DeliverSm msg, List<Object> out
if(StringUtils.isNumeric(id)) {
try {
Long t = Long.valueOf(id);
int signedInteger = (int)(t.longValue() & 0x0ffffffffL);
pdu.setId(Integer.toHexString(signedInteger));
pdu.setId(Long.toHexString(t));
}catch(NumberFormatException ex) {
logger.warn("java.lang.NumberFormatException For input id.{}",pdu);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package com.zx.sms.connect.manager;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zx.sms.common.NotSupportedException;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand All @@ -22,9 +30,11 @@ public abstract class AbstractClientEndpointConnector extends AbstractEndpointCo

private static final Logger logger = LoggerFactory.getLogger(AbstractClientEndpointConnector.class);
private Bootstrap bootstrap = new Bootstrap();
private SslContext sslCtx = null;

public AbstractClientEndpointConnector(EndpointEntity endpoint) {
super(endpoint);
this.sslCtx = createSslCtx();
bootstrap.group(EventLoopGroupFactory.INS.getWorker())
.channel(EventLoopGroupFactory.selectChannelClass())
.option(ChannelOption.TCP_NODELAY, true)
Expand Down Expand Up @@ -79,7 +89,78 @@ public void operationComplete(ChannelFuture f) throws Exception {
return future;
}

@Override
protected ChannelInitializer<?> initPipeLine() {

return new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
EndpointEntity entity = getEndpointEntity();
if ( StringUtils.isNotBlank(entity.getProxy())) {
String uriString = entity.getProxy();
try {
URI uri = URI.create(uriString);
addProxyHandler(ch, uri);
} catch (Exception ex) {
logger.error("parse Proxy URI {} failed.", uriString, ex);
}
}

if (entity.isUseSSL() && sslCtx != null) {
logger.info("EndpointEntity {} Use SSL.",entity);
pipeline.addLast(sslCtx.newHandler(ch.alloc(), entity.getHost(), entity.getPort()));
}
doinitPipeLine(pipeline);
}
};
};

protected abstract void doinitPipeLine(ChannelPipeline pipeline) ;

protected void addProxyHandler(Channel ch, URI proxy) throws NotSupportedException {
if (proxy == null)
return;
String scheme = proxy.getScheme();
String userinfo = proxy.getUserInfo();
String host = proxy.getHost();
int port = proxy.getPort();
String username = null;
String pass = null;

if (StringUtils.isNotBlank(userinfo)) {
int idx = userinfo.indexOf(":");
if (idx > 0) {
username = userinfo.substring(0, idx);
pass = userinfo.substring(idx + 1);
}
}

ChannelPipeline pipeline = ch.pipeline();

if ("HTTP".equalsIgnoreCase(scheme) || "HTTPS".equalsIgnoreCase(scheme) ) {
if (username == null) {
pipeline.addLast(new HttpProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new HttpProxyHandler(new InetSocketAddress(host, port), username, pass));
}
} else if ("SOCKS5".equalsIgnoreCase(scheme)) {
if (username == null) {
pipeline.addLast(new Socks5ProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new Socks5ProxyHandler(new InetSocketAddress(host, port), username, pass));
}
} else if ("SOCKS4".equalsIgnoreCase(scheme) || "SOCKS".equalsIgnoreCase(scheme)) {
if (username == null) {
pipeline.addLast(new Socks4ProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new Socks4ProxyHandler(new InetSocketAddress(host, port), username));
}
} else {
throw new NotSupportedException("not support proxy protocol " + scheme);
}
}

protected SslContext createSslCtx() {

try{
Expand All @@ -93,12 +174,4 @@ protected SslContext createSslCtx() {
}

}
@Override
protected void initSslCtx(Channel ch, EndpointEntity entity) {
ChannelPipeline pipeline = ch.pipeline();
if(entity instanceof ClientEndpoint){
logger.info("EndpointEntity {} Use SSL.",entity);
pipeline.addLast(getSslCtx().newHandler(ch.alloc(), entity.getHost(), entity.getPort()));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
package com.zx.sms.connect.manager;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zx.sms.BaseMessage;
import com.zx.sms.codec.cmpp.wap.LongMessageMarkerReadHandler;
import com.zx.sms.common.GlobalConstance;
import com.zx.sms.common.NotSupportedException;
import com.zx.sms.common.storedMap.BDBStoredMapFactoryImpl;
import com.zx.sms.common.storedMap.VersionObject;
import com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity;
import com.zx.sms.handler.HAProxyMessageHandler;
import com.zx.sms.handler.MessageLogHandler;
import com.zx.sms.handler.api.AbstractBusinessHandler;
import com.zx.sms.handler.api.BusinessHandlerInterface;
Expand All @@ -30,15 +25,9 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.traffic.WindowSizeChannelTrafficShapingHandler;
import io.netty.util.concurrent.Promise;

Expand All @@ -48,7 +37,6 @@
public abstract class AbstractEndpointConnector implements EndpointConnector<EndpointEntity> {
private static final Logger logger = LoggerFactory.getLogger(AbstractEndpointConnector.class);

private SslContext sslCtx = null;
/**
* 端口
*/
Expand All @@ -60,11 +48,8 @@ public abstract class AbstractEndpointConnector implements EndpointConnector<End

public AbstractEndpointConnector(EndpointEntity endpoint) {
this.endpoint = endpoint;
this.sslCtx = createSslCtx();
}

protected abstract SslContext createSslCtx();

@Override
public EndpointEntity getEndpointEntity() {

Expand Down Expand Up @@ -104,10 +89,6 @@ public Channel fetch() {
return null;
}

public SslContext getSslCtx() {
return sslCtx;
}

@Override
public int getConnectionNum() {

Expand All @@ -122,83 +103,6 @@ private CircularList getChannels() {

protected abstract void doBindHandler(ChannelPipeline pipe, EndpointEntity entity);

protected abstract void doinitPipeLine(ChannelPipeline pipeline);

protected void addProxyHandler(Channel ch, URI proxy) throws NotSupportedException {
if (proxy == null)
return;
String scheme = proxy.getScheme();
String userinfo = proxy.getUserInfo();
String host = proxy.getHost();
int port = proxy.getPort();
String username = null;
String pass = null;

if (StringUtils.isNotBlank(userinfo)) {
int idx = userinfo.indexOf(":");
if (idx > 0) {
username = userinfo.substring(0, idx);
pass = userinfo.substring(idx + 1);
}
}

ChannelPipeline pipeline = ch.pipeline();

if ("HTTP".equalsIgnoreCase(scheme) || "HTTPS".equalsIgnoreCase(scheme) ) {
if (username == null) {
pipeline.addLast(new HttpProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new HttpProxyHandler(new InetSocketAddress(host, port), username, pass));
}
} else if ("SOCKS5".equalsIgnoreCase(scheme)) {
if (username == null) {
pipeline.addLast(new Socks5ProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new Socks5ProxyHandler(new InetSocketAddress(host, port), username, pass));
}
} else if ("SOCKS4".equalsIgnoreCase(scheme) || "SOCKS".equalsIgnoreCase(scheme)) {
if (username == null) {
pipeline.addLast(new Socks4ProxyHandler(new InetSocketAddress(host, port)));
} else {
pipeline.addLast(new Socks4ProxyHandler(new InetSocketAddress(host, port), username));
}
} else {
throw new NotSupportedException("not support proxy protocol " + scheme);
}
}

protected ChannelInitializer<?> initPipeLine() {

return new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
EndpointEntity entity = getEndpointEntity();
if (entity instanceof ClientEndpoint && StringUtils.isNotBlank(entity.getProxy())) {
String uriString = entity.getProxy();
try {
URI uri = URI.create(uriString);
addProxyHandler(ch, uri);
} catch (Exception ex) {
logger.error("parse Proxy URI {} failed.", uriString, ex);
}
}

if (entity instanceof ServerEndpoint && entity.isProxyProtocol()) {
logger.info ("add HAProxyMessageHandler .");
pipeline.addLast(new HAProxyMessageDecoder());
pipeline.addLast(new HAProxyMessageHandler());
}

if (entity.isUseSSL() && getSslCtx() != null) {
initSslCtx(ch, entity);
}
doinitPipeLine(pipeline);
}
};
};

public synchronized boolean addChannel(Channel ch) {
int nowConnCnt = getConnectionNum();
EndpointEntity endpoint = getEndpointEntity();
Expand Down Expand Up @@ -311,10 +215,6 @@ protected void bindHandler(ChannelPipeline pipe, EndpointEntity entity) {

}

protected abstract void initSslCtx(Channel ch, EndpointEntity entity);



public Channel[] getallChannel() {
return channels.getall();
}
Expand Down
Loading

0 comments on commit 6b2f431

Please sign in to comment.