Skip to content

Commit

Permalink
Dilate ask timeout in ConcurrentActivationTest, see #3413
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed May 29, 2013
1 parent 003588e commit 5cda24d
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package akka.camel

import language.postfixOps

import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.concurrent.{ Promise, Await, Future }
Expand All @@ -17,6 +16,7 @@ import akka.testkit._
import akka.util.Timeout
import org.apache.camel.model.RouteDefinition
import org.apache.camel.builder.Builder
import akka.actor.ActorLogging

/**
* A test to concurrently register and de-register consumer and producer endpoints
Expand All @@ -25,8 +25,6 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared

"Activation" must {
"support concurrent registrations and de-registrations" in {
implicit val timeout = Timeout((10 seconds).dilated)
val timeoutDuration = timeout.duration
implicit val ec = system.dispatcher
val number = 10
val eventFilter = EventFilter.warning(pattern = "received dead letter from .*producerRegistrar.*")
Expand Down Expand Up @@ -58,7 +56,7 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared
promiseAllRefs.success((activations.flatten, deactivations.flatten))
}
}
val (activations, deactivations) = Await.result(allRefsFuture, timeoutDuration)
val (activations, deactivations) = Await.result(allRefsFuture, 10.seconds.dilated)
// must be the size of the activated activated producers and consumers
activations.size must be(2 * number * number)
// must be the size of the activated activated producers and consumers
Expand Down Expand Up @@ -112,14 +110,14 @@ case class Activations()
case class DeActivations()

class Registrar(val start: Int, val number: Int, activationsPromise: Promise[List[ActorRef]],
deActivationsPromise: Promise[List[ActorRef]]) extends Actor {
deActivationsPromise: Promise[List[ActorRef]]) extends Actor with ActorLogging {
private var actorRefs = Set[ActorRef]()
private var activations = Set[Future[ActorRef]]()
private var deActivations = Set[Future[ActorRef]]()
private var index = 0
private val camel = CamelExtension(context.system)
private implicit val ec = context.dispatcher
private implicit val timeout = Timeout(10 seconds)
private implicit val timeout = Timeout(10.seconds.dilated(context.system))

def receive = {
case reg: RegisterConsumersAndProducers
Expand All @@ -134,7 +132,11 @@ class Registrar(val start: Int, val number: Int, activationsPromise: Promise[Lis
case reg: DeRegisterConsumersAndProducers
actorRefs.foreach { aref
context.stop(aref)
deActivations = deActivations + camel.deactivationFutureFor(aref)
val result = camel.deactivationFutureFor(aref)
result.onFailure {
case e log.error("deactivationFutureFor {} failed: {}", aref, e.getMessage)
}
deActivations += result
if (deActivations.size == number * 2) {
Future.sequence(deActivations.toList) map deActivationsPromise.success
}
Expand All @@ -144,7 +146,11 @@ class Registrar(val start: Int, val number: Int, activationsPromise: Promise[Lis
def add(actor: Actor, name: String) {
val ref = context.actorOf(Props(actor), name)
actorRefs = actorRefs + ref
activations = activations + camel.activationFutureFor(ref)
val result = camel.activationFutureFor(ref)
result.onFailure {
case e log.error("activationFutureFor {} failed: {}", ref, e.getMessage)
}
activations += result
}
}

Expand Down

0 comments on commit 5cda24d

Please sign in to comment.