diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java index 140b86504..a705cb023 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java @@ -172,19 +172,12 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); - this.queue = concurrencyStrategy.getBlockingQueue(queueSize); - if (properties.getAllowMaximumSizeToDivergeFromCoreSize().get()) { - this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, - concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), - properties); - this.threadPool = this.metrics.getThreadPool(); - } else { - this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, - concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), - properties); - this.threadPool = this.metrics.getThreadPool(); - } + this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties), + properties); + this.threadPool = this.metrics.getThreadPool(); + this.queue = this.threadPool.getQueue(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java index 3405cec68..f4bc5dc25 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java @@ -18,6 +18,7 @@ import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixThreadPool; import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.properties.HystrixProperty; import com.netflix.hystrix.util.PlatformSpecific; @@ -75,9 +76,49 @@ public abstract class HystrixConcurrencyStrategy { * @return instance of {@link ThreadPoolExecutor} */ public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty corePoolSize, HystrixProperty maximumPoolSize, HystrixProperty keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); + + final int dynamicCoreSize = corePoolSize.get(); + final int dynamicMaximumSize = maximumPoolSize.get(); + + if (dynamicCoreSize > dynamicMaximumSize) { + logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } else { + return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } + } + + public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { + final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); + + final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); + final int dynamicCoreSize = threadPoolProperties.coreSize().get(); + final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); + final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); + final BlockingQueue workQueue = getBlockingQueue(maxQueueSize); + + if (allowMaximumSizeToDivergeFromCoreSize) { + final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); + if (dynamicCoreSize > dynamicMaximumSize) { + logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); + } else { + return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); + } + } else { + return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); + } + } + + private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { ThreadFactory threadFactory = null; if (!PlatformSpecific.isAppEngineStandardEnvironment()) { - threadFactory = new ThreadFactory() { + return new ThreadFactory() { protected final AtomicInteger threadNumber = new AtomicInteger(0); @Override @@ -89,19 +130,7 @@ public Thread newThread(Runnable r) { }; } else { - threadFactory = PlatformSpecific.getAppEngineThreadFactory(); - } - - final int dynamicCoreSize = corePoolSize.get(); - final int dynamicMaximumSize = maximumPoolSize.get(); - - if (dynamicCoreSize > dynamicMaximumSize) { - logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + - dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + - dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); - return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); - } else { - return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); + return PlatformSpecific.getAppEngineThreadFactory(); } }