From 6c96b8ff34953a38b63323a987d7979fcdbe503c Mon Sep 17 00:00:00 2001 From: Velichko Stoykov Date: Tue, 26 Jun 2018 12:41:52 +0300 Subject: [PATCH] Ported QueueingThreadPoolExecutor synchronization improvements from Eclipse Smarthome (#96) * Ported synchronization improvements from https://github.com/eclipse/smarthome/pull/5113/ Signed-off-by: velichko.stoykov --- .../org/jupnp/QueueingThreadPoolExecutor.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/bundles/org.jupnp/src/main/java/org/jupnp/QueueingThreadPoolExecutor.java b/bundles/org.jupnp/src/main/java/org/jupnp/QueueingThreadPoolExecutor.java index c2bf1f569..6d0f4853a 100644 --- a/bundles/org.jupnp/src/main/java/org/jupnp/QueueingThreadPoolExecutor.java +++ b/bundles/org.jupnp/src/main/java/org/jupnp/QueueingThreadPoolExecutor.java @@ -21,6 +21,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,16 +60,17 @@ */ public class QueueingThreadPoolExecutor extends ThreadPoolExecutor { - private Logger logger = LoggerFactory.getLogger(QueueingThreadPoolExecutor.class); + private final Logger logger = LoggerFactory.getLogger(QueueingThreadPoolExecutor.class); /** we will use a core pool size of 1 since we allow to timeout core threads. */ final static int CORE_THREAD_POOL_SIZE = 1; /** Our queue for queueing tasks that wait for a thread to become available */ - private LinkedTransferQueue taskQueue = new LinkedTransferQueue<>(); + private final BlockingQueue taskQueue = new LinkedTransferQueue<>(); /** The thread for processing the queued tasks */ private Thread queueThread; + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); final private Object semaphore = new Object(); @@ -109,18 +112,30 @@ public static QueueingThreadPoolExecutor createInstance(String name, int threadP * @param runnable the task to add */ protected void addToQueue(Runnable runnable) { - taskQueue.add(runnable); + lock.readLock().lock(); if (queueThread == null || !queueThread.isAlive()) { - synchronized (this) { + lock.readLock().unlock(); + lock.writeLock().lock(); + try { // check again to make sure it has not been created by another thread if (queueThread == null || !queueThread.isAlive()) { - logger.warn("Thread pool '{}' exhausted, queueing tasks now.", threadPoolName); + logger.info("Thread pool '{}' exhausted, queueing tasks now.", threadPoolName); queueThread = createNewQueueThread(); queueThread.start(); } + + lock.readLock().lock(); + } finally { + lock.writeLock().unlock(); } } + + try { + taskQueue.add(runnable); + } finally { + lock.readLock().unlock(); + } } @Override @@ -181,7 +196,15 @@ public void run() { logger.debug("Executing queued task of thread pool '{}'.", threadPoolName); QueueingThreadPoolExecutor.super.execute(runnable); } else { - break; + lock.writeLock().lock(); + try { + if (taskQueue.isEmpty()) { + queueThread = null; + break; + } + } finally { + lock.writeLock().unlock(); + } } } catch (InterruptedException e) { }