Skip to content

Commit

Permalink
Merge branch 'master' into wip-2212-recursive-resume-∂π
Browse files Browse the repository at this point in the history
also split out ChildrenContainer into its own file and add
suspendCounter to UnstartedActorCell
  • Loading branch information
rkuhn committed Jul 5, 2012
2 parents 78a3919 + 99c4b30 commit 1cb204d
Show file tree
Hide file tree
Showing 194 changed files with 9,699 additions and 2,302 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ mongoDB/
redis/
beanstalk/
.scalastyle
bin/
17 changes: 17 additions & 0 deletions akka-actor-tests/src/test/java/akka/actor/NonPublicClass.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.actor;

public class NonPublicClass {
public static Props createProps() {
return new Props(MyNonPublicActorClass.class);
}
}

class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) {
getSender().tell(msg);
}
}
19 changes: 13 additions & 6 deletions akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,14 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val readA = in.readObject

a.isInstanceOf[LocalActorRef] must be === true
readA.isInstanceOf[LocalActorRef] must be === true
a.isInstanceOf[ActorRefWithCell] must be === true
readA.isInstanceOf[ActorRefWithCell] must be === true
(readA eq a) must be === true
}

val ser = new JavaSerializer(esys)
val readA = ser.fromBinary(bytes, None)
readA.isInstanceOf[LocalActorRef] must be === true
readA.isInstanceOf[ActorRefWithCell] must be === true
(readA eq a) must be === true
}

Expand Down Expand Up @@ -358,17 +358,24 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
system.stop(serverRef)
}

"support actorOfs where the class of the actor isn't public" in {
val a = system.actorOf(NonPublicClass.createProps())
a.tell("pigdog", testActor)
expectMsg("pigdog")
system stop a
}

"stop when sent a poison pill" in {
val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor {
def receive = {
case 5 sender.tell("five")
case null sender.tell("null")
case 5 sender.tell("five")
case 0 sender.tell("null")
}
}))

val ffive = (ref.ask(5)(timeout)).mapTo[String]
val fnull = (ref.ask(null)(timeout)).mapTo[String]
val fnull = (ref.ask(0)(timeout)).mapTo[String]
ref ! PoisonPill

Await.result(ffive, timeout.duration) must be("five")
Expand Down
74 changes: 72 additions & 2 deletions akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import akka.dispatch.Await
import akka.util.duration._
import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
import akka.pattern.ask
import akka.util.Timeout
import akka.dispatch.Future

class JavaExtensionSpec extends JavaExtension with JUnitSuite

Expand All @@ -21,8 +24,46 @@ object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider
// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
class TestExtension(val system: ExtendedActorSystem) extends Extension

object ActorSystemSpec {

class Waves extends Actor {
var master: ActorRef = _
var terminaters = Set[ActorRef]()

def receive = {
case n: Int
master = sender
terminaters = Set() ++ (for (i 1 to n) yield {
val man = context.watch(context.system.actorOf(Props[Terminater]))
man ! "run"
man
})
case Terminated(child) if terminaters contains child
terminaters -= child
if (terminaters.isEmpty) {
master ! "done"
context stop self
}
}

override def preRestart(cause: Throwable, msg: Option[Any]) {
if (master ne null) {
master ! "failed with " + cause + " while processing " + msg
}
context stop self
}
}

class Terminater extends Actor {
def receive = {
case "run" context.stop(self)
}
}

}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender {

"An ActorSystem" must {

Expand Down Expand Up @@ -112,6 +153,35 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
}.getMessage must be("Must be called prior to system shutdown.")
}

"reliably create waves of actors" in {
import system.dispatcher
implicit val timeout = Timeout(30 seconds)
val waves = for (i 1 to 3) yield system.actorOf(Props[ActorSystemSpec.Waves]) ? 50000
Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done")
}

"reliable deny creation of actors while shutting down" in {
val system = ActorSystem()
system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
var failing = false
var created = Vector.empty[ActorRef]
while (!system.isTerminated && system.uptime < 5) {
try {
val t = system.actorOf(Props[ActorSystemSpec.Terminater])
failing must not be true // because once failing => always failing (it’s due to shutdown)
created :+= t
} catch {
case _: IllegalStateException failing = true
}
}
if (system.uptime >= 5) {
println(created.last)
println(system.asInstanceOf[ExtendedActorSystem].printTree)
system.uptime must be < 5L
}
created filter (ref !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq())
}

}

}
}
20 changes: 20 additions & 0 deletions akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
result must be(Seq(1, 2, 3))
}
}

"be able to watch a child with the same name after the old died" in {
val parent = system.actorOf(Props(new Actor {
def receive = {
case "NKOTB"
val currentKid = context.watch(context.actorOf(Props(ctx { case "NKOTB" ctx stop ctx.self }), "kid"))
currentKid forward "NKOTB"
context become {
case Terminated(`currentKid`)
testActor ! "GREEN"
context unbecome
}
}
}))

parent ! "NKOTB"
expectMsg("GREEN")
parent ! "NKOTB"
expectMsg("GREEN")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
object FSMTimingSpec {

def suspend(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.suspend()
case _
case l: ActorRefWithCell l.suspend()
case _
}

def resume(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.resume(inResponseToFailure = false)
case _
case l: ActorRefWithCell l.resume(inResponseToFailure = false)
case _
}

trait State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
actor4 ! Kill

countDownLatch.await(10, TimeUnit.SECONDS)
assert(Await.result(actor1 ? "status", timeout.duration) == "OK", "actor1 is shutdown")
assert(Await.result(actor2 ? "status", timeout.duration) == "OK", "actor2 is shutdown")
assert(Await.result(actor3 ? "status", timeout.duration) == "OK", "actor3 is shutdown")
assert(Await.result(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown")

Seq("actor1" -> actor1, "actor2" -> actor2, "actor3" -> actor3, "actor4" -> actor4) map {
case (id, ref) (id, ref ? "status")
} foreach {
case (id, f) (id, Await.result(f, timeout.duration)) must be === ((id, "OK"))
}
}
}

"be able to create named children in its constructor" in {
val a = system.actorOf(Props(new Actor {
context.actorOf(Props.empty, "bob")
def receive = {
case x: Exception throw x
}
def receive = { case x: Exception throw x }
override def preStart(): Unit = testActor ! "preStart"
}))
val m = "weird message"
Expand Down Expand Up @@ -123,20 +123,14 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul

"be able to create a similar kid in the fault handling strategy" in {
val parent = system.actorOf(Props(new Actor {

override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
val newKid = context.actorOf(Props.empty, child.path.name)
testActor ! {
if ((newKid ne child) && newKid.path == child.path) "green"
else "red"
}
testActor ! { if ((newKid ne child) && newKid.path == child.path) "green" else "red" }
}
}

def receive = {
case "engage" context.stop(context.actorOf(Props.empty, "Robert"))
}
def receive = { case "engage" context.stop(context.actorOf(Props.empty, "Robert")) }
}))
parent ! "engage"
expectMsg("green")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@
*/
package akka.actor.dispatch

import org.scalatest.Assertions._
import akka.testkit._
import akka.dispatch._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.util.Switch
import java.rmi.RemoteException
import org.junit.{ After, Test }
import java.util.concurrent.{ TimeUnit, CountDownLatch, ConcurrentHashMap }
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }

import org.junit.runner.RunWith
import org.scalatest.Assertions.{ fail, assert }
import org.scalatest.junit.JUnitRunner

import com.typesafe.config.Config

import akka.actor._
import util.control.NoStackTrace
import akka.actor.ActorSystem
import akka.util.duration._
import akka.dispatch._
import akka.event.Logging.Error
import com.typesafe.config.Config
import akka.util.Duration
import akka.pattern.ask
import akka.testkit._
import akka.util.{ Timeout, Switch, Duration }
import akka.util.duration._

object ActorModelSpec {

Expand Down Expand Up @@ -201,7 +200,7 @@ object ActorModelSpec {
msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher))
val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher))
val deadline = System.currentTimeMillis + 1000
try {
await(deadline)(stats.suspensions.get() == suspensions)
Expand Down Expand Up @@ -241,6 +240,13 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa

def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher))

def awaitStarted(ref: ActorRef): Unit = {
awaitCond(ref match {
case r: RepointableRef r.isStarted
case _ true
}, 1 second, 10 millis)
}

protected def interceptedDispatcher(): MessageDispatcherInterceptor
protected def dispatcherType: String

Expand Down Expand Up @@ -280,6 +286,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
implicit val dispatcher = interceptedDispatcher()
val start, oneAtATime = new CountDownLatch(1)
val a = newTestActor(dispatcher.id)
awaitStarted(a)

a ! CountDown(start)
assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
Expand Down Expand Up @@ -328,7 +335,8 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa

"not process messages for a suspended actor" in {
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef]
val a = newTestActor(dispatcher.id).asInstanceOf[InternalActorRef]
awaitStarted(a)
val done = new CountDownLatch(1)
a.suspend
a ! CountDown(done)
Expand Down Expand Up @@ -436,6 +444,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa

"not double-deregister" in {
implicit val dispatcher = interceptedDispatcher()
for (i 1 to 1000) system.actorOf(Props.empty)
val a = newTestActor(dispatcher.id)
a ! DoubleStop
awaitCond(statsFor(a, dispatcher).registers.get == 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package akka.actor.dispatch

import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.{ Mailbox, Dispatchers }
import akka.actor.{ LocalActorRef, IllegalActorStateException, Actor, Props }

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

import akka.actor.{ Props, ActorRefWithCell, ActorCell, Actor }
import akka.dispatch.Mailbox
import akka.testkit.AkkaSpec

object BalancingDispatcherSpec {
Expand Down Expand Up @@ -51,8 +55,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) {
"have fast actor stealing work from slow actor" in {
val finishedCounter = new CountDownLatch(110)

val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef]
val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[LocalActorRef]
val slow = system.actorOf(Props(new DelayableActor(50, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell]
val fast = system.actorOf(Props(new DelayableActor(10, finishedCounter)).withDispatcher(delayableActorDispatcher)).asInstanceOf[ActorRefWithCell]

var sentToFast = 0

Expand All @@ -76,11 +80,11 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) {
}

finishedCounter.await(5, TimeUnit.SECONDS)
fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
fast.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
slow.underlying.asInstanceOf[ActorCell].mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlying.asInstanceOf[ActorCell].actor.asInstanceOf[DelayableActor].invocationCount)
system.stop(slow)
system.stop(fast)
}
Expand Down
Loading

0 comments on commit 1cb204d

Please sign in to comment.