Skip to content

Commit

Permalink
Merge pull request akka#19211 from akka/wip-19201-max-pools-size-patr…
Browse files Browse the repository at this point in the history
…iknw

=act akka#19201 improve configuration of thread-pool-executor
  • Loading branch information
patriknw committed Dec 21, 2015
2 parents 377748b + a1c3dbe commit 290f402
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ object LocalActorRefProviderSpec {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 16
core-pool-size-max = 16
fixed-pool-size = 16
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ object TypedActorSpec {
type = "akka.dispatch.BalancingDispatcherConfigurator"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 60
core-pool-size-max = 60
max-pool-size-min = 60
max-pool-size-max = 60
fixed-pool-size = 60
}
}
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ object DispatcherActorSpec {
throughput = 101
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-max = 1
fixed-pool-size = 1
}
}
test-throughput-deadline-dispatcher {
throughput = 2
throughput-deadline-time = 100 milliseconds
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-max = 1
fixed-pool-size = 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getInt("task-queue-size") should ===(-1)
getString("task-queue-type") should ===("linked")
getBoolean("allow-core-timeout") should ===(true)
getString("fixed-pool-size") should ===("off")
}

// Debug config
Expand Down
30 changes: 20 additions & 10 deletions akka-actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ akka {
}

# This will be used if you have set "executor = "fork-join-executor""
# Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8
Expand All @@ -351,32 +352,41 @@ akka {
}

# This will be used if you have set "executor = "thread-pool-executor""
# Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Keep alive time for threads
keep-alive-time = 60s

# Define a fixed thread pool size with this property. The corePoolSize
# and the maximumPoolSize of the ThreadPoolExecutor will be set to this
# value, if it is defined. Then the other pool-size properties will not
# be used.
fixed-pool-size = off

# Min number of threads to cap factor-based core number to
# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 8

# The core pool size factor is used to determine thread pool core size
# using the following formula: ceil(available processors * factor).
# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0

# Max number of threads to cap factor-based number to
# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 64

# Minimum number of threads to cap factor-based max number to
# (if using a bounded task queue)
# Minimum number of threads to cap factor-based maximumPoolSize number to
max-pool-size-min = 8

# Max no of threads (if using a bounded task queue) is determined by
# calculating: ceil(available processors * factor)
# The max-pool-size-factor is used to determine maximumPoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor)
# The maximumPoolSize will not be less than corePoolSize.
# It is only used if using a bounded task queue.
max-pool-size-factor = 3.0

# Max number of threads to cap factor-based max number to
# (if using a bounded task queue)
# Max number of threads to cap factor-based maximumPoolSize number to
max-pool-size-max = 64

# Specifies the bounded capacity of the task queue (< 1 == unbounded)
Expand Down
43 changes: 25 additions & 18 deletions akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,21 +345,27 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr

protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
import akka.util.Helpers.ConfigOps
ThreadPoolConfigBuilder(ThreadPoolConfig())
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
.setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
.configure(
Some(config getInt "task-queue-size") flatMap {
case size if size > 0
Some(config getString "task-queue-type") map {
case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size)
case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
} map { qf (q: ThreadPoolConfigBuilder) q.setQueueFactory(qf) }
case _ None
})
val builder =
ThreadPoolConfigBuilder(ThreadPoolConfig())
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.configure(
Some(config getInt "task-queue-size") flatMap {
case size if size > 0
Some(config getString "task-queue-type") map {
case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size)
case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
} map { qf (q: ThreadPoolConfigBuilder) q.setQueueFactory(qf) }
case _ None
})

if (config.getString("fixed-pool-size") == "off")
builder
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
.setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
else
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
}

def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
Expand Down Expand Up @@ -435,9 +441,10 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}

val asyncMode = config.getString("task-peeking-mode") match {
case "FIFO" true
case "LIFO" false
case unsupported throw new IllegalArgumentException(s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
case "FIFO" true
case "LIFO" false
case unsupported throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}

new ForkJoinExecutorServiceFactory(
Expand Down
13 changes: 5 additions & 8 deletions akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,14 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))

def setFixedPoolSize(size: Int): ThreadPoolConfigBuilder =
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))

def setCorePoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.maxPoolSize < size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(corePoolSize = size))
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = math.max(size, config.maxPoolSize)))

def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.corePoolSize > size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(maxPoolSize = size))
this.copy(config = config.copy(maxPoolSize = math.max(size, config.corePoolSize)))

def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ object StartupWithOneThreadSpec {
akka.actor.default-dispatcher {
executor = thread-pool-executor
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-max = 1
fixed-pool-size = 1
}
}
"""
Expand Down
8 changes: 4 additions & 4 deletions akka-docs/rst/java/cluster-sharding.rst
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,11 @@ Inspecting cluster sharding state
---------------------------------
Two requests to inspect the cluster state are available:

`ClusterShard.getShardRegionStateInstance` which will return a `ClusterShard.ShardRegionState` that contains
the `ShardId`s running in a Region and what `EntityId`s are alive for each of them.
``ClusterShard.getShardRegionStateInstance`` which will return a ``ClusterShard.ShardRegionState`` that contains
the identifiers of the shards running in a Region and what entities are alive for each of them.

`ClusterShard.getClusterShardingStatsInstance` which will query all the regions in the cluster and return
a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count
``ClusterShard.getClusterShardingStatsInstance`` which will query all the regions in the cluster and return
a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.

The purpose of these messages is testing and monitoring, they are not provided to give access to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public void defineDispatcherInCode() {
//#defining-dispatcher-in-code
}

@SuppressWarnings("unused")
@Test
public void defineFixedPoolSizeDispatcher() {
//#defining-fixed-pool-size-dispatcher
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
.withDispatcher("blocking-io-dispatcher"));
//#defining-fixed-pool-size-dispatcher
}

@SuppressWarnings("unused")
@Test
public void definePinnedDispatcher() {
Expand Down
9 changes: 9 additions & 0 deletions akka-docs/rst/java/dispatchers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ There are 3 different types of message dispatchers:
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:

.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config

And then using it:

.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#defining-fixed-pool-size-dispatcher


Configuring a ``PinnedDispatcher``:

.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/java/lambda-persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist-caller

First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/java/persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist-caller

First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.
Expand Down
4 changes: 2 additions & 2 deletions akka-docs/rst/java/routing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,9 @@ The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBou

Pool with ``OptimalSizeExploringResizer`` defined in configuration:

.. includecode:: code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool

.. includecode:: code/docs/routing/RouterDocTest.java#optimal-size-exploring-resize-pool
.. includecode:: code/docs/jrouting/RouterDocTest.java#optimal-size-exploring-resize-pool

Several more configuration options are available and described in ``akka.actor.deployment.default.optimal-size-exploring-resizer``
section of the reference :ref:`configuration`.
Expand Down
1 change: 0 additions & 1 deletion akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,6 @@ Be careful to not do any operations on the ``Future[Terminated]`` using the ``sy
as ``ExecutionContext`` as it will be shut down with the ``ActorSystem``, instead use for example
the Scala standard library context from ``scala.concurrent.ExecutionContext.global``.


// import system.dispatcher <- this would not work
import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
8 changes: 4 additions & 4 deletions akka-docs/rst/scala/cluster-sharding.rst
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ Inspecting cluster sharding state
---------------------------------
Two requests to inspect the cluster state are available:

`ClusterShard.GetShardRegionState` which will return a `ClusterShard.ShardRegionState` that contains
the `ShardId`s running in a Region and what `EntityId`s are alive for each of them.
``ClusterShard.GetShardRegionState`` which will return a ``ClusterShard.ShardRegionState`` that contains
the identifiers of the shards running in a Region and what entities are alive for each of them.

`ClusterShard.GetClusterShardingStats` which will query all the regions in the cluster and return
a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count
``ClusterShard.GetClusterShardingStats`` which will query all the regions in the cluster and return
a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.

The purpose of these messages is testing and monitoring, they are not provided to give access to
Expand Down
21 changes: 20 additions & 1 deletion akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ object DispatcherDocSpec {
}
//#my-thread-pool-dispatcher-config
//#fixed-pool-size-dispatcher-config
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
//#fixed-pool-size-dispatcher-config
//#my-pinned-dispatcher-config
my-pinned-dispatcher {
executor = "thread-pool-executor"
Expand Down Expand Up @@ -268,11 +279,19 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
val dispatcher = system.dispatchers.lookup("my-dispatcher-bounded-queue")
}

"defining fixed-pool-size dispatcher" in {
val context = system
//#defining-fixed-pool-size-dispatcher
val myActor =
context.actorOf(Props[MyActor].withDispatcher("blocking-io-dispatcher"), "myactor2")
//#defining-fixed-pool-size-dispatcher
}

"defining pinned dispatcher" in {
val context = system
//#defining-pinned-dispatcher
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3")
//#defining-pinned-dispatcher
}

Expand Down
9 changes: 9 additions & 0 deletions akka-docs/rst/scala/dispatchers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ There are 3 different types of message dispatchers:
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:

.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config

And then using it:

.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-fixed-pool-size-dispatcher

Configuring a ``PinnedDispatcher``:

.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Expand Down
8 changes: 4 additions & 4 deletions akka-docs/rst/scala/persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist-caller

First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.
Expand Down Expand Up @@ -577,9 +577,9 @@ received.

.. note::

At-least-once delivery implies that original message sending order is not always preserved,
and the destination may receive duplicate messages.
Semantics do not match those of a normal :class:`ActorRef` send operation:
At-least-once delivery implies that original message sending order is not always preserved,
and the destination may receive duplicate messages.
Semantics do not match those of a normal :class:`ActorRef` send operation:

* it is not at-most-once delivery

Expand Down

0 comments on commit 290f402

Please sign in to comment.