diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index 0bdc04fc48f..c5eb00a6fd2 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -4,58 +4,125 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.stm.Ref -import se.scalablesolutions.akka.stm.local._ - +import scala.collection.mutable import java.util.concurrent.{ScheduledFuture, TimeUnit} -trait FSM[S] { this: Actor => +trait FSM[S, D] { + this: Actor => type StateFunction = scala.PartialFunction[Event, State] - var currentState: State = initialState - var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + private val transitions = mutable.Map[S, StateFunction]() + + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + + protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { + setState(State(stateName, stateData, timeout)) + } - def initialState: State + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) + } + + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } - def handleEvent: StateFunction = { - case event@Event(value, stateData) => - log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) - State(NextState, currentState.stateFunction, stateData, currentState.timeout) + protected final def stay(): State = { + goto(currentState.stateName) + } + + protected final def reply(replyValue: Any): State = { + self.sender.foreach(_ ! replyValue) + stay() + } + + /** + * Stop + */ + protected final def stop(): State = { + stop(Normal) + } + + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + protected final def stop(reason: Reason, stateData: D): State = { + log.info("Stopped because of reason: %s", reason) + terminate(reason, currentState.stateName, stateData) + self.stop + State(currentState.stateName, stateData) + } + + def terminate(reason: Reason, stateName: S, stateData: D) = () + + def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction + } + + private var handleEvent: StateFunction = { + case Event(value, stateData) => + log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData) + currentState } override final protected def receive: Receive = { + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => () + // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) - - currentState = newState - - newState match { - case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) - case _ => () // ignore for now + val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) + if (self.isRunning) { + setState(nextState) } + } + } - newState.timeout.foreach { - timeout => - timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) - } + private def setState(nextState: State) = { + if (!transitions.contains(nextState.stateName)) { + stop(Failure("Next state %s not available".format(nextState.stateName))) + } else { + currentState = nextState + currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))} } } - case class State(stateEvent: StateEvent, - stateFunction: StateFunction, - stateData: S, - timeout: Option[Int] = None, - replyValue: Option[Any] = None) + case class Event(event: Any, stateData: D) + + case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { + def until(timeout: Long): State = { + copy(timeout = Some(timeout)) + } + + def then(nextStateName: S): State = { + copy(stateName = nextStateName) + } - case class Event(event: Any, stateData: S) + def replying(replyValue:Any): State = { + self.sender.foreach(_ ! replyValue) + this + } + + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + } - sealed trait StateEvent - object NextState extends StateEvent - object Reply extends StateEvent + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason - object StateTimeout + case object StateTimeout } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index e4515bd3daf..496d9e9e013 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -13,34 +13,44 @@ import java.util.concurrent.TimeUnit object FSMActorSpec { - class Lock(code: String, - timeout: Int, - unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends Actor with FSM[CodeState] { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + val unhandledLatch = new StandardLatch - def initialState = State(NextState, locked, CodeState("", code)) + class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] { - def locked: StateFunction = { + inState("locked") { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) + stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) + goto("open") using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) + stay using CodeState("", code) } } } + case Event("hello", _) => stay replying "world" } - def open: StateFunction = { + inState("open") { case Event(StateTimeout, stateData) => { doLock - State(NextState, locked, stateData) + goto("locked") + } + } + + setInitialState("locked", CodeState("", code)) + + whenUnhandled { + case Event(_, stateData) => { + log.info("Unhandled") + unhandledLatch.open + stay } } @@ -63,11 +73,9 @@ class FSMActorSpec extends JUnitSuite { @Test def unlockTheLock = { - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000)).start lock ! '3' lock ! '3' @@ -77,6 +85,21 @@ class FSMActorSpec extends JUnitSuite { assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + + lock ! "not_handled" + assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + + val answerLatch = new StandardLatch + object Go + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Go => lock ! "hello" + case "world" => answerLatch.open + + } + }).start + tester ! Go + assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 8348de21340..9ab27d4fbb4 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -20,27 +20,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[TakenBy] { +class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { self.id = name - // A chopstick begins its existence as available and taken by no one - def initialState = State(NextState, available, TakenBy(None)) - // When a chopstick is available, it can be taken by a some hakker - def available: StateFunction = { + inState("available") { case Event(Take, _) => - State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + goto("taken") using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - def taken: StateFunction = { + inState("taken") { case Event(Take, currentState) => - State(Reply, taken, currentState, replyValue = Some(Busy(self))) + stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - State(NextState, available, TakenBy(None)) + goto("available") using TakenBy(None) } + + // A chopstick begins its existence as available and taken by no one + setInitialState("available", TakenBy(None)) } /** @@ -57,13 +57,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] { self.id = name - //All hakkers start waiting - def initialState = State(NextState, waiting, TakenChopsticks(None, None)) - - def waiting: StateFunction = { + inState("waiting") { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -71,30 +68,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - def thinking: StateFunction = { - case Event(StateTimeout, current) => + inState("thinking") { + case Event(StateTimeout, _) => left ! Take right ! Take - State(NextState, hungry, current) + goto("hungry") } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - def hungry: StateFunction = { + inState("hungry") { case Event(Taken(`left`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) - case Event(Busy(_), current) => - State(NextState, firstChopstickDenied, current) + goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right)) + case Event(Busy(_), _) => + goto("firstChopstickDenied") } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - def waitForOtherChopstick: StateFunction = { + inState("waitForOtherChopstick") { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -105,13 +102,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - def firstChopstickDenied: StateFunction = { + inState("firstChopstickDenied") { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -121,7 +118,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - def eating: StateFunction = { + inState("eating") { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -130,15 +127,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + goto("thinking") using TakenChopsticks(None, None) until period } + + //All hakkers start waiting + setInitialState("waiting", TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ object DiningHakkersOnFSM { - def run { + def main(args: Array[String]) { + // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick