diff --git a/src/core/src/main/java/org/tio/core/task/CloseRunnable.java b/src/core/src/main/java/org/tio/core/task/CloseRunnable.java index 66dc12cd..47f0eace 100644 --- a/src/core/src/main/java/org/tio/core/task/CloseRunnable.java +++ b/src/core/src/main/java/org/tio/core/task/CloseRunnable.java @@ -193,6 +193,7 @@ recommend that a file or class name and description of purpose be included on */ package org.tio.core.task; +import java.nio.ByteBuffer; import java.util.concurrent.Executor; import org.slf4j.Logger; @@ -203,6 +204,8 @@ recommend that a file or class name and description of purpose be included on import org.tio.core.ChannelContext; import org.tio.core.maintain.MaintainUtils; import org.tio.utils.SystemTimer; +import org.tio.utils.queue.FullWaitQueue; +import org.tio.utils.queue.TioFullWaitQueue; import org.tio.utils.thread.pool.AbstractQueueRunnable; /** @@ -216,6 +219,7 @@ public class CloseRunnable extends AbstractQueueRunnable { public CloseRunnable(Executor executor) { super(executor); + getMsgQueue(); } // long count = 1; @@ -322,4 +326,19 @@ public void runTask() { public String logstr() { return super.logstr(); } + + /** The msg queue. */ + private FullWaitQueue msgQueue = null; + + @Override + public FullWaitQueue getMsgQueue() { + if (msgQueue == null) { + synchronized (this) { + if (msgQueue == null) { + msgQueue = new TioFullWaitQueue(Integer.getInteger("tio.fullqueue.capacity", null), false); + } + } + } + return msgQueue; + } } diff --git a/src/core/src/main/java/org/tio/core/task/DecodeRunnable.java b/src/core/src/main/java/org/tio/core/task/DecodeRunnable.java index 088f481f..722ea9b7 100644 --- a/src/core/src/main/java/org/tio/core/task/DecodeRunnable.java +++ b/src/core/src/main/java/org/tio/core/task/DecodeRunnable.java @@ -202,14 +202,16 @@ recommend that a file or class name and description of purpose be included on import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import org.tio.core.ChannelContext.CloseCode; -import org.tio.core.TioConfig; import org.tio.core.Tio; +import org.tio.core.TioConfig; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.Packet; import org.tio.core.stat.ChannelStat; import org.tio.core.stat.IpStat; import org.tio.core.utils.ByteBufferUtils; import org.tio.utils.SystemTimer; +import org.tio.utils.queue.FullWaitQueue; +import org.tio.utils.queue.TioFullWaitQueue; import org.tio.utils.thread.pool.AbstractQueueRunnable; /** @@ -221,7 +223,7 @@ recommend that a file or class name and description of purpose be included on public class DecodeRunnable extends AbstractQueueRunnable { private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class); private ChannelContext channelContext = null; - private TioConfig tioConfig = null; + private TioConfig tioConfig = null; /** * 上一次解码剩下的数据 */ @@ -259,6 +261,7 @@ public DecodeRunnable(ChannelContext channelContext, Executor executor) { super(executor); this.channelContext = channelContext; this.tioConfig = channelContext.tioConfig; + getMsgQueue(); } /** @@ -447,4 +450,23 @@ public String toString() { public String logstr() { return toString(); } + + /** The msg queue. */ + private FullWaitQueue msgQueue = null; + + @Override + public FullWaitQueue getMsgQueue() { + if (tioConfig.useQueueDecode) { + if (msgQueue == null) { + synchronized (this) { + if (msgQueue == null) { + msgQueue = new TioFullWaitQueue(Integer.getInteger("tio.fullqueue.capacity", null), true); + } + } + } + return msgQueue; + } + return null; + } + } diff --git a/src/core/src/main/java/org/tio/core/task/HandlerRunnable.java b/src/core/src/main/java/org/tio/core/task/HandlerRunnable.java index 75664c2c..b070a91f 100644 --- a/src/core/src/main/java/org/tio/core/task/HandlerRunnable.java +++ b/src/core/src/main/java/org/tio/core/task/HandlerRunnable.java @@ -202,11 +202,14 @@ recommend that a file or class name and description of purpose be included on import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; +import org.tio.core.PacketHandlerMode; import org.tio.core.TioConfig; import org.tio.core.intf.Packet; import org.tio.core.stat.IpStat; import org.tio.utils.SystemTimer; import org.tio.utils.lock.MapWithLock; +import org.tio.utils.queue.FullWaitQueue; +import org.tio.utils.queue.TioFullWaitQueue; import org.tio.utils.thread.pool.AbstractQueueRunnable; /** @@ -219,7 +222,7 @@ public class HandlerRunnable extends AbstractQueueRunnable { private static final Logger log = LoggerFactory.getLogger(HandlerRunnable.class); private ChannelContext channelContext = null; - private TioConfig tioConfig = null; + private TioConfig tioConfig = null; private AtomicLong synFailCount = new AtomicLong(); @@ -227,6 +230,7 @@ public HandlerRunnable(ChannelContext channelContext, Executor executor) { super(executor); this.channelContext = channelContext; tioConfig = channelContext.tioConfig; + getMsgQueue(); } /** @@ -320,4 +324,23 @@ public String toString() { public String logstr() { return toString(); } + + /** The msg queue. */ + private FullWaitQueue msgQueue = null; + + @Override + public FullWaitQueue getMsgQueue() { + if (PacketHandlerMode.QUEUE == tioConfig.packetHandlerMode) { + if (msgQueue == null) { + synchronized (this) { + if (msgQueue == null) { + msgQueue = new TioFullWaitQueue(Integer.getInteger("tio.fullqueue.capacity", null), true); + } + } + } + return msgQueue; + } + return null; + } + } diff --git a/src/core/src/main/java/org/tio/core/task/SendRunnable.java b/src/core/src/main/java/org/tio/core/task/SendRunnable.java index bc697301..aee57cd1 100644 --- a/src/core/src/main/java/org/tio/core/task/SendRunnable.java +++ b/src/core/src/main/java/org/tio/core/task/SendRunnable.java @@ -206,15 +206,17 @@ recommend that a file or class name and description of purpose be included on import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import org.tio.core.ChannelContext.CloseCode; -import org.tio.core.TioConfig; import org.tio.core.TcpConst; import org.tio.core.Tio; +import org.tio.core.TioConfig; import org.tio.core.WriteCompletionHandler.WriteCompletionVo; import org.tio.core.intf.AioHandler; import org.tio.core.intf.Packet; import org.tio.core.ssl.SslUtils; import org.tio.core.ssl.SslVo; import org.tio.core.utils.TioUtils; +import org.tio.utils.queue.FullWaitQueue; +import org.tio.utils.queue.TioFullWaitQueue; import org.tio.utils.thread.pool.AbstractQueueRunnable; /** @@ -258,6 +260,8 @@ public SendRunnable(ChannelContext channelContext, Executor executor) { this.tioConfig = channelContext.tioConfig; this.aioHandler = tioConfig.getAioHandler(); this.isSsl = SslUtils.isSsl(tioConfig); + + getMsgQueue(); } @Override @@ -495,4 +499,19 @@ public String logstr() { return toString(); } + /** The msg queue. */ + private FullWaitQueue msgQueue = null; + + @Override + public FullWaitQueue getMsgQueue() { + if (msgQueue == null) { + synchronized (this) { + if (msgQueue == null) { + msgQueue = new TioFullWaitQueue(Integer.getInteger("tio.fullqueue.capacity", null), false); + } + } + } + return msgQueue; + } + } diff --git a/src/parent/pom.xml b/src/parent/pom.xml index 862c1722..4ac580a6 100644 --- a/src/parent/pom.xml +++ b/src/parent/pom.xml @@ -866,6 +866,12 @@ thumbnailator 0.4.8 + + + com.lmax + disruptor + 3.4.2 + diff --git a/src/utils/pom.xml b/src/utils/pom.xml index 419f65dc..350e5934 100644 --- a/src/utils/pom.xml +++ b/src/utils/pom.xml @@ -25,7 +25,7 @@ com.github.ben-manes.caffeine caffeine - + org.redisson diff --git a/src/utils/src/main/java/org/tio/utils/queue/FullWaitQueue.java b/src/utils/src/main/java/org/tio/utils/queue/FullWaitQueue.java index 77330498..0987807c 100644 --- a/src/utils/src/main/java/org/tio/utils/queue/FullWaitQueue.java +++ b/src/utils/src/main/java/org/tio/utils/queue/FullWaitQueue.java @@ -7,7 +7,8 @@ */ public interface FullWaitQueue { /** - * 向队列尾添加一个元素,如果队列已经满了,则等待一段时间 + * write + * 向队列尾添加一个元素,如果队列已经满了,则等待一段时间 * @param t * @return * @author tanyaowu @@ -15,14 +16,15 @@ public interface FullWaitQueue { public boolean add(T t); /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ + * read + * Retrieves and removes the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ public T poll(); - + public void clear(); - + public int size(); } diff --git a/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueue.java b/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueue.java index 4b4ce4cd..410ad672 100644 --- a/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueue.java +++ b/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueue.java @@ -1,124 +1,43 @@ package org.tio.utils.queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentLinkedQueue; /** + * 暂时用ConcurrentLinkedQueue代替 * @author tanyaowu - * 2019年9月28日 上午8:49:47 + * 2019年9月30日 上午9:22:00 */ public class TioFullWaitQueue implements FullWaitQueue { - private static Logger log = LoggerFactory.getLogger(TioFullWaitQueue.class); - private T[] array = null; - private Lock lock = new ReentrantLock(); - private Condition fullWaitCondition = lock.newCondition(); - private long waitTimeoutInSecond = 5L; - private int capacity = 0; - private int size = 0; - private int tailIndex = 0; //队列尾部指针,用于添加元素 - private int headIndex = 0; //队列头部指针,用于删除元素 - @SuppressWarnings("unchecked") - public TioFullWaitQueue(int capacity, long waitTimeoutInSecond) { - this.capacity = capacity; - this.waitTimeoutInSecond = waitTimeoutInSecond; - array = (T[]) new Object[capacity]; - } + private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private void init() { - lock.lock(); - try { - size = 0; - tailIndex = 0; //队列尾部指针,用于添加元素 - headIndex = 0; //队列头部指针,用于删除元素 - for (int i = 0; i < array.length; i++) { - array[i] = null; - } - } finally { - lock.unlock(); - } - } - - public boolean isFull() { - return size == capacity; + /** + * + * @param capacity + * @param useSingleProducer + * @author tanyaowu + */ + public TioFullWaitQueue(Integer capacity, boolean useSingleProducer) { } @Override - public boolean add(T t) { - if (t == null) { - return false; - } - lock.lock(); - try { - if (isFull()) { - boolean f = fullWaitCondition.await(waitTimeoutInSecond, TimeUnit.SECONDS); - if (!f) { - log.error("队列已满, 并且没有等到空位置,数据将被丢弃.{}", t); - return false; - } - log.info("队列已满,不过等到了空位置"); - } - array[tailIndex++] = t; - if (tailIndex == capacity) { - tailIndex = 0; - } - size++; - return true; - } catch (InterruptedException e) { - log.error(e.toString(), e); - return false; - } finally { - lock.unlock(); - } + public boolean add(T e) { + return queue.add(e); } - /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ @Override public T poll() { - lock.lock(); - try { - if (size == 0) { - return null; - } - T t = array[headIndex]; - array[headIndex++] = null; - if (headIndex == capacity) { - headIndex = 0; - } - size--; - fullWaitCondition.signal(); - return t; - } finally { - lock.unlock(); - } + return queue.poll(); } - /** - * - * @author tanyaowu - */ @Override - public void clear() { - init(); + public int size() { + return queue.size(); } - /** - * @return - * @author tanyaowu - */ @Override - public int size() { - return size; + public void clear() { + queue.clear(); } } diff --git a/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueueTest.java b/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueueTest.java deleted file mode 100644 index 71c99fae..00000000 --- a/src/utils/src/main/java/org/tio/utils/queue/TioFullWaitQueueTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.tio.utils.queue; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author tanyaowu - * 2019年9月28日 上午9:01:00 - */ -public class TioFullWaitQueueTest { - private static Logger log = LoggerFactory.getLogger(TioFullWaitQueueTest.class); - - private static java.util.concurrent.atomic.AtomicInteger c = new AtomicInteger(); - - public static void main(String[] args) { - TioFullWaitQueue queue = new TioFullWaitQueue<>(10, 5L); - for (int i = 1; i <= 20; i++) { - Thread thread = new Thread(new Producter(queue), String.valueOf(i)); - thread.start(); - } - - try { - Thread.sleep(1000L * 1L); - } catch (InterruptedException e) { - log.error(e.toString(), e); - } - - for (int i = 1; i <= 10; i++) { - Thread thread = new Thread(new Consumer(queue), String.valueOf(i)); - thread.start(); - } - - try { - Thread.sleep(1000L * 20L); - } catch (InterruptedException e) { - log.error(e.toString(), e); - } - } - - static class Producter implements Runnable { - private TioFullWaitQueue queue; - - public Producter(TioFullWaitQueue queue) { - this.queue = queue; - } - - public void produce() { - queue.add(c.incrementAndGet()); - } - - @Override - public void run() { - for (int i = 0; i < 1000; i++) { - produce(); - } - } - } - - static class Consumer implements Runnable { - private TioFullWaitQueue queue; - - public Consumer(TioFullWaitQueue queue) { - this.queue = queue; - } - - public Integer remove() { - return queue.poll(); - } - - @Override - public void run() { - for (int i = 0; i < 1000; i++) { - remove(); - } - - } - } - -} diff --git a/src/utils/src/main/java/org/tio/utils/thread/pool/AbstractQueueRunnable.java b/src/utils/src/main/java/org/tio/utils/thread/pool/AbstractQueueRunnable.java index b1a6bc3b..1a2feea3 100644 --- a/src/utils/src/main/java/org/tio/utils/thread/pool/AbstractQueueRunnable.java +++ b/src/utils/src/main/java/org/tio/utils/thread/pool/AbstractQueueRunnable.java @@ -198,7 +198,6 @@ recommend that a file or class name and description of purpose be included on import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.utils.queue.FullWaitQueue; -import org.tio.utils.queue.TioFullWaitQueue; /** * @@ -208,21 +207,8 @@ recommend that a file or class name and description of purpose be included on public abstract class AbstractQueueRunnable extends AbstractSynRunnable { private static final Logger log = LoggerFactory.getLogger(AbstractQueueRunnable.class); - /** The msg queue. */ - protected FullWaitQueue msgQueue = null; - - /** - * - * @param executor - * @author tanyaowu - */ public AbstractQueueRunnable(Executor executor) { - this(executor, Integer.getInteger("tio.fullqueue.capacity", 10), Long.getLong("tio.fullqueue.wait.timeout", 10L)); - } - - public AbstractQueueRunnable(Executor executor, int capacity, long waitTimeoutInSecond) { super(executor); - msgQueue = new TioFullWaitQueue<>(capacity, waitTimeoutInSecond); } /** @@ -235,26 +221,26 @@ public boolean addMsg(T t) { return false; } - return msgQueue.add(t); + return getMsgQueue().add(t); } /** * 清空处理的队列消息 */ public void clearMsgQueue() { - msgQueue.clear(); + if (getMsgQueue() != null) { + getMsgQueue().clear(); + } } @Override public boolean isNeededExecute() { - return !this.isCanceled() && msgQueue.size() > 0; + return !this.isCanceled() && (getMsgQueue() != null && getMsgQueue().size() > 0); } /** * 获取消息队列 * @return */ - public FullWaitQueue getMsgQueue() { - return msgQueue; - } + public abstract FullWaitQueue getMsgQueue(); }