Skip to content

Commit

Permalink
Improve Support For Single Threaded Environments (zio#4115)
Browse files Browse the repository at this point in the history
* initial work

* refactor RTSSPec tests to avoid blocking

* yield in more combinators

* don't block when canceling

* don't optimize away zero duration sleeps

* fix Dotty type inference issue

* reimplement ZSTM#orTry

* don't optimize away zero duration sleeps

* don't optimize away zero duration sleeps

* revert runtime settings

* fix flaky test

* don't use deprecated methods

* use nonFlaky
  • Loading branch information
adamgfraser authored Aug 25, 2020
1 parent 26cff65 commit db9818e
Show file tree
Hide file tree
Showing 20 changed files with 106 additions and 115 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
"unensures",
"untrack",
"urio"
]
],
"files.watcherExclude": {
"**/target": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ class TMapContentionBenchmarks {

@Setup(Level.Trial)
def setup(): Unit = {
val schedule = Schedule.recurs(repeatedUpdates)
val keysToUpdate = (1 to 100).toList
val data = (1 to 1000).toList.zipWithIndex
val map = unsafeRun(TMap.fromIterable(data).commit)
val ref = ZTRef.unsafeMake(data.toMap)

mapUpdates = ZIO.foreachPar_(keysToUpdate)(i => map.put(i, i).commit.repeat(schedule))
refUpdates = ZIO.foreachPar_(keysToUpdate)(i => ref.update(_.updated(i, i)).commit.repeat(schedule))
mapUpdates = ZIO.foreachPar_(keysToUpdate)(i => map.put(i, i).commit.repeatN(repeatedUpdates))
refUpdates = ZIO.foreachPar_(keysToUpdate)(i => ref.update(_.updated(i, i)).commit.repeatN(repeatedUpdates))
}

@Benchmark
Expand Down
6 changes: 1 addition & 5 deletions core-tests/jvm/src/test/scala-2.12/zio/StacktracesSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ object StackTracesSpec extends DefaultRunnableSpec {
assert(cause.traces)(isNonEmpty) &&
assert(cause.traces.head.parentTrace.isEmpty)(isFalse) &&
assert(cause.traces.head.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse) &&
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(
isTrue
)
assert(cause.traces.head.parentTrace.get.parentTrace.get.parentTrace.isEmpty)(isFalse)
}
},
testM("fiber ancestry example with uploads") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

import zio.duration._
import zio.internal.Executor
import zio.test.Assertion._
import zio.test.TestAspect._
Expand Down Expand Up @@ -34,6 +33,6 @@ object CancelableFutureSpecJVM extends ZIOBaseSpec {
)
).unsafeRun(tst)
)
} @@ timeout(1.second)
} @@ nonFlaky
) @@ zioTag(future)
}
30 changes: 16 additions & 14 deletions core-tests/jvm/src/test/scala/zio/RTSSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger
import zio.clock.Clock
import zio.duration._
import zio.test.Assertion._
import zio.test.TestAspect.{ jvm, nonFlaky, silent }
import zio.test.TestAspect.{ nonFlaky, silent }
import zio.test._
import zio.test.environment.Live

Expand Down Expand Up @@ -35,29 +35,31 @@ object RTSSpec extends ZIOBaseSpec {
testM("blocking IO is effect blocking") {
for {
done <- Ref.make(false)
start <- IO.succeed(internal.OneShot.make[Unit])
fiber <- blocking.effectBlockingInterrupt { start.set(()); Thread.sleep(60L * 60L * 1000L) }
start <- Promise.make[Nothing, Unit]
fiber <- blocking.effectBlockingInterrupt { start.unsafeDone(IO.unit); Thread.sleep(60L * 60L * 1000L) }
.ensuring(done.set(true))
.fork
_ <- IO.succeed(start.get())
_ <- start.await
res <- fiber.interrupt
value <- done.get
} yield assert(res)(isInterrupted) && assert(value)(isTrue)
},
} @@ nonFlaky,
testM("cancelation is guaranteed") {
val io =
for {
release <- zio.Promise.make[Nothing, Int]
latch = internal.OneShot.make[Unit]
async = IO.effectAsyncInterrupt[Nothing, Unit] { _ => latch.set(()); Left(release.succeed(42).unit) }
fiber <- async.fork
_ <- IO.effectTotal(latch.get(1000))
_ <- fiber.interrupt.fork
result <- release.await
release <- Promise.make[Nothing, Int]
latch <- Promise.make[Nothing, Unit]
async = IO.effectAsyncInterrupt[Nothing, Unit] { _ =>
latch.unsafeDone(IO.unit); Left(release.succeed(42).unit)
}
fiber <- async.fork
_ <- latch.await
_ <- fiber.interrupt.fork
result <- release.await
} yield result == 42

assertM(io)(isTrue)
},
} @@ nonFlaky,
testM("Fiber dump looks correct") {
for {
promise <- Promise.make[Nothing, Int]
Expand Down Expand Up @@ -93,7 +95,7 @@ object RTSSpec extends ZIOBaseSpec {
} yield (startValue + exitValue) == 42

assertM(io)(isTrue)
} @@ zioTag(interruption) @@ jvm(nonFlaky),
} @@ zioTag(interruption) @@ nonFlaky,
testM("deadlock regression 1") {
import java.util.concurrent.Executors

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object CancelableFutureSpec extends ZIOBaseSpec {
_ <- UIO(f.cancel())
r <- ZIO.fromFuture(_ => f).run
} yield assert(r.succeeded)(isFalse) // not interrupted, as the Future fails when the effect in interrupted.
} @@ timeout(1.second) @@ jvmOnly @@ zioTag(interruption),
} @@ nonFlaky @@ zioTag(interruption),
testM("roundtrip preserves interruptibility") {
for {
start <- Promise.make[Nothing, Unit]
Expand Down
4 changes: 2 additions & 2 deletions core-tests/shared/src/test/scala/zio/ZIOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ object ZIOSpec extends ZIOBaseSpec {
assert(result3.dieOption)(isSome(equalTo(boom))) && assert(result3.interrupted)(isTrue)
}
}
}
} @@ nonFlaky
),
suite("forkAs")(
testM("child has specified name") {
Expand Down Expand Up @@ -2284,7 +2284,7 @@ object ZIOSpec extends ZIOBaseSpec {
testM("deep fork/join identity") {
val n = 20
assertM(concurrentFib(n))(equalTo(fib(n)))
},
} @@ jvmOnly,
testM("effectAsyncM creation is interruptible") {
for {
release <- Promise.make[Nothing, Int]
Expand Down
8 changes: 4 additions & 4 deletions core-tests/shared/src/test/scala/zio/ZQueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ object ZQueueSpec extends ZIOBaseSpec {
f <- IO.forkAll(values.map(queue.offer))
_ <- waitForSize(queue, 10)
out <- Ref.make[List[Int]](Nil)
_ <- queue.take.flatMap(i => out.update(i :: _)).repeat(Schedule.recurs(9))
_ <- queue.take.flatMap(i => out.update(i :: _)).repeatN(9)
l <- out.get
_ <- f.join
} yield assert(l.toSet)(equalTo(values.toSet))
},
testM("offers are suspended by back pressure") {
for {
queue <- Queue.bounded[Int](10)
_ <- queue.offer(1).repeat(Schedule.recurs(9))
_ <- queue.offer(1).repeatN(9)
refSuspended <- Ref.make[Boolean](true)
f <- (queue.offer(2) *> refSuspended.set(false)).fork
_ <- waitForSize(queue, 11)
Expand All @@ -73,7 +73,7 @@ object ZQueueSpec extends ZIOBaseSpec {
f <- IO.forkAll(values.map(queue.offer))
_ <- waitForSize(queue, 10)
out <- Ref.make[List[Int]](Nil)
_ <- queue.take.flatMap(i => out.update(i :: _)).repeat(Schedule.recurs(9))
_ <- queue.take.flatMap(i => out.update(i :: _)).repeatN(9)
l <- out.get
_ <- f.join
} yield assert(l.toSet)(equalTo(values.toSet))
Expand Down Expand Up @@ -270,7 +270,7 @@ object ZQueueSpec extends ZIOBaseSpec {
getter = queue.takeBetween(5, 10)
_ <- getter.race(updater)
count <- counter.get
} yield assert(count > 5)(isTrue)
} yield assert(count >= 5)(isTrue)
}
),
testM("offerAll with takeAll") {
Expand Down
2 changes: 1 addition & 1 deletion core-tests/shared/src/test/scala/zio/ZScopeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object ZScopeSpec extends ZIOBaseSpec {
testM("close can be called multiple times") {
for {
open <- ZScope.make[Unit]
_ <- open.close(()).repeat(Schedule.recurs(10))
_ <- open.close(()).repeatN(10)
value <- open.scope.closed
} yield assert(value)(isTrue)
},
Expand Down
13 changes: 6 additions & 7 deletions core-tests/shared/src/test/scala/zio/stm/TMapSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package zio.stm

import zio.test.Assertion._
import zio.test._
import zio.{ Schedule, URIO, ZIOBaseSpec }
import zio.{ URIO, ZIOBaseSpec }

object TMapSpec extends ZIOBaseSpec {

Expand Down Expand Up @@ -281,12 +281,11 @@ object TMapSpec extends ZIOBaseSpec {
},
testM("parallel value transformation") {
for {
tmap <- TMap.make("a" -> 0).commit
policy = Schedule.recurs(999)
tx = tmap.transformValues(_ + 1).commit.repeat(policy)
n = 2
_ <- URIO.collectAllPar_(List.fill(n)(tx))
res <- tmap.get("a").commit
tmap <- TMap.make("a" -> 0).commit
tx = tmap.transformValues(_ + 1).commit.repeatN(999)
n = 2
_ <- URIO.collectAllPar_(List.fill(n)(tx))
res <- tmap.get("a").commit
} yield assert(res)(isSome(equalTo(2000)))
},
testM("transformValuesM") {
Expand Down
12 changes: 6 additions & 6 deletions core-tests/shared/src/test/scala/zio/stm/ZSTMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package stm

import zio.duration._
import zio.test.Assertion._
import zio.test.TestAspect.{ jvmOnly, nonFlaky }
import zio.test.TestAspect.nonFlaky
import zio.test._
import zio.test.environment.Live

Expand Down Expand Up @@ -830,8 +830,8 @@ object ZSTMSpec extends ZIOBaseSpec {
for {
sender <- TRef.makeCommit(50)
receiver <- TRef.makeCommit(0)
toReceiver10 = transfer(receiver, sender, 100).repeat(Schedule.recurs(9))
toSender10 = transfer(sender, receiver, 100).repeat(Schedule.recurs(9))
toReceiver10 = transfer(receiver, sender, 100).repeatN(9)
toSender10 = transfer(sender, receiver, 100).repeatN(9)
f <- toReceiver10.zipPar(toSender10).fork
_ <- sender.update(_ + 50).commit
_ <- f.join
Expand Down Expand Up @@ -1103,7 +1103,7 @@ object ZSTMSpec extends ZIOBaseSpec {
updater = ref.update(_ + 10).commit.forever
res <- (left <|> right).commit.race(updater)
} yield assert(res)(equalTo("left"))
} @@ jvmOnly,
},
testM("fails if left fails") {
val left = STM.fail("left")
val right = STM.succeed("right")
Expand Down Expand Up @@ -1433,7 +1433,7 @@ object ZSTMSpec extends ZIOBaseSpec {
_ <- tvar.set(v + 1)
v <- tvar.get
} yield v)
.repeat(Schedule.recurs(n) *> Schedule.identity)
.repeatN(n)

def compute3VarN(
n: Int,
Expand All @@ -1450,7 +1450,7 @@ object ZSTMSpec extends ZIOBaseSpec {
_ <- tvar1.set(v1 - 1)
_ <- tvar2.set(v2 + 1)
} yield v3)
.repeat(Schedule.recurs(n) *> Schedule.identity)
.repeatN(n)

def transfer(receiver: TRef[Int], sender: TRef[Int], much: Int): UIO[Int] =
STM.atomically {
Expand Down
4 changes: 0 additions & 4 deletions core/js/src/main/scala/zio/clock/PlatformSpecific.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ private[clock] trait PlatformSpecific {

override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
case Duration.Infinity => ConstFalse
case Duration.Zero =>
task.run()

ConstFalse
case Duration.Finite(_) =>
var completed = false

Expand Down
4 changes: 0 additions & 4 deletions core/jvm/src/main/scala/zio/clock/PlatformSpecific.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ private[clock] trait PlatformSpecific {

override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
case Duration.Infinity => ConstFalse
case Duration.Zero =>
task.run()

ConstFalse
case Duration.Finite(_) =>
val future = service.schedule(
new Runnable {
Expand Down
4 changes: 0 additions & 4 deletions core/native/src/main/scala/zio/clock/PlatformSpecific.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ private[clock] trait PlatformSpecific {

override def schedule(task: Runnable, duration: Duration): CancelToken = duration match {
case Duration.Infinity => ConstFalse
case Duration.Zero =>
task.run()

ConstFalse
case Duration.Finite(_) =>
var completed = false

Expand Down
25 changes: 19 additions & 6 deletions core/shared/src/main/scala/zio/Runtime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,13 @@ trait Runtime[+R] {
* This method is effectful and should only be invoked at the edges of your program.
*/
final def unsafeRunAsyncCancelable[E, A](zio: => ZIO[R, E, A])(k: Exit[E, A] => Any): Fiber.Id => Exit[E, A] = {
lazy val curZIO = if (Platform.isJS) zio else ZIO.yieldNow *> zio
unsafeRunWith(curZIO)(k)
lazy val curZio = if (Platform.isJS) zio else ZIO.yieldNow *> zio
val canceler = unsafeRunWith(curZio)(k)
fiberId => {
val result = internal.OneShot.make[Exit[E, A]]
canceler(fiberId)(result.set)
result.get()
}
}

/**
Expand All @@ -122,9 +127,15 @@ trait Runtime[+R] {
*/
final def unsafeRunToFuture[E <: Throwable, A](zio: ZIO[R, E, A]): CancelableFuture[A] = {
val p: concurrent.Promise[A] = scala.concurrent.Promise[A]()
val canceler = unsafeRunAsyncCancelable(zio)(_.fold(cause => p.failure(cause.squashTraceWith(identity)), p.success))

val canceler = unsafeRunWith(zio)(_.fold(cause => p.failure(cause.squashTraceWith(identity)), p.success))

new CancelableFuture[A](p.future) {
def cancel(): Future[Exit[Throwable, A]] = Future.successful(canceler(Fiber.Id.None))
def cancel(): Future[Exit[Throwable, A]] = {
val p: concurrent.Promise[Exit[Throwable, A]] = scala.concurrent.Promise[Exit[Throwable, A]]()
canceler(Fiber.Id.None)(p.success)
p.future
}
}
}

Expand Down Expand Up @@ -163,7 +174,9 @@ trait Runtime[+R] {
*/
def withTracingConfig(config: TracingConfig): Runtime[R] = mapPlatform(_.withTracingConfig(config))

private final def unsafeRunWith[E, A](zio: => ZIO[R, E, A])(k: Exit[E, A] => Any): Fiber.Id => Exit[E, A] = {
private final def unsafeRunWith[E, A](
zio: => ZIO[R, E, A]
)(k: Exit[E, A] => Any): Fiber.Id => (Exit[E, A] => Any) => Unit = {
val InitialInterruptStatus = InterruptStatus.Interruptible

val fiberId = Fiber.newFiberId()
Expand Down Expand Up @@ -194,7 +207,7 @@ trait Runtime[+R] {
context.evaluateNow(ZIOFn.recordStackTrace(() => zio)(zio.asInstanceOf[IO[E, A]]))
context.runAsync(k)

fiberId => unsafeRun(context.interruptAs(fiberId))
fiberId => k => unsafeRunAsync(context.interruptAs(fiberId))((exit: Exit[Nothing, Exit[E, A]]) => k(exit.flatten))
}
}

Expand Down
Loading

0 comments on commit db9818e

Please sign in to comment.