Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Bonér committed Oct 31, 2010
1 parent 46b84d7 commit 96c00f2
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
41 changes: 21 additions & 20 deletions akka-actor/src/main/scala/actor/Agent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package akka.actor

import akka.stm.Ref
import akka.config.RemoteAddress
import akka.japi.{Function => JFunc, Procedure => JProc}
import akka.AkkaException
import akka.japi.{ Function => JFunc, Procedure => JProc }

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
import akka.config.RemoteAddress

class AgentException private[akka](message: String) extends AkkaException(message)

Expand Down Expand Up @@ -105,10 +106,11 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =

import Agent._
import Actor._

val dispatcher = remote match {
case Some(address) =>
val d = actorOf(new AgentDispatcher[T]())
d.makeRemote(remote.get.hostname,remote.get.port)
d.makeRemote(remote.get.hostname, remote.get.port)
d.start
d ! Value(initialValue)
d
Expand All @@ -127,7 +129,7 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
if (dispatcher.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction."+
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await
val f = (dispatcher.!!![T](Read, java.lang.Long.MAX_VALUE)).await
if (f.exception.isDefined) throw f.exception.get
else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out"))
}
Expand Down Expand Up @@ -187,19 +189,19 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
* Does not change the value of the agent (this).
* Java API
*/
final def sendProc(f: JProc[T]): Unit = dispatcher ! Procedure((t:T) => f(t))
final def sendProc(f: JProc[T]): Unit = dispatcher ! Procedure((t: T) => f(t))

/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote)
final def map[B](f: (T) => B): Agent[B] = Agent(f(get), remote)

/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
*/
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote)
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(), remote)

/**
* Applies function with type 'T => B' to the agent's internal state.
Expand All @@ -212,14 +214,14 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
* Does not change the value of the agent (this).
* Java API
*/
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote)
final def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get), remote)

/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
* Does not change the value of the agent (this).
* Java API
*/
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote)
final def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = Agent(f(get)(), remote)

/**
* Applies procedure with type T to the agent's internal state.
Expand All @@ -244,7 +246,8 @@ sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] =
*/
object Agent {
import Actor._
/*

/**
* The internal messages for passing around requests.
*/
private[akka] case class Value[T](value: T)
Expand All @@ -256,20 +259,20 @@ object Agent {
* Creates a new Agent of type T with the initial value of value.
*/
def apply[T](value: T): Agent[T] =
apply(value,None)
apply(value, None)

/**
* Creates an Agent backed by a client managed Actor if Some(remoteAddress)
* or a local agent if None
*/
def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] =
new Agent[T](value,remoteAddress)
new Agent[T](value, remoteAddress)

/**
* Creates an Agent backed by a client managed Actor
*/
def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] =
apply(value,Some(remoteAddress))
apply(value, Some(remoteAddress))
}

/**
Expand All @@ -291,13 +294,11 @@ final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
* Periodically handles incoming messages.
*/
def receive = {
case Value(v: T) =>
swap(v)
case Read => self.reply_?(value.get())
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))
case Value(v: T) => swap(v)
case Read => self.reply_?(value.get())
case Function(fun: (T => T)) => swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) => proc(value.getOrElse(
throw new AgentException("Could not read Agent's value; value is null")))
}

/**
Expand Down
4 changes: 3 additions & 1 deletion akka-actor/src/main/scala/actor/UntypedActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package akka.actor
import akka.dispatch._
import akka.stm.global._
import akka.config.Supervision._
import akka.japi.Procedure

import java.net.InetSocketAddress

import scala.reflect.BeanProperty
import akka.japi.Procedure

/**
* Subclass this abstract class to create a MDB-style untyped actor.
Expand Down Expand Up @@ -62,6 +62,7 @@ import akka.japi.Procedure
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class UntypedActor extends Actor {

def getContext(): ActorRef = self

final protected def receive = {
Expand Down Expand Up @@ -123,6 +124,7 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object UntypedActor {

/**
* Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java:
Expand Down
21 changes: 15 additions & 6 deletions akka-actor/src/main/scala/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package akka.dispatch

import akka.AkkaException
import akka.actor.Actor.spawn
import akka.routing.Dispatcher

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import akka.routing.Dispatcher

class FutureTimeoutException(message: String) extends AkkaException(message)

Expand All @@ -26,12 +27,10 @@ object Futures {
dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
(body: => T): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout)

spawn({
try { f completeWithResult body }
catch { case e => f completeWithException e}
})(dispatcher)

f
}

Expand All @@ -45,8 +44,7 @@ object Futures {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
if (sleepMs > 0 && future.isEmpty)
Thread.sleep(sleepMs)
if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs)
} while (future.isEmpty)
future.get
}
Expand Down Expand Up @@ -89,12 +87,19 @@ object Futures {

sealed trait Future[T] {
def await : Future[T]

def awaitBlocking : Future[T]

def isCompleted: Boolean

def isExpired: Boolean

def timeoutInNanos: Long

def result: Option[T]

def exception: Option[Throwable]

def map[O](f: (T) => O): Future[O] = {
val wrapped = this
new Future[O] {
Expand All @@ -111,12 +116,14 @@ sealed trait Future[T] {

trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: T)

def completeWithException(exception: Throwable)
}

// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
import TimeUnit.{MILLISECONDS => TIME_UNIT}

def this() = this(0)

val timeoutInNanos = TIME_UNIT.toNanos(timeout)
Expand Down Expand Up @@ -207,7 +214,9 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
_lock.unlock
}

private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
protected def onComplete(result: T) {}

protected def onCompleteException(exception: Throwable) {}

private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
}
5 changes: 2 additions & 3 deletions akka-remote/src/main/scala/remote/RemoteServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}

import akka.actor.{
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage}
import akka.actor.Actor._
import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage}
import akka.util._
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
Expand All @@ -31,7 +31,6 @@ import org.jboss.netty.handler.ssl.SslHandler

import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.config.ConfigurationException

/**
* Use this object if you need a single remote server on a specific node.
Expand Down

0 comments on commit 96c00f2

Please sign in to comment.