Skip to content

Commit

Permalink
Merge pull request akka#1002 from akka/wip-2877-throttler-ack-patriknw
Browse files Browse the repository at this point in the history
Propagate statusPromise down to the actual change in throttler, see #2877
  • Loading branch information
patriknw committed Jan 8, 2013
2 parents 554e339 + 9c1a00d commit 5d53ec0
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 22 deletions.
3 changes: 3 additions & 0 deletions akka-remote/src/main/scala/akka/remote/Endpoint.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import akka.{ OnlyCauseStackTrace, AkkaException }
Expand Down
3 changes: 3 additions & 0 deletions akka-remote/src/main/scala/akka/remote/FailureDetector.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import java.util.concurrent.TimeUnit._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import akka.remote.FailureDetector.Clock
Expand Down
10 changes: 9 additions & 1 deletion akka-remote/src/main/scala/akka/remote/Remoting.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import scala.language.postfixOps
Expand Down Expand Up @@ -400,7 +403,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends

val accepting: Receive = {
case ManagementCommand(cmd, statusPromise)
transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
val allStatuses = transportMapping.values map { transport
val p = Promise[Boolean]()
transport.managementCommand(cmd, p)
p.future
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)

case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import akka.event.{ LoggingAdapter, Logging }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import scala.language.postfixOps
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import akka.AkkaException
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import akka.{ OnlyCauseStackTrace, AkkaException }
Expand Down Expand Up @@ -375,7 +378,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate
stop()

case Heartbeat failureDetector.heartbeat(); stay()
case Heartbeat
failureDetector.heartbeat(); stay()

case Payload(payload) stateData match {
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import FailureInjectorTransportAdapter._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import TestTransport._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import ThrottlerTransportAdapter._
import akka.actor._
import akka.pattern.pipe
import akka.remote.EndpointManager.ManagementCommand
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
Expand All @@ -17,6 +21,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.math.min
import scala.util.{ Success, Failure }
Expand Down Expand Up @@ -107,10 +112,8 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
protected def managerProps = Props(new ThrottlerManager(wrappedTransport))

override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s @ SetThrottle(_, _, _)
manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
case s: SetThrottle manager ! ManagementCommand(s, statusPromise)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
}
}

Expand Down Expand Up @@ -164,17 +167,22 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(naked) -> wrappedHandle
statusPromise.success(wrappedHandle)
case s @ SetThrottle(address, direction, mode)
case ManagementCommand(SetThrottle(address, direction, mode), statusPromise)
val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction)
handleTable.foreach {
case (addr, handle)
if (addr == naked) setMode(handle, mode, direction)
val allStatuses = handleTable.map {
case (`naked`, handle)
val p = Promise[Boolean]()
setMode(handle, mode, direction, p)
p.future
case _ Future.successful(true)
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)

case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
handleTable ::= naked -> handle
setMode(naked, handle)
setMode(naked, handle, Promise[Boolean]())

}

Expand All @@ -192,16 +200,19 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
}

private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both)
case Some((mode, direction)) setMode(handle, mode, direction, statusPromise)
case None setMode(handle, Unthrottled, Direction.Both, statusPromise)
}
}

private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction,
statusPromise: Promise[Boolean]): Unit = {
if (direction.includes(Direction.Send))
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
handle.throttlerActor ! ManagementCommand(mode, statusPromise)
}

private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = {
Expand Down Expand Up @@ -286,17 +297,17 @@ private[transport] class ThrottledAssociation(
case Event(InboundPayload(p), _)
throttledMessages = throttledMessages enqueue p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
try if (mode == Blackhole) {
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
} else {
associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener)
}
} finally statusPromise.success(true)
}

when(WaitUpstreamListener) {
Expand All @@ -321,12 +332,13 @@ private[transport] class ThrottledAssociation(
}

when(Throttling) {
case Event(mode: ThrottleMode, _)
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _)
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
if (mode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
statusPromise.success(true)
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport

import scala.concurrent.{ Promise, Future }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty

import akka.AkkaException
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty

import akka.{ OnlyCauseStackTrace, ConfigurationException }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty

import akka.actor.Address
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty

import akka.actor.Address
Expand Down

0 comments on commit 5d53ec0

Please sign in to comment.