Skip to content

Commit

Permalink
Merge pull request akka#22340 from ortigali/wip-21170-default-blockin…
Browse files Browse the repository at this point in the history
…g-io-dispatcher

Add default-blocking-io-dispatcher and make TTL configurable
  • Loading branch information
patriknw authored Feb 24, 2017
2 parents 3351747 + e9fdd47 commit 0850fe3
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 13 deletions.
26 changes: 22 additions & 4 deletions akka-actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ akka {
router = "consistent-hashing-pool"
nr-of-instances = 4
}
"/IO-DNS/inet-address/*" {
dispatcher = "akka.actor.default-blocking-io-dispatcher"
}
}

default-dispatcher {
Expand Down Expand Up @@ -438,6 +441,16 @@ akka {
mailbox-requirement = ""
}

default-blocking-io-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1

thread-pool-executor {
fixed-pool-size = 16
}
}

default-mailbox {
# FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
Expand Down Expand Up @@ -768,7 +781,7 @@ akka {

# Fully qualified config path which holds the dispatcher configuration
# on which file IO tasks are scheduled
file-io-dispatcher = "akka.actor.default-dispatcher"
file-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"

# The maximum number of bytes (or "unlimited") to transfer in one batch
# when using `WriteFile` command which uses `FileChannel.transferTo` to
Expand Down Expand Up @@ -918,9 +931,14 @@ akka {
# Must implement akka.io.DnsProvider
provider-object = "akka.io.InetAddressDnsProvider"

# These TTLs are set to default java 6 values
positive-ttl = 30s
negative-ttl = 10s
# To set the time to cache name resolutions
# Possible values:
# default: sun.net.InetAddressCachePolicy.get() and getNegative()
# forever: cache forever
# never: no caching
# n [time unit]: positive timeout with unit, for example "30 s"
positive-ttl = default
negative-ttl = default

# How often to sweep out expired cache entries.
# Note that this interval has nothing to do with TTLs
Expand Down
2 changes: 1 addition & 1 deletion akka-actor/src/main/scala/akka/io/Dns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class DnsExt(system: ExtendedActorSystem) extends IO.Extension {

val manager: ActorRef = {
system.systemActorOf(
props = Props(classOf[SimpleDnsManager], this).withDeploy(Deploy.local).withDispatcher(Settings.Dispatcher),
props = Props(provider.managerClass, this).withDeploy(Deploy.local).withDispatcher(Settings.Dispatcher),
name = "IO-DNS")
}

Expand Down
26 changes: 22 additions & 4 deletions akka-actor/src/main/scala/akka/io/InetAddressDnsResolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,28 @@ import akka.actor.Actor
import com.typesafe.config.Config

import scala.collection.immutable
import sun.net.{ InetAddressCachePolicy IACP }
import akka.util.Helpers.Requiring

class InetAddressDnsResolver(cache: SimpleDnsCache, config: Config) extends Actor {
val positiveTtl = config.getDuration("positive-ttl", TimeUnit.MILLISECONDS)
val negativeTtl = config.getDuration("negative-ttl", TimeUnit.MILLISECONDS)

import IACP.NEVER // 0 constant

private def getTtl(path: String, positive: Boolean): Long =
config.getString(path) match {
case "default"
(if (positive) IACP.get else IACP.getNegative) match {
case NEVER NEVER
case n if n > 0 TimeUnit.SECONDS.toMillis(n)
case _ Long.MaxValue // forever if negative
}
case "forever" Long.MaxValue
case "never" NEVER
case _ config.getDuration(path, TimeUnit.MILLISECONDS)
.requiring(_ > 0, s"akka.io.dns.$path must be 'default', 'forever', 'never' or positive duration")
}
val positiveTtl = getTtl("positive-ttl", true)
val negativeTtl = getTtl("negative-ttl", false)

override def receive = {
case Dns.Resolve(name)
Expand All @@ -19,12 +37,12 @@ class InetAddressDnsResolver(cache: SimpleDnsCache, config: Config) extends Acto
case None
try {
val answer = Dns.Resolved(name, InetAddress.getAllByName(name))
cache.put(answer, positiveTtl)
if (positiveTtl != NEVER) cache.put(answer, positiveTtl)
answer
} catch {
case e: UnknownHostException
val answer = Dns.Resolved(name, immutable.Seq.empty, immutable.Seq.empty)
cache.put(answer, negativeTtl)
if (negativeTtl != NEVER) cache.put(answer, negativeTtl)
answer
}
}
Expand Down
3 changes: 2 additions & 1 deletion akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ object SimpleDnsCache {
}

def put(answer: Resolved, ttlMillis: Long): Cache = {
val until = clock() + ttlMillis
val until0 = clock() + ttlMillis
val until = if (until0 < 0) Long.MaxValue else until0

new Cache(
queue + new ExpiryEntry(answer.name, until),
Expand Down
4 changes: 1 addition & 3 deletions akka-stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ akka {
throughput = 1

thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 16
fixed-pool-size = 16
}
}
}
Expand Down

0 comments on commit 0850fe3

Please sign in to comment.