Skip to content

Commit

Permalink
add retry method in IOActorSpecs until I find a better home
Browse files Browse the repository at this point in the history
  • Loading branch information
derekjw committed Jan 17, 2012
1 parent c0a3441 commit 6b8e8ea
Showing 1 changed file with 48 additions and 9 deletions.
57 changes: 48 additions & 9 deletions akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.actor

import akka.util.ByteString
import akka.util.cps._
import akka.util.{ ByteString, Duration, Timer }
import akka.util.duration._
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Await, Future }
import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher }

object IOActorSpec {

Expand Down Expand Up @@ -47,8 +47,7 @@ object IOActorSpec {
socket write bytes
state flatMap { _
IO take bytes.length map (source ! _) recover {
case e: java.net.ConnectException
self.tell(bytes, source)
case e source ! Status.Failure(e)
}
}

Expand Down Expand Up @@ -208,12 +207,52 @@ object IOActorSpec {
class IOActorSpec extends AkkaSpec with DefaultTimeout {
import IOActorSpec._

/**
* Retries the future until a result is returned or until one of the limits are hit. If no
* limits are provided the future will be retried indefinitely until a result is returned.
*
* @param count number of retries
* @param timeout duration to retry within
* @param delay duration to wait before retrying
* @param filter determines which exceptions should be retried
* @return a future containing the result or the last exception before a limit was hit.
*/
def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = None, filter: Option[Throwable Boolean] = None)(future: Future[T])(implicit executor: ExecutionContext): Future[T] = {

val promise = Promise[T]()(executor)

val timer = timeout match {
case Some(duration) Some(Timer(duration))
case None None
}

def check(n: Int, e: Throwable): Boolean =
(count.isEmpty || (n < count.get)) && (timer.isEmpty || timer.get.isTicking) && (filter.isEmpty || filter.get(e))

def run(n: Int) {
future onComplete {
case Left(e) if check(n, e)
if (delay.isDefined) {
executor match {
case m: MessageDispatcher m.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1))
case _ // Thread.sleep, ignore, or other?
}
} else run(n + 1)
case v promise complete v
}
}

run(0)

promise
}

"an IO Actor" must {
"run echo server" in {
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064)))
val f1 = client ? ByteString("Hello World!1")
val f2 = client ? ByteString("Hello World!2")
val f3 = client ? ByteString("Hello World!3")
val f1 = retry() { client ? ByteString("Hello World!1") }
val f2 = retry() { client ? ByteString("Hello World!2") }
val f3 = retry() { client ? ByteString("Hello World!3") }
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064)))
Await.result(f1, TestLatch.DefaultTimeout) must equal(ByteString("Hello World!1"))
Await.result(f2, TestLatch.DefaultTimeout) must equal(ByteString("Hello World!2"))
Expand All @@ -223,7 +262,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
"run echo server under high load" in {
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065)))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
val f = Future.traverse(list)(i retry() { client ? ByteString(i.toString) })
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065)))
assert(Await.result(f, TestLatch.DefaultTimeout).size === 1000)
}
Expand Down

0 comments on commit 6b8e8ea

Please sign in to comment.