From a84cbfb8bfabb9b3f4a167f21760b4adda78039c Mon Sep 17 00:00:00 2001
From: Viktor Klang
Date: Mon, 16 Apr 2018 15:14:49 +0100
Subject: [PATCH 1/3] Fixes #10810 by tracking concurrent blocking instead of max number of threads.

---
 .../impl/ExecutionContextImpl.scala | 84 +++++++++----------
 1 file changed, 42 insertions(+), 42 deletions(-)

diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index ae6f9d6fd2b2..4c83a9b8032f 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -9,7 +9,7 @@ package scala.concurrent.impl
 import java.util.concurrent.{ ForkJoinPool, ForkJoinWorkerThread, ForkJoinTask, Callable, Executor, ExecutorService, ThreadFactory, TimeUnit }
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import java.util.Collection
 import scala.concurrent.{ BlockContext, ExecutionContext, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
 import scala.annotation.tailrec
@@ -24,26 +24,25 @@ private[scala] class ExecutionContextImpl private[impl] (val executor: Executor,
 private[concurrent] object ExecutionContextImpl {
 
-  // Implement BlockContext on FJP threads
   final class DefaultThreadFactory(
     daemonic: Boolean,
-    maxThreads: Int,
+    maxBlockers: Int,
     prefix: String,
     uncaught: Thread.UncaughtExceptionHandler) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
     require(prefix ne null, "DefaultThreadFactory.prefix must be non null")
-    require(maxThreads > 0, "DefaultThreadFactory.maxThreads must be greater than 0")
+    require(maxBlockers >= 0, "DefaultThreadFactory.maxBlockers must be greater-or-equal-to 0")
 
-    private final val currentNumberOfThreads = new AtomicInteger(0)
+    private final val currentNumberOfBlockers = new AtomicInteger(0)
 
-    @tailrec private final def reserveThread(): Boolean = currentNumberOfThreads.get() match {
-      case `maxThreads` | Int.`MaxValue` => false
-      case other => currentNumberOfThreads.compareAndSet(other, other + 1) || reserveThread()
+    @tailrec private final def newBlocker(): Boolean = currentNumberOfBlockers.get() match {
+      case `maxBlockers` | Int.`MaxValue` => false
+      case other => currentNumberOfBlockers.compareAndSet(other, other + 1) || newBlocker()
     }
 
-    @tailrec private final def deregisterThread(): Boolean = currentNumberOfThreads.get() match {
+    @tailrec private final def freeBlocker(): Boolean = currentNumberOfBlockers.get() match {
       case 0 => false
-      case other => currentNumberOfThreads.compareAndSet(other, other - 1) || deregisterThread()
+      case other => currentNumberOfBlockers.compareAndSet(other, other - 1) || freeBlocker()
     }
 
     def wire[T <: Thread](thread: T): T = {
@@ -53,39 +52,42 @@ private[concurrent] object ExecutionContextImpl {
       thread
     }
 
-    // As per ThreadFactory contract newThread should return `null` if cannot create new thread.
-    def newThread(runnable: Runnable): Thread =
-      if (reserveThread())
-        wire(new Thread(new Runnable {
-          // We have to decrement the current thread count when the thread exits
-          override def run() = try runnable.run() finally deregisterThread()
-        })) else null
+    def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))
 
     def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread =
-      if (reserveThread()) {
-        wire(new ForkJoinWorkerThread(fjp) with BlockContext {
-          // We have to decrement the current thread count when the thread exits
-          final override def onTermination(exception: Throwable): Unit = deregisterThread()
-          final override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
-            var result: T = null.asInstanceOf[T]
-            ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
-              @volatile var isdone = false
-              override def block(): Boolean = {
-                result = try {
-                  // When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
-                  BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }
-                } finally {
-                  isdone = true
+      wire(new ForkJoinWorkerThread(fjp) with BlockContext {
+        private[this] var isBlocked: Boolean = false // This is only ever read & written if this thread is the current thread
+        final override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T =
+          if ((Thread.currentThread eq this) && !isBlocked && newBlocker()) {
+            try {
+              isBlocked = true
+              val b: ForkJoinPool.ManagedBlocker with (() => T) =
+                new ForkJoinPool.ManagedBlocker with (() => T) {
+                  private[this] var result: T = null.asInstanceOf[T]
+                  private[this] var done: Boolean = false
+                  final override def block(): Boolean = {
+                    try {
+                      if (!done)
+                        result = thunk
+                    } finally {
+                      done = true
+                    }
+
+                    true
                 }
-                true
-              }
-              override def isReleasable = isdone
-            })
-            result
-          }
-        })
-      } else null
+                  final override def isReleasable = done
+
+                  final override def apply(): T = result
+                }
+              ForkJoinPool.managedBlock(b)
+              b()
+            } finally {
+              isBlocked = false
+              freeBlocker()
+            }
+          } else thunk // Unmanaged blocking
+      })
   }
 
   def createDefaultExecutorService(reporter: Throwable => Unit): ExecutorService = {
@@ -99,8 +101,6 @@ private[concurrent] object ExecutionContextImpl {
     def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
     val numThreads = getInt("scala.concurrent.context.numThreads", "x1")
     // The hard limit on the number of active threads that the thread factory will produce
-    // scala/bug#8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure
-    // about what the exact threshold is. numThreads + 256 is conservatively high.
     val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
 
     val desiredParallelism = range(
@@ -116,7 +116,7 @@ private[concurrent] object ExecutionContextImpl {
     }
 
     val threadFactory = new ExecutionContextImpl.DefaultThreadFactory(daemonic = true,
-                                                                      maxThreads = maxNoOfThreads + maxExtraThreads,
+                                                                      maxBlockers = maxExtraThreads,
                                                                       prefix = "scala-execution-context-global",
                                                                       uncaught = uncaughtExceptionHandler)

From 88e34c2597cc4dbd326572ebb3098d54e1cdf9b4 Mon Sep 17 00:00:00 2001
From: Jason Zaugg
Date: Mon, 23 Apr 2018 08:35:32 +0100
Subject: [PATCH 2/3] Avoid unneeded tree duplicate/reset in default getter, case class synth

More of the same as #5875

The change to default getters reduced tree churn by 6x in a real world project.
---
 src/compiler/scala/tools/nsc/typechecker/Namers.scala    | 6 +++---
 src/compiler/scala/tools/nsc/typechecker/Unapplies.scala | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/compiler/scala/tools/nsc/typechecker/Namers.scala b/src/compiler/scala/tools/nsc/typechecker/Namers.scala
index d36a91669fdb..806025c026c8 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Namers.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Namers.scala
@@ -1436,8 +1436,8 @@ trait Namers extends MethodSynthesis {
    * typechecked, the corresponding param would not yet have the "defaultparam"
    * flag.
    */
-  private def addDefaultGetters(meth: Symbol, ddef: DefDef, vparamss: List[List[ValDef]], tparams: List[TypeDef], overridden: Symbol) {
-    val DefDef(_, _, rtparams0, rvparamss0, _, _) = resetAttrs(ddef.duplicate)
+  private def addDefaultGetters(meth: Symbol, ddef: DefDef, vparamss: List[List[ValDef]], tparams: List[TypeDef], overridden: Symbol): Unit = {
+    val DefDef(_, _, rtparams0, rvparamss0, _, _) = resetAttrs(deriveDefDef(ddef)(_ => EmptyTree).duplicate)
     // having defs here is important to make sure that there's no sneaky tree sharing
     // in methods with multiple default parameters
     def rtparams = rtparams0.map(_.duplicate)
@@ -1523,7 +1523,7 @@ trait Namers extends MethodSynthesis {
         return // fix #3649 (prevent crash in erroneous source code)
       }
     }
-    val ClassDef(_, _, rtparams, _) = resetAttrs(cdef.duplicate)
+    val ClassDef(_, _, rtparams, _) = resetAttrs(deriveClassDef(cdef)(_ => Template(Nil, noSelfType, Nil)).duplicate)
     defTparams = rtparams.map(rt => copyTypeDef(rt)(mods = rt.mods &~ (COVARIANT | CONTRAVARIANT)))
     nmr
   }
diff --git a/src/compiler/scala/tools/nsc/typechecker/Unapplies.scala b/src/compiler/scala/tools/nsc/typechecker/Unapplies.scala
index 909157212578..0945c68add20 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Unapplies.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Unapplies.scala
@@ -60,7 +60,8 @@ trait Unapplies extends ast.TreeDSL {
   }
 
   private def constrTparamsInvariant(cdef: ClassDef): List[TypeDef] = {
-    val ClassDef(_, _, tparams, _) = resetAttrs(cdef.duplicate)
+    val prunedClassDef = deriveClassDef(cdef)(tmpl => Template(Nil, noSelfType, Nil))
+    val ClassDef(_, _, tparams, _) = resetAttrs(prunedClassDef.duplicate)
     val tparamsInvariant = tparams.map(tparam => copyTypeDef(tparam)(mods = tparam.mods &~ (COVARIANT | CONTRAVARIANT)))
     tparamsInvariant
   }

From 83576634d5eca43ab064f4b90d535007723a9362 Mon Sep 17 00:00:00 2001
From: Seth Tisue
Date: Fri, 11 May 2018 15:49:27 +0200
Subject: [PATCH 3/3] don't let Travis-CI fail on every PR

in order to test #6621 we needed to enable Travis-CI on pull requests,
but without this change, every PR failed
---
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index 4abdda13c070..2d2da13b8cc5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,7 +12,7 @@ cache:
 
 before_script:
-  - (cd admin && ./init.sh)
+  - 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then (cd admin && ./init.sh); fi'
 
 stages:
   - name: build # also builds the spec using jekyll
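
Note (not part of the patches above): a minimal, self-contained sketch of how user code reaches the new blockOn path from [PATCH 1/3]. The object name, the sleep, and the number of futures are illustrative assumptions. scala.concurrent.blocking hands the thunk to the calling thread's BlockContext; on the global pool's ForkJoinWorkerThreads this now goes through ForkJoinPool.managedBlock while the count of concurrent blockers stays below the factory's maxBlockers (wired to maxExtraThreads above), and runs as plain unmanaged blocking otherwise.

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._
  import scala.concurrent.{Await, Future, blocking}

  object BlockingDemo {
    def main(args: Array[String]): Unit = {
      // Each `blocking` block is routed through the current thread's BlockContext,
      // so on a global-pool worker it may be wrapped in a ManagedBlocker, letting
      // the pool compensate with a spare thread while this one sleeps.
      val futures = (1 to 4).map { i =>
        Future {
          blocking {
            Thread.sleep(100) // stand-in for blocking I/O; illustrative value
            i
          }
        }
      }
      println(Await.result(Future.sequence(futures), 5.seconds).sum)
    }
  }

Because the factory now counts concurrent blockers rather than created threads, a nested `blocking` call on the same worker thread is short-circuited by the isBlocked flag instead of consuming another permit, which is the behavioural change the first commit message describes.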