Skip to content

Commit

Permalink
增加tcp.aio基础类
Browse files Browse the repository at this point in the history
  • Loading branch information
juzzs6212 committed Aug 9, 2018
1 parent 74cb298 commit 55190cc
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 0 deletions.
201 changes: 201 additions & 0 deletions src/main/java/utils/tcp/aio/AsyncClientHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package utils.tcp.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class AsyncClientHandler implements CompletionHandler<Void, AsynchronousSocketChannel>{
private AsynchronousSocketChannel channel;
private ReentrantReadWriteLock channelLock = new ReentrantReadWriteLock();
private volatile boolean connIng = false;
private volatile boolean connEd = false;

private volatile LinkedList<ByteBuffer> bsList = new LinkedList<>();
private ReentrantLock bsListLock = new ReentrantLock();
private ReentrantLock sendLock = new ReentrantLock();
private volatile boolean sendIng = false;

private String host;
private int port;
private ReadHandler readHandler;
private WriteHandler writeHandler;
private String clientId;


public AsyncClientHandler(String host, int port, ReadHandler readHandler) {
this.host = host;
this.port = port;
this.clientId = UUID.randomUUID().toString();

this.readHandler = readHandler;
this.readHandler.setClient(this);
this.writeHandler = new WriteHandler(this);
}

public String getClientId() {
return clientId;
}

public void beginConnect() {
try {
channelLock.writeLock().lock();
// 正在连接中直接返回
if (connIng || connEd)
return;
connect();
} finally {
channelLock.writeLock().unlock();
}
}

private void connect(){
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
//创建异步的客户端通道
AsynchronousSocketChannel client = null;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
return;
}
client.connect(new InetSocketAddress(host, port), client, this);
connIng = true;
}

public void reConnect(){
try {
channelLock.writeLock().lock();
connEd = false;
connIng = false;

connect();
} finally {
channelLock.writeLock().unlock();
}
}

public void closeConnect(AsynchronousSocketChannel socketChannel) {
try {
if (socketChannel.isOpen()) {
System.err.println("通道关闭...");
socketChannel.close();
}
} catch (IOException e) {
}
}

@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
connResult(true , attachment);
System.out.println("conn ok");
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
connResult(false , attachment);
System.out.println("conn fail");
}

public void connResult(boolean flag, AsynchronousSocketChannel attachment){
try {
channelLock.writeLock().lock();
if (flag) {
this.channel = attachment;
connIng = false;
connEd = true;
} else {
connIng = false;
connEd = false;
}
} finally {
channelLock.writeLock().unlock();
}
if (flag){
readHandler.beginRead(attachment);
beginWrite(attachment);
}
}

public void sendResult(boolean flag, AsynchronousSocketChannel attachment){
// 开始收发
try {
sendLock.lock();
sendIng = false;
if (flag) {
try {
bsListLock.lock();
bsList.removeFirst();
if (!bsList.isEmpty()) {
ByteBuffer writeBuffer = bsList.getFirst();
attachment.write(writeBuffer, attachment, writeHandler);
sendIng = true;
}
} finally {
bsListLock.unlock();
}
}
} finally {
sendLock.unlock();
}
}

private void beginWrite(AsynchronousSocketChannel attachment){
// 开始收发
try {
sendLock.lock();
try {
bsListLock.lock();
if (!bsList.isEmpty()) {
ByteBuffer writeBuffer = bsList.getFirst();
attachment.write(writeBuffer, attachment, writeHandler);
sendIng = true;
}
} finally {
bsListLock.unlock();
}
} finally {
sendLock.unlock();
}
}

private void beginWrite() {
AsynchronousSocketChannel ch = null;
try {
channelLock.readLock().lock();
if (connEd) {
ch = channel;
} else {
return;
}
} finally {
channelLock.readLock().unlock();
}
beginWrite(ch);
}

public void sendBytes(byte[] bs) {
ByteBuffer writeBuffer = ByteBuffer.allocate(bs.length);
writeBuffer.put(bs);
writeBuffer.flip();
try {
bsListLock.lock();
bsList.addLast(writeBuffer);
} finally {
bsListLock.unlock();
}
try {
sendLock.lock();
if (!sendIng){
beginWrite();
}
} finally {
sendLock.unlock();
}
}
}
15 changes: 15 additions & 0 deletions src/main/java/utils/tcp/aio/DefReadHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils.tcp.aio;

public class DefReadHandler extends ReadHandler {
@Override
public void handleRecv(byte[] data) {
if (data != null && data.length > 0) {
System.out.println(new String(data));
}
}

@Override
public void connColse() {
//recvBuf = new byte[0];
}
}
70 changes: 70 additions & 0 deletions src/main/java/utils/tcp/aio/ReadHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package utils.tcp.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public abstract class ReadHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {
private AsyncClientHandler client;
private ByteBuffer readBuffer;// 接收缓冲区
private int analyzeBufferSize;

public ReadHandler() {
this.readBuffer = ByteBuffer.allocate(256);
}

public AsyncClientHandler getClient() {
return client;
}

public void setClient(AsyncClientHandler client) {
this.client = client;
}

public void beginRead(AsynchronousSocketChannel attachment) {
readBuffer.clear();
attachment.read(readBuffer, attachment, this);
}

@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
System.out.println("read ok");
// 收到字节为0表示断开连接了
if (result > 0) {
analyzeBufferSize += result;
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
try {
handleRecv(bytes);
} catch (Exception e){
e.printStackTrace();
}
try {
beginRead(attachment);// 收到后继续接收
return;
} catch (Exception e) {
e.printStackTrace();
}
}
closeConnect(attachment);
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
System.out.println("write fail");
closeConnect(attachment);
}

private void closeConnect(AsynchronousSocketChannel attachment){
// 如果走到这里说明有问题 直接关闭链路
client.closeConnect(attachment);
// 开始重新建立链路
client.reConnect();
connColse();
}

public abstract void handleRecv(byte[] data);

public abstract void connColse();
}
19 changes: 19 additions & 0 deletions src/main/java/utils/tcp/aio/TcpClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils.tcp.aio;

import java.util.concurrent.atomic.AtomicLong;

public class TcpClientTest {
public static AtomicLong sendCnt = new AtomicLong();
public static AtomicLong recvCnt = new AtomicLong();

public static void main(String[] args) throws Exception {
AsyncClientHandler client = new AsyncClientHandler("192.168.1.98", 45678, new DefReadHandler());
client.beginConnect();
while (true) {
System.in.read();
System.out.println("I am coming.");
byte[] bs = "Hello World".getBytes();
client.sendBytes(bs);
}
}
}
26 changes: 26 additions & 0 deletions src/main/java/utils/tcp/aio/WriteHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils.tcp.aio;

import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class WriteHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {
private AsyncClientHandler client;
private long writeByteCnt;

public WriteHandler(AsyncClientHandler client) {
this.client = client;
}

@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
writeByteCnt += result;
client.sendResult(true, attachment);
System.out.println("write ok");
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
client.sendResult(false, attachment);
System.out.println("write fail");
}
}

0 comments on commit 55190cc

Please sign in to comment.