Skip to content

Commit

Permalink
refactoring the FSM part
Browse files Browse the repository at this point in the history
  • Loading branch information
imn committed Oct 26, 2010
1 parent 5c9fab8 commit ff81cfb
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 75 deletions.
133 changes: 100 additions & 33 deletions akka-actor/src/main/scala/actor/FSM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,125 @@

package se.scalablesolutions.akka.actor

import se.scalablesolutions.akka.stm.Ref
import se.scalablesolutions.akka.stm.local._

import scala.collection.mutable
import java.util.concurrent.{ScheduledFuture, TimeUnit}

trait FSM[S] { this: Actor =>
trait FSM[S, D] {
this: Actor =>

type StateFunction = scala.PartialFunction[Event, State]

var currentState: State = initialState
var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
private var currentState: State = _
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None

private val transitions = mutable.Map[S, StateFunction]()

private def register(name: S, function: StateFunction) {
if (transitions contains name) {
transitions(name) = transitions(name) orElse function
} else {
transitions(name) = function
}
}

protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = {
setState(State(stateName, stateData, timeout))
}

def initialState: State
protected final def inState(stateName: S)(stateFunction: StateFunction) = {
register(stateName, stateFunction)
}

protected final def goto(nextStateName: S): State = {
State(nextStateName, currentState.stateData)
}

def handleEvent: StateFunction = {
case event@Event(value, stateData) =>
log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id)
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
protected final def stay(): State = {
goto(currentState.stateName)
}

protected final def reply(replyValue: Any): State = {
self.sender.foreach(_ ! replyValue)
stay()
}

/**
* Stop
*/
protected final def stop(): State = {
stop(Normal)
}

protected final def stop(reason: Reason): State = {
stop(reason, currentState.stateData)
}

protected final def stop(reason: Reason, stateData: D): State = {
log.info("Stopped because of reason: %s", reason)
terminate(reason, currentState.stateName, stateData)
self.stop
State(currentState.stateName, stateData)
}

def terminate(reason: Reason, stateName: S, stateData: D) = ()

def whenUnhandled(stateFunction: StateFunction) = {
handleEvent = stateFunction
}

private var handleEvent: StateFunction = {
case Event(value, stateData) =>
log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData)
currentState
}

override final protected def receive: Receive = {
case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => ()
// state timeout when new message in queue, skip this timeout
case value => {
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}

val event = Event(value, currentState.stateData)
val newState = (currentState.stateFunction orElse handleEvent).apply(event)

currentState = newState

newState match {
case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue)
case _ => () // ignore for now
val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event)
if (self.isRunning) {
setState(nextState)
}
}
}

newState.timeout.foreach {
timeout =>
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS))
}
private def setState(nextState: State) = {
if (!transitions.contains(nextState.stateName)) {
stop(Failure("Next state %s not available".format(nextState.stateName)))
} else {
currentState = nextState
currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))}
}
}

case class State(stateEvent: StateEvent,
stateFunction: StateFunction,
stateData: S,
timeout: Option[Int] = None,
replyValue: Option[Any] = None)
case class Event(event: Any, stateData: D)

case class State(stateName: S, stateData: D, timeout: Option[Long] = None) {
def until(timeout: Long): State = {
copy(timeout = Some(timeout))
}

def then(nextStateName: S): State = {
copy(stateName = nextStateName)
}

case class Event(event: Any, stateData: S)
def replying(replyValue:Any): State = {
self.sender.foreach(_ ! replyValue)
this
}

def using(nextStateDate: D): State = {
copy(stateData = nextStateDate)
}
}

sealed trait StateEvent
object NextState extends StateEvent
object Reply extends StateEvent
sealed trait Reason
case object Normal extends Reason
case object Shutdown extends Reason
case class Failure(cause: Any) extends Reason

object StateTimeout
case object StateTimeout
}
51 changes: 37 additions & 14 deletions akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,44 @@ import java.util.concurrent.TimeUnit

object FSMActorSpec {

class Lock(code: String,
timeout: Int,
unlockedLatch: StandardLatch,
lockedLatch: StandardLatch) extends Actor with FSM[CodeState] {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch
val unhandledLatch = new StandardLatch

def initialState = State(NextState, locked, CodeState("", code))
class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] {

def locked: StateFunction = {
inState("locked") {
case Event(digit: Char, CodeState(soFar, code)) => {
soFar + digit match {
case incomplete if incomplete.length < code.length =>
State(NextState, locked, CodeState(incomplete, code))
stay using CodeState(incomplete, code)
case codeTry if (codeTry == code) => {
doUnlock
State(NextState, open, CodeState("", code), Some(timeout))
goto("open") using CodeState("", code) until timeout
}
case wrong => {
log.error("Wrong code %s", wrong)
State(NextState, locked, CodeState("", code))
stay using CodeState("", code)
}
}
}
case Event("hello", _) => stay replying "world"
}

def open: StateFunction = {
inState("open") {
case Event(StateTimeout, stateData) => {
doLock
State(NextState, locked, stateData)
goto("locked")
}
}

setInitialState("locked", CodeState("", code))

whenUnhandled {
case Event(_, stateData) => {
log.info("Unhandled")
unhandledLatch.open
stay
}
}

Expand All @@ -63,11 +73,9 @@ class FSMActorSpec extends JUnitSuite {

@Test
def unlockTheLock = {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch

// lock that locked after being open for 1 sec
val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start
val lock = Actor.actorOf(new Lock("33221", 1000)).start

lock ! '3'
lock ! '3'
Expand All @@ -77,6 +85,21 @@ class FSMActorSpec extends JUnitSuite {

assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))

lock ! "not_handled"
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))

val answerLatch = new StandardLatch
object Go
val tester = Actor.actorOf(new Actor {
protected def receive = {
case Go => lock ! "hello"
case "world" => answerLatch.open

}
}).start
tester ! Go
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,27 @@ case class TakenBy(hakker: Option[ActorRef])
/*
* A chopstick is an actor, it can be taken, and put back
*/
class Chopstick(name: String) extends Actor with FSM[TakenBy] {
class Chopstick(name: String) extends Actor with FSM[String, TakenBy] {
self.id = name

// A chopstick begins its existence as available and taken by no one
def initialState = State(NextState, available, TakenBy(None))

// When a chopstick is available, it can be taken by a some hakker
def available: StateFunction = {
inState("available") {
case Event(Take, _) =>
State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self)))
goto("taken") using TakenBy(self.sender) replying Taken(self)
}

// When a chopstick is taken by a hakker
// It will refuse to be taken by other hakkers
// But the owning hakker can put it back
def taken: StateFunction = {
inState("taken") {
case Event(Take, currentState) =>
State(Reply, taken, currentState, replyValue = Some(Busy(self)))
stay replying Busy(self)
case Event(Put, TakenBy(hakker)) if self.sender == hakker =>
State(NextState, available, TakenBy(None))
goto("available") using TakenBy(None)
}

// A chopstick begins its existence as available and taken by no one
setInitialState("available", TakenBy(None))
}

/**
Expand All @@ -57,44 +57,41 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef])
/*
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
*/
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] {
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] {
self.id = name

//All hakkers start waiting
def initialState = State(NextState, waiting, TakenChopsticks(None, None))

def waiting: StateFunction = {
inState("waiting") {
case Event(Think, _) =>
log.info("%s starts to think", name)
startThinking(5000)
}

//When a hakker is thinking it can become hungry
//and try to pick up its chopsticks and eat
def thinking: StateFunction = {
case Event(StateTimeout, current) =>
inState("thinking") {
case Event(StateTimeout, _) =>
left ! Take
right ! Take
State(NextState, hungry, current)
goto("hungry")
}

// When a hakker is hungry it tries to pick up its chopsticks and eat
// When it picks one up, it goes into wait for the other
// If the hakkers first attempt at grabbing a chopstick fails,
// it starts to wait for the response of the other grab
def hungry: StateFunction = {
inState("hungry") {
case Event(Taken(`left`), _) =>
State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None))
goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None)
case Event(Taken(`right`), _) =>
State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right)))
case Event(Busy(_), current) =>
State(NextState, firstChopstickDenied, current)
goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right))
case Event(Busy(_), _) =>
goto("firstChopstickDenied")
}

// When a hakker is waiting for the last chopstick it can either obtain it
// and start eating, or the other chopstick was busy, and the hakker goes
// back to think about how he should obtain his chopsticks :-)
def waitForOtherChopstick: StateFunction = {
inState("waitForOtherChopstick") {
case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right)
case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right)
case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) =>
Expand All @@ -105,13 +102,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit

private def startEating(left: ActorRef, right: ActorRef): State = {
log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000))
goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000
}

// When the results of the other grab comes back,
// he needs to put it back if he got the other one.
// Then go back and think and try to grab the chopsticks again
def firstChopstickDenied: StateFunction = {
inState("firstChopstickDenied") {
case Event(Taken(secondChopstick), _) =>
secondChopstick ! Put
startThinking(10)
Expand All @@ -121,7 +118,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit

// When a hakker is eating, he can decide to start to think,
// then he puts down his chopsticks and starts to think
def eating: StateFunction = {
inState("eating") {
case Event(StateTimeout, _) =>
log.info("%s puts down his chopsticks and starts to think", name)
left ! Put
Expand All @@ -130,15 +127,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
}

private def startThinking(period: Int): State = {
State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period))
goto("thinking") using TakenChopsticks(None, None) until period
}

//All hakkers start waiting
setInitialState("waiting", TakenChopsticks(None, None))
}

/*
* Alright, here's our test-harness
*/
object DiningHakkersOnFSM {
def run {
def main(args: Array[String]) {

// Create 5 chopsticks
val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
Expand Down

0 comments on commit ff81cfb

Please sign in to comment.