Skip to content

Commit

Permalink
unb0rk akka-actor-nightly build
Browse files Browse the repository at this point in the history
This means tightening types from Duration to FiniteDuration in several
places; a good thing, since we replace runtime complaints by compile
time errors.
  • Loading branch information
rkuhn committed Sep 14, 2012
1 parent 4afd41f commit 251a622
Show file tree
Hide file tree
Showing 34 changed files with 183 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package akka.actor

import language.postfixOps

import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Timeout
import scala.concurrent.{ Await, Future, Promise }
Expand All @@ -21,6 +20,7 @@ import akka.serialization.JavaSerializer
import akka.actor.TypedActor._
import java.lang.IllegalStateException
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import scala.concurrent.util.FiniteDuration

object TypedActorSpec {

Expand Down Expand Up @@ -203,10 +203,10 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)

def newFooBar: Foo = newFooBar(Duration(2, "s"))

def newFooBar(d: Duration): Foo =
def newFooBar(d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)))

def newFooBar(dispatcher: String, d: Duration): Foo =
def newFooBar(dispatcher: String, d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)).withDispatcher(dispatcher))

def newStacked(): Stacked =
Expand Down
10 changes: 0 additions & 10 deletions akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,6 @@ class AskSpec extends AkkaSpec {
}.getMessage must be === expectedMsg
}

"return broken promises on infinite timeout" in {
implicit val timeout = Timeout.never
val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } }))
val f = echo ? "foo"
val expectedMsg = "Timeouts to `ask` must be finite. Question not sent to [%s]" format echo
intercept[IllegalArgumentException] {
Await.result(f, remaining)
}.getMessage must be === expectedMsg
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package akka.routing

import language.postfixOps

import akka.actor.Actor
import akka.testkit._
import akka.actor.Props
Expand All @@ -15,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
import scala.concurrent.util.Duration
import java.util.concurrent.TimeoutException
import scala.concurrent.util.FiniteDuration

object ResizerSpec {

Expand Down Expand Up @@ -174,8 +174,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with

val router = system.actorOf(Props(new Actor {
def receive = {
case d: Duration Thread.sleep(d.dilated.toMillis); sender ! "done"
case "echo" sender ! "reply"
case d: FiniteDuration Thread.sleep(d.dilated.toMillis); sender ! "done"
case "echo" sender ! "reply"
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))

Expand All @@ -190,7 +190,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with

routees(router) must be(3)

def loop(loops: Int, d: Duration) = {
def loop(loops: Int, d: FiniteDuration) = {
for (m 0 until loops) router ! d
for (m 0 until loops) expectMsg(d * 3, "done")
}
Expand Down
4 changes: 2 additions & 2 deletions akka-actor/src/main/scala/akka/actor/TypedActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package akka.actor

import language.existentials

import akka.japi.{ Creator, Option JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.Timeout
Expand All @@ -20,6 +19,7 @@ import scala.reflect.ClassTag
import akka.serialization.{ JavaSerializer, SerializationExtension }
import java.io.ObjectStreamException
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.FiniteDuration

/**
* A TypedActorFactory is something that can created TypedActor instances.
Expand Down Expand Up @@ -421,7 +421,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
/**
* INTERNAL USE ONLY
*/
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: Duration) {
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
case null throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value")
case some toTypedActorInvocationHandler(some)
Expand Down
4 changes: 2 additions & 2 deletions akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ trait Inbox { this: ActorDSL.type ⇒
* this method within an actor!</b>
*/
def receive(timeout: FiniteDuration = defaultTimeout): Any = {
implicit val t = Timeout(timeout + extraTime)
implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration])
Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf)
}

Expand All @@ -186,7 +186,7 @@ trait Inbox { this: ActorDSL.type ⇒
* this method within an actor!</b>
*/
def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = {
implicit val t = Timeout(timeout + extraTime)
implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration])
predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf))
}

Expand Down
3 changes: 1 addition & 2 deletions akka-actor/src/main/scala/akka/pattern/AskSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ trait AskSupport {
actorRef.tell(message)
Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef))
case ref: InternalActorRef
if (!timeout.duration.isFinite) Future.failed[Any](new IllegalArgumentException("Timeouts to `ask` must be finite. Question not sent to [%s]" format actorRef))
else if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))
if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))
else {
val provider = ref.provider
val a = PromiseActorRef(provider, timeout)
Expand Down
20 changes: 10 additions & 10 deletions akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.util.Unsafe
import scala.util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.util.{ FiniteDuration, Deadline }
import scala.concurrent.util.duration._
import scala.util.control.NonFatal
import scala.util.Success
Expand Down Expand Up @@ -38,8 +38,8 @@ object CircuitBreaker {
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
*/
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext)
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext)

/**
* Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed
Expand All @@ -52,8 +52,8 @@ object CircuitBreaker {
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
*/
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
apply(scheduler, maxFailures, callTimeout, resetTimeout)
}

/**
Expand All @@ -76,9 +76,9 @@ object CircuitBreaker {
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
*/
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {

def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = {
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = {
this(scheduler, maxFailures, callTimeout, resetTimeout)(executor)
}

Expand Down Expand Up @@ -409,7 +409,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] =
if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)).future
if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future

/**
* Reset breaker on successful call.
Expand Down Expand Up @@ -453,7 +453,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] =
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)).future
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft.asInstanceOf[FiniteDuration])).future

/**
* Calculate remaining timeout to inform the caller in case a backoff algorithm is useful
Expand Down Expand Up @@ -510,6 +510,6 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @param message Defaults to "Circuit Breaker is open; calls are failing fast"
*/
class CircuitBreakerOpenException(
val remainingDuration: Duration,
val remainingDuration: FiniteDuration,
message: String = "Circuit Breaker is open; calls are failing fast")
extends AkkaException(message) with NoStackTrace
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.dispatch.{ Unwatch, Watch }
import scala.concurrent.Future
import scala.concurrent.util.Duration
import scala.util.Success
import scala.concurrent.util.FiniteDuration

trait GracefulStopSupport {
/**
Expand All @@ -36,7 +37,7 @@ trait GracefulStopSupport {
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
def gracefulStop(target: ActorRef, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) Future successful true
else system match {
case e: ExtendedActorSystem
Expand Down
3 changes: 2 additions & 1 deletion akka-actor/src/main/scala/akka/pattern/Patterns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.pattern
import akka.actor.Scheduler
import scala.concurrent.ExecutionContext
import java.util.concurrent.Callable
import scala.concurrent.util.FiniteDuration

object Patterns {
import akka.actor.{ ActorRef, ActorSystem }
Expand Down Expand Up @@ -103,7 +104,7 @@ object Patterns {
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
def gracefulStop(target: ActorRef, timeout: FiniteDuration, system: ActorSystem): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]

/**
Expand Down
16 changes: 8 additions & 8 deletions akka-actor/src/main/scala/akka/routing/Routing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package akka.routing

import language.implicitConversions
import language.postfixOps

import akka.actor._
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
Expand All @@ -19,6 +18,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.dispatch.Dispatchers
import scala.annotation.tailrec
import concurrent.ExecutionContext
import scala.concurrent.util.FiniteDuration

/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
Expand Down Expand Up @@ -1098,13 +1098,13 @@ object ScatterGatherFirstCompletedRouter {
/**
* Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration
*/
def apply(routees: Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter =
def apply(routees: Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)

/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter = {
def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter = {
import scala.collection.JavaConverters._
apply(routees.asScala, within)
}
Expand Down Expand Up @@ -1153,7 +1153,7 @@ object ScatterGatherFirstCompletedRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: FiniteDuration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
Expand All @@ -1166,22 +1166,22 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w)
def this(nr: Int, w: FiniteDuration) = this(nrOfInstances = nr, within = w)

/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String], w: Duration) =
def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) =
this(routees = iterableAsScalaIterable(routeePaths), within = w)

/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)
def this(resizer: Resizer, w: FiniteDuration) = this(resizer = Some(resizer), within = w)

/**
* Java API for setting routerDispatcher
Expand Down Expand Up @@ -1211,7 +1211,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒

def routees: Iterable[String]

def within: Duration
def within: FiniteDuration

def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
Expand Down
13 changes: 3 additions & 10 deletions akka-actor/src/main/scala/akka/util/Timeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import language.implicitConversions

import java.util.concurrent.TimeUnit
import java.lang.{ Double JDouble }
import scala.concurrent.util.Duration
import scala.concurrent.util.{ Duration, FiniteDuration }

@SerialVersionUID(1L)
case class Timeout(duration: Duration) {
case class Timeout(duration: FiniteDuration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
Expand All @@ -26,17 +26,10 @@ object Timeout {
*/
val zero: Timeout = new Timeout(Duration.Zero)

/**
* A Timeout with infinite duration. Will never timeout. Use extreme caution with this
* as it may cause memory leaks, blocked threads, or may not even be supported by
* the receiver, which would result in an exception.
*/
val never: Timeout = new Timeout(Duration.Inf)

def apply(timeout: Long): Timeout = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit): Timeout = new Timeout(length, unit)

implicit def durationToTimeout(duration: Duration): Timeout = new Timeout(duration)
implicit def durationToTimeout(duration: FiniteDuration): Timeout = new Timeout(duration)
implicit def intToTimeout(timeout: Int): Timeout = new Timeout(timeout)
implicit def longToTimeout(timeout: Long): Timeout = new Timeout(timeout)
}
5 changes: 3 additions & 2 deletions akka-agent/src/main/scala/akka/agent/Agent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.util.Timeout
import scala.concurrent.stm._
import concurrent.{ ExecutionContext, Future, Promise, Await }
import concurrent.util.Duration
import scala.concurrent.util.FiniteDuration

/**
* Used internally to send functions.
Expand Down Expand Up @@ -240,7 +241,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
* Dispatch a function to update the internal state, and return a Future where that new state can be obtained
* within the given timeout
*/
def alter(f: JFunc[T, T], timeout: Duration): Future[T] = alter(x f(x))(timeout)
def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x f(x))(timeout)

/**
* Java API:
Expand All @@ -259,7 +260,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: JFunc[T, T], timeout: Duration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)
def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)

/**
* Java API:
Expand Down
5 changes: 3 additions & 2 deletions akka-camel/src/main/scala/akka/camel/Activation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._
import scala.concurrent.util.Duration
import concurrent.{ ExecutionContext, Future }
import scala.concurrent.util.FiniteDuration

/**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
Expand All @@ -27,7 +28,7 @@ trait Activation {
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointActivated(`endpoint`) endpoint
case EndpointFailedToActivate(`endpoint`, cause) throw cause
Expand All @@ -40,7 +41,7 @@ trait Activation {
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointDeActivated(`endpoint`) endpoint
case EndpointFailedToDeActivate(`endpoint`, cause) throw cause
Expand Down
Loading

0 comments on commit 251a622

Please sign in to comment.