Skip to content

Commit

Permalink
Allow maximum size to affect HystrixThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Oct 4, 2016
1 parent ef6b201 commit c21d586
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;
import rx.functions.Func0;

Expand Down Expand Up @@ -158,6 +160,8 @@ public interface HystrixThreadPool {
* @ThreadSafe
*/
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

private final HystrixThreadPoolProperties properties;
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
Expand All @@ -171,7 +175,7 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
this.queue = concurrencyStrategy.getBlockingQueue(queueSize);
this.metrics = HystrixThreadPoolMetrics.getInstance(
threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();

Expand Down Expand Up @@ -205,11 +209,19 @@ public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
int dynamicMaximumSize = properties.maximumSize().get();

if (dynamicMaximumSize < dynamicCoreSize) {
logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is using coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
dynamicMaximumSize = dynamicCoreSize;
}

// In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
if (threadPool.getCorePoolSize() != dynamicCoreSize) {
if (threadPool.getCorePoolSize() != dynamicCoreSize || threadPool.getMaximumPoolSize() != dynamicMaximumSize) {
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicCoreSize); // we always want maxSize the same as coreSize, we are not using a dynamically resizing pool
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}

threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES); // this doesn't really matter since we're not resizing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,22 @@
*
* It is OK to leave maximumSize unset using any version of Hystrix. If you do, then maximum size will default to
* core size and you'll have a fixed-size threadpool.
*
* If you accidentally set maximumSize < coreSize, then maximum will be raised to coreSize
* (this prioritizes keeping extra threads around rather than inducing threadpool rejections)
*/
public abstract class HystrixThreadPoolProperties {



/* defaults */
private Integer default_coreSize = 10; // core size of thread pool
private Integer default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
private Integer default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
static int default_coreSize = 10; // core size of thread pool
static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
// -1 turns if off and makes us use SynchronousQueue
private Integer default_queueSizeRejectionThreshold = 5; // number of items in queue
private Integer default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
private Integer default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)
static int default_queueSizeRejectionThreshold = 5; // number of items in queue
static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)

private final HystrixProperty<Integer> corePoolSize;
private final HystrixProperty<Integer> maximumPoolSize;
Expand All @@ -67,9 +72,13 @@ protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder)
}

protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) {
//we allow maximum pool size to be configured lower than core size here
//however, at runtime, if this configuration gets applied, we will always ensure that maximumSize >= coreSize
this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);
//if maximum size is not explicitly set, then default it to the core size of that pool.
this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), builder.getCoreSize());

//if left unset, maxiumumSize will default to coreSize
this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), corePoolSize.get());

this.keepAliveTime = getProperty(propertyPrefix, key, "keepAliveTimeMinutes", builder.getKeepAliveTimeMinutes(), default_keepAliveTimeMinutes);
this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize);
this.queueSizeRejectionThreshold = getProperty(propertyPrefix, key, "queueSizeRejectionThreshold", builder.getQueueSizeRejectionThreshold(), default_queueSizeRejectionThreshold);
Expand Down Expand Up @@ -260,71 +269,8 @@ public Setter withMetricsRollingStatisticalWindowBuckets(int value) {
return this;
}

/**
* Base properties for unit testing.
*/
/* package */static Setter getUnitTestPropertiesBuilder() {
return new Setter()
.withCoreSize(10)// core size of thread pool
.withMaximumSize(15) //maximum size of thread pool
.withKeepAliveTimeMinutes(1)// minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
.withMaxQueueSize(100)// size of queue (but we never allow it to grow this big ... this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
.withQueueSizeRejectionThreshold(10)// number of items in queue at which point we reject (this can be dyamically changed)
.withMetricsRollingStatisticalWindowInMilliseconds(10000)// milliseconds for rolling number
.withMetricsRollingStatisticalWindowBuckets(10);// number of buckets in rolling number (10 1-second buckets)
}

/**
* Return a static representation of the properties with values from the Builder so that UnitTests can create properties that are not affected by the actual implementations which pick up their
* values dynamically.
*
* @param builder builder for a {@link HystrixThreadPoolProperties}
* @return HystrixThreadPoolProperties
*/
/* package */static HystrixThreadPoolProperties asMock(final Setter builder) {
return new HystrixThreadPoolProperties(TestThreadPoolKey.TEST) {

@Override
public HystrixProperty<Integer> coreSize() {
return HystrixProperty.Factory.asProperty(builder.coreSize);
}

@Override
public HystrixProperty<Integer> maximumSize() {
return HystrixProperty.Factory.asProperty(builder.maximumSize);
}

@Override
public HystrixProperty<Integer> keepAliveTimeMinutes() {
return HystrixProperty.Factory.asProperty(builder.keepAliveTimeMinutes);
}

@Override
public HystrixProperty<Integer> maxQueueSize() {
return HystrixProperty.Factory.asProperty(builder.maxQueueSize);
}

@Override
public HystrixProperty<Integer> queueSizeRejectionThreshold() {
return HystrixProperty.Factory.asProperty(builder.queueSizeRejectionThreshold);
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds() {
return HystrixProperty.Factory.asProperty(builder.rollingStatisticalWindowInMilliseconds);
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets() {
return HystrixProperty.Factory.asProperty(builder.rollingStatisticalWindowBuckets);
}

};

}

private static enum TestThreadPoolKey implements HystrixThreadPoolKey {
TEST
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKe
@Override
public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
if (builder == null) {
builder = HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder();
builder = HystrixThreadPoolPropertiesTest.getUnitTestPropertiesBuilder();
}
return HystrixThreadPoolProperties.Setter.asMock(builder);
return HystrixThreadPoolPropertiesTest.asMock(builder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5335,7 +5335,7 @@ private static class TestThreadIsolationWithSemaphoreSetSmallCommand extends Tes
private TestThreadIsolationWithSemaphoreSetSmallCommand(TestCircuitBreaker circuitBreaker, int poolSize, Action0 action) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(TestThreadIsolationWithSemaphoreSetSmallCommand.class.getSimpleName()))
.setThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder()
.setThreadPoolPropertiesDefaults(HystrixThreadPoolPropertiesTest.getUnitTestPropertiesBuilder()
.withCoreSize(poolSize).withMaxQueueSize(0))
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix;

import static org.junit.Assert.assertEquals;

import org.junit.After;
import org.junit.Test;

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.strategy.properties.HystrixProperty;

public class HystrixThreadPoolPropertiesTest {

/**
* Base properties for unit testing.
*/
/* package */static HystrixThreadPoolProperties.Setter getUnitTestPropertiesBuilder() {
return HystrixThreadPoolProperties.Setter()
.withCoreSize(10)// core size of thread pool
.withMaximumSize(15) //maximum size of thread pool
.withKeepAliveTimeMinutes(1)// minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
.withMaxQueueSize(100)// size of queue (but we never allow it to grow this big ... this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
.withQueueSizeRejectionThreshold(10)// number of items in queue at which point we reject (this can be dyamically changed)
.withMetricsRollingStatisticalWindowInMilliseconds(10000)// milliseconds for rolling number
.withMetricsRollingStatisticalWindowBuckets(10);// number of buckets in rolling number (10 1-second buckets)
}

/**
* Return a static representation of the properties with values from the Builder so that UnitTests can create properties that are not affected by the actual implementations which pick up their
* values dynamically.
*
* @param builder builder for a {@link HystrixThreadPoolProperties}
* @return HystrixThreadPoolProperties
*/
/* package */static HystrixThreadPoolProperties asMock(final HystrixThreadPoolProperties.Setter builder) {
return new HystrixThreadPoolProperties(TestThreadPoolKey.TEST) {

@Override
public HystrixProperty<Integer> coreSize() {
return HystrixProperty.Factory.asProperty(builder.getCoreSize());
}

@Override
public HystrixProperty<Integer> maximumSize() {
return HystrixProperty.Factory.asProperty(builder.getMaximumSize());
}

@Override
public HystrixProperty<Integer> keepAliveTimeMinutes() {
return HystrixProperty.Factory.asProperty(builder.getKeepAliveTimeMinutes());
}

@Override
public HystrixProperty<Integer> maxQueueSize() {
return HystrixProperty.Factory.asProperty(builder.getMaxQueueSize());
}

@Override
public HystrixProperty<Integer> queueSizeRejectionThreshold() {
return HystrixProperty.Factory.asProperty(builder.getQueueSizeRejectionThreshold());
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds() {
return HystrixProperty.Factory.asProperty(builder.getMetricsRollingStatisticalWindowInMilliseconds());
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets() {
return HystrixProperty.Factory.asProperty(builder.getMetricsRollingStatisticalWindowBuckets());
}

};

}

private static enum TestThreadPoolKey implements HystrixThreadPoolKey {
TEST
}

@After
public void cleanup() {
ConfigurationManager.getConfigInstance().clear();
}

@Test
public void testSetNeitherCoreNorMaximumSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST, HystrixThreadPoolProperties.Setter()) {

};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue());
}

@Test
public void testSetCoreSizeOnly() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter().withCoreSize(14)) {

};

assertEquals(14, properties.coreSize().get().intValue());
assertEquals(14, properties.maximumSize().get().intValue());
}

@Test
public void testSetMaximumSizeOnlyLowerThanDefaultCoreSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter().withMaximumSize(3)) {

};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(3, properties.maximumSize().get().intValue());
}

@Test
public void testSetMaximumSizeOnlyEqualToDefaultCoreSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter().withMaximumSize(HystrixThreadPoolProperties.default_coreSize)) {

};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue());
}

@Test
public void testSetMaximumSizeOnlyGreaterThanDefaultCoreSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter().withMaximumSize(21)) {

};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(21, properties.maximumSize().get().intValue());
}

@Test
public void testSetCoreSizeLessThanMaximumSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter()
.withCoreSize(2)
.withMaximumSize(8)) {

};

assertEquals(2, properties.coreSize().get().intValue());
assertEquals(8, properties.maximumSize().get().intValue());
}

@Test
public void testSetCoreSizeEqualToMaximumSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter()
.withCoreSize(7)
.withMaximumSize(7)) {

};

assertEquals(7, properties.coreSize().get().intValue());
assertEquals(7, properties.maximumSize().get().intValue());
}

@Test
public void testSetCoreSizeGreaterThanMaximumSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter()
.withCoreSize(12)
.withMaximumSize(8)) {

};

assertEquals(12, properties.coreSize().get().intValue());
assertEquals(8, properties.maximumSize().get().intValue());
}
}
Loading

0 comments on commit c21d586

Please sign in to comment.