Skip to content

Commit

Permalink
small modify
Browse files Browse the repository at this point in the history
  • Loading branch information
tywo45 committed Sep 30, 2019
1 parent 7a572d3 commit b10bb31
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 213 deletions.
19 changes: 19 additions & 0 deletions src/core/src/main/java/org/tio/core/task/CloseRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ recommend that a file or class name and description of purpose be included on
*/
package org.tio.core.task;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
Expand All @@ -203,6 +204,8 @@ recommend that a file or class name and description of purpose be included on
import org.tio.core.ChannelContext;
import org.tio.core.maintain.MaintainUtils;
import org.tio.utils.SystemTimer;
import org.tio.utils.queue.FullWaitQueue;
import org.tio.utils.queue.TioFullWaitQueue;
import org.tio.utils.thread.pool.AbstractQueueRunnable;

/**
Expand All @@ -216,6 +219,7 @@ public class CloseRunnable extends AbstractQueueRunnable<ChannelContext> {

public CloseRunnable(Executor executor) {
super(executor);
getMsgQueue();
}
// long count = 1;

Expand Down Expand Up @@ -322,4 +326,19 @@ public void runTask() {
public String logstr() {
return super.logstr();
}

/** The msg queue. */
private FullWaitQueue<ChannelContext> msgQueue = null;

@Override
public FullWaitQueue<ChannelContext> getMsgQueue() {
if (msgQueue == null) {
synchronized (this) {
if (msgQueue == null) {
msgQueue = new TioFullWaitQueue<ChannelContext>(Integer.getInteger("tio.fullqueue.capacity", null), false);
}
}
}
return msgQueue;
}
}
26 changes: 24 additions & 2 deletions src/core/src/main/java/org/tio/core/task/DecodeRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,16 @@ recommend that a file or class name and description of purpose be included on
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.ChannelContext.CloseCode;
import org.tio.core.TioConfig;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.core.stat.ChannelStat;
import org.tio.core.stat.IpStat;
import org.tio.core.utils.ByteBufferUtils;
import org.tio.utils.SystemTimer;
import org.tio.utils.queue.FullWaitQueue;
import org.tio.utils.queue.TioFullWaitQueue;
import org.tio.utils.thread.pool.AbstractQueueRunnable;

/**
Expand All @@ -221,7 +223,7 @@ recommend that a file or class name and description of purpose be included on
public class DecodeRunnable extends AbstractQueueRunnable<ByteBuffer> {
private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class);
private ChannelContext channelContext = null;
private TioConfig tioConfig = null;
private TioConfig tioConfig = null;
/**
* 上一次解码剩下的数据
*/
Expand Down Expand Up @@ -259,6 +261,7 @@ public DecodeRunnable(ChannelContext channelContext, Executor executor) {
super(executor);
this.channelContext = channelContext;
this.tioConfig = channelContext.tioConfig;
getMsgQueue();
}

/**
Expand Down Expand Up @@ -447,4 +450,23 @@ public String toString() {
public String logstr() {
return toString();
}

/** The msg queue. */
private FullWaitQueue<ByteBuffer> msgQueue = null;

@Override
public FullWaitQueue<ByteBuffer> getMsgQueue() {
if (tioConfig.useQueueDecode) {
if (msgQueue == null) {
synchronized (this) {
if (msgQueue == null) {
msgQueue = new TioFullWaitQueue<ByteBuffer>(Integer.getInteger("tio.fullqueue.capacity", null), true);
}
}
}
return msgQueue;
}
return null;
}

}
25 changes: 24 additions & 1 deletion src/core/src/main/java/org/tio/core/task/HandlerRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,14 @@ recommend that a file or class name and description of purpose be included on
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.PacketHandlerMode;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.IpStat;
import org.tio.utils.SystemTimer;
import org.tio.utils.lock.MapWithLock;
import org.tio.utils.queue.FullWaitQueue;
import org.tio.utils.queue.TioFullWaitQueue;
import org.tio.utils.thread.pool.AbstractQueueRunnable;

/**
Expand All @@ -219,14 +222,15 @@ public class HandlerRunnable extends AbstractQueueRunnable<Packet> {
private static final Logger log = LoggerFactory.getLogger(HandlerRunnable.class);

private ChannelContext channelContext = null;
private TioConfig tioConfig = null;
private TioConfig tioConfig = null;

private AtomicLong synFailCount = new AtomicLong();

public HandlerRunnable(ChannelContext channelContext, Executor executor) {
super(executor);
this.channelContext = channelContext;
tioConfig = channelContext.tioConfig;
getMsgQueue();
}

/**
Expand Down Expand Up @@ -320,4 +324,23 @@ public String toString() {
public String logstr() {
return toString();
}

/** The msg queue. */
private FullWaitQueue<Packet> msgQueue = null;

@Override
public FullWaitQueue<Packet> getMsgQueue() {
if (PacketHandlerMode.QUEUE == tioConfig.packetHandlerMode) {
if (msgQueue == null) {
synchronized (this) {
if (msgQueue == null) {
msgQueue = new TioFullWaitQueue<Packet>(Integer.getInteger("tio.fullqueue.capacity", null), true);
}
}
}
return msgQueue;
}
return null;
}

}
21 changes: 20 additions & 1 deletion src/core/src/main/java/org/tio/core/task/SendRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,17 @@ recommend that a file or class name and description of purpose be included on
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.ChannelContext.CloseCode;
import org.tio.core.TioConfig;
import org.tio.core.TcpConst;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.WriteCompletionHandler.WriteCompletionVo;
import org.tio.core.intf.AioHandler;
import org.tio.core.intf.Packet;
import org.tio.core.ssl.SslUtils;
import org.tio.core.ssl.SslVo;
import org.tio.core.utils.TioUtils;
import org.tio.utils.queue.FullWaitQueue;
import org.tio.utils.queue.TioFullWaitQueue;
import org.tio.utils.thread.pool.AbstractQueueRunnable;

/**
Expand Down Expand Up @@ -258,6 +260,8 @@ public SendRunnable(ChannelContext channelContext, Executor executor) {
this.tioConfig = channelContext.tioConfig;
this.aioHandler = tioConfig.getAioHandler();
this.isSsl = SslUtils.isSsl(tioConfig);

getMsgQueue();
}

@Override
Expand Down Expand Up @@ -495,4 +499,19 @@ public String logstr() {
return toString();
}

/** The msg queue. */
private FullWaitQueue<Packet> msgQueue = null;

@Override
public FullWaitQueue<Packet> getMsgQueue() {
if (msgQueue == null) {
synchronized (this) {
if (msgQueue == null) {
msgQueue = new TioFullWaitQueue<Packet>(Integer.getInteger("tio.fullqueue.capacity", null), false);
}
}
}
return msgQueue;
}

}
6 changes: 6 additions & 0 deletions src/parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,12 @@
<artifactId>thumbnailator</artifactId>
<version>0.4.8</version>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
2 changes: 1 addition & 1 deletion src/utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>


<dependency>
<groupId>org.redisson</groupId>
Expand Down
18 changes: 10 additions & 8 deletions src/utils/src/main/java/org/tio/utils/queue/FullWaitQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,24 @@
*/
public interface FullWaitQueue<T> {
/**
* 向队列尾添加一个元素,如果队列已经满了,则等待一段时间
* write
* 向队列尾添加一个元素,如果队列已经满了,则等待一段时间
* @param t
* @return
* @author tanyaowu
*/
public boolean add(T t);

/**
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
* read
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
public T poll();

public void clear();

public int size();
}
Loading

0 comments on commit b10bb31

Please sign in to comment.