Skip to content

Commit

Permalink
Merging in master
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorklang committed Jul 4, 2012
2 parents 6fb06ee + eec6ba0 commit 3911b18
Show file tree
Hide file tree
Showing 25 changed files with 285 additions and 239 deletions.
29 changes: 1 addition & 28 deletions akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -803,33 +803,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(Await.result(z, timeout.duration) === 42)
}

"futureFlowLoops" in {
import Future.flow
import akka.util.cps._

val count = 1000

val promises = List.fill(count)(Promise[Int]())

flow {
var i = 0
val iter = promises.iterator
whileC(iter.hasNext) {
iter.next << i
i += 1
}
}

var i = 0
promises foreach { p
assert(Await.result(p, timeout.duration) === i)
i += 1
}

assert(i === count)

}

"run callbacks async" in {
val latch = Vector.fill(10)(new TestLatch)

Expand Down Expand Up @@ -1038,4 +1011,4 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}

def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest
}
}
31 changes: 13 additions & 18 deletions akka-actor/src/main/scala/akka/actor/ActorCell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private[akka] object ActorCell {
def cancel() {}
}

final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable)

final val emptyBehaviorStack: List[Actor.Receive] = Nil

Expand Down Expand Up @@ -448,29 +448,24 @@ private[akka] class ActorCell(

final def provider = system.provider

override final def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None
override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}

override final def setReceiveTimeout(timeout: Duration): Unit = setReceiveTimeout(Some(timeout))
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))

final def setReceiveTimeout(timeout: Option[Duration]): Unit = {
val timeoutMs = timeout match {
case None -1L
case Some(duration)
val ms = duration.toMillis
if (ms <= 0) -1L
// 1 millisecond is minimum supported
else if (ms < 1) 1L
else ms
}
receiveTimeoutData = (timeoutMs, receiveTimeoutData._2)
}
override final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)

final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None)

/**
* In milliseconds
*/
var receiveTimeoutData: (Long, Cancellable) = emptyReceiveTimeoutData
var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData

@volatile
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
Expand Down Expand Up @@ -1014,10 +1009,10 @@ private[akka] class ActorCell(

final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (recvtimeout._1 > 0 && !mailbox.hasMessages) {
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout))
} else cancelReceiveTimeout()

}
Expand Down
85 changes: 0 additions & 85 deletions akka-actor/src/main/scala/akka/util/cps/package.scala

This file was deleted.

22 changes: 12 additions & 10 deletions akka-agent/src/test/scala/akka/agent/AgentSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package akka.agent

import language.postfixOps

import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ CountDownLatch, TimeUnit }

class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
Expand Down Expand Up @@ -38,31 +38,33 @@ class AgentSpec extends AkkaSpec {

"maintain order between send and sendOff" in {
val countDown = new CountDownFunction[String]
val l1, l2 = new CountDownLatch(1)

val agent = Agent("a")
agent send (_ + "b")
val longRunning = (s: String) { Thread.sleep(2000); s + "c" }
agent sendOff longRunning
agent.sendOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })
l1.await(5, TimeUnit.SECONDS)
agent send (_ + "d")
agent send countDown

l2.countDown
countDown.await(5 seconds)
agent() must be("abcd")

agent.close()
}

"maintain order between alter and alterOff" in {

val l1, l2 = new CountDownLatch(1)
val agent = Agent("a")

val r1 = agent.alter(_ + "b")(5000)
val r2 = agent.alterOff((s: String) { Thread.sleep(2000); s + "c" })(5000)
val r2 = agent.alterOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })(5000)
l1.await(5, TimeUnit.SECONDS)
val r3 = agent.alter(_ + "d")(5000)
val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":"))
l2.countDown

Await.result(r1, 5 seconds) must be === "ab"
Await.result(r2, 5 seconds) must be === "abc"
Await.result(r3, 5 seconds) must be === "abcd"
Await.result(result, 5 seconds) must be === "ab:abc:abcd"

agent() must be("abcd")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSy
"register a started SendProcessor for the producer, which is stopped when the actor is stopped" in {
val actorRef = newEmptyActor
val processor = registerProcessorFor(actorRef)
camel.awaitActivation(actorRef, 1 second)
camel.awaitActivation(actorRef, 5 second)
processor.isStarted must be(true)
system.stop(actorRef)
camel.awaitDeactivation(actorRef, 1 second)
camel.awaitDeactivation(actorRef, 5 second)
(processor.isStopping || processor.isStopped) must be(true)
}
"remove and stop the SendProcessor if the actorRef is registered" in {
Expand Down
2 changes: 1 addition & 1 deletion akka-cluster/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ akka {
# Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Using auto-down implies that two separate clusters will automatically be formed in case of
# network partition.
auto-down = on
auto-down = off

# the number of gossip daemon actors
nr-of-gossip-daemons = 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-interval = 30 s
}""")
# turn off unreachable reaper
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}

Expand All @@ -35,7 +33,7 @@ abstract class LeaderLeavingSpec

import LeaderLeavingMultiJvmSpec._

val leaderHandoffWaitingTime = 30.seconds.dilated
val leaderHandoffWaitingTime = 30.seconds

"A LEADER that is LEAVING" must {

Expand All @@ -45,41 +43,60 @@ abstract class LeaderLeavingSpec

val oldLeaderAddress = cluster.leader

if (cluster.isLeader) {
within(leaderHandoffWaitingTime) {

cluster.leave(oldLeaderAddress)
enterBarrier("leader-left")
if (cluster.isLeader) {

// verify that a NEW LEADER have taken over
awaitCond(!cluster.isLeader)
enterBarrier("registered-listener")

// verify that the LEADER is shut down
awaitCond(!cluster.isRunning, 30.seconds.dilated)
cluster.leave(oldLeaderAddress)
enterBarrier("leader-left")

// verify that the LEADER is REMOVED
awaitCond(cluster.status == MemberStatus.Removed)
// verify that a NEW LEADER have taken over
awaitCond(!cluster.isLeader)

} else {
// verify that the LEADER is shut down
awaitCond(!cluster.isRunning)

enterBarrier("leader-left")
// verify that the LEADER is REMOVED
awaitCond(cluster.status == MemberStatus.Removed)

// verify that the LEADER is LEAVING
awaitCond(cluster.latestGossip.members.exists(m m.status == MemberStatus.Leaving && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on LEAVING
} else {

// verify that the LEADER is EXITING
awaitCond(cluster.latestGossip.members.exists(m m.status == MemberStatus.Exiting && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on EXITING
val leavingLatch = TestLatch()
val exitingLatch = TestLatch()
val expectedAddresses = roles.toSet map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
def check(status: MemberStatus): Boolean =
(members.map(_.address) == expectedAddresses &&
members.exists(m m.address == oldLeaderAddress && m.status == status))
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
}
})
enterBarrier("registered-listener")

// verify that the LEADER is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime)
enterBarrier("leader-left")

// verify that the LEADER is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime)
// verify that the LEADER is LEAVING
leavingLatch.await

// verify that we have a new LEADER
awaitCond(cluster.leader != oldLeaderAddress, leaderHandoffWaitingTime)
}
// verify that the LEADER is EXITING
exitingLatch.await

// verify that the LEADER is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))

// verify that the LEADER is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))

enterBarrier("finished")
// verify that we have a new LEADER
awaitCond(cluster.leader != oldLeaderAddress)
}

enterBarrier("finished")
}
}
}
}
Loading

0 comments on commit 3911b18

Please sign in to comment.