Skip to content

Commit

Permalink
Merge pull request akka#19425 from akka/wip-19094-fix-double-pull-Poo…
Browse files Browse the repository at this point in the history
…lConductor-RK

Wip 19094 fix double pull pool conductor rk
  • Loading branch information
rkuhn committed Jan 13, 2016
2 parents 55d9605 + e1922da commit be0c8af
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ private[http] object OutgoingConnectionBlueprint {
case (Seq(MessageEnd), remaining)
SubSource.kill(remaining)
false
case _
true
case (seq, _)
seq.nonEmpty
}
.map {
case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,16 @@ private object PoolConductor {
slotStates(slotIx) = slotStateAfterDisconnect(slotStates(slotIx), failed)
}
pull(slotIn)
val tryPull = nextSlot == -1
val wasBlocked = nextSlot == -1
nextSlot = bestSlot()
if (tryPull) tryPullCtx()
val nowUnblocked = nextSlot != -1
if (wasBlocked && nowUnblocked) pull(ctxIn) // get next request context
}
})

setHandler(out, eagerTerminateOutput)

val tryPullCtx = () if (nextSlot != -1) pull(ctxIn)
val tryPullCtx = () if (nextSlot != -1 && !hasBeenPulled(ctxIn)) pull(ctxIn)

override def preStart(): Unit = {
pull(ctxIn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[http] object Websocket {
*/
def messageAPI(serverSide: Boolean,
closeTimeout: FiniteDuration): BidiFlow[FrameHandler.Output, Message, Message, FrameOutHandler.Input, Unit] = {
/** Completes this branch of the flow if no more messages are expected and converts close codes into errors */
/* Completes this branch of the flow if no more messages are expected and converts close codes into errors */
class PrepareForUserHandler extends PushStage[MessagePart, MessagePart] {
var inMessage = false
def onPush(elem: MessagePart, ctx: Context[MessagePart]): SyncDirective = elem match {
Expand All @@ -80,7 +80,7 @@ private[http] object Websocket {
}
}

/** Collects user-level API messages from MessageDataParts */
/* Collects user-level API messages from MessageDataParts */
val collectMessage: Flow[MessageDataPart, Message, Unit] =
Flow[MessageDataPart]
.prefixAndTail(1)
Expand Down Expand Up @@ -172,13 +172,12 @@ private[http] object Websocket {
}
}
})
val pullIn = () pull(in)
val pullIn = () tryPull(in)

setHandler(bypass, eagerTerminateOutput)
setHandler(user, ignoreTerminateOutput)

override def preStart(): Unit = {
super.preStart()
pullIn()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.engine.ws

import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import com.typesafe.config.ConfigFactory
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl._
import akka.stream.scaladsl._
import akka.stream._
import scala.concurrent.duration._
import org.scalatest.concurrent.ScalaFutures
import org.scalactic.ConversionCheckedTripleEquals
import akka.stream.testkit.Utils

class BypassRouterSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode = off") with ScalaFutures with ConversionCheckedTripleEquals {

implicit val patience = PatienceConfig(3.seconds)
import system.dispatcher
implicit val materializer = ActorMaterializer()

"BypassRouter" must {

"work without double pull-ing some ports" in Utils.assertAllStagesStopped {
val bindingFuture = Http().bindAndHandleSync({
case HttpRequest(_, _, headers, _, _)
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket u }.get
upgrade.handleMessages(Flow.apply, None)
}, interface = "localhost", port = 8080)
val binding = Await.result(bindingFuture, 3.seconds)

val N = 100
val (response, count) = Http().singleWebsocketRequest(
WebsocketRequest("ws://127.0.0.1:8080"),
Flow.fromSinkAndSourceMat(
Sink.fold(0)((n, _: Message) n + 1),
Source.repeat(TextMessage("hello")).take(N))(Keep.left))

count.futureValue should ===(N)
binding.unbind()
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,9 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext
if (shell.isInitialized) {
// yes, this steals another shell’s Resume, but that’s okay because extra ones will just not do anything
finishShellRegistration()
} else tryInit(shell)
} else if (!tryInit(shell)) {
if (activeInterpreters.isEmpty) finishShellRegistration()
}
}

override def preStart(): Unit = {
Expand Down Expand Up @@ -564,5 +566,9 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext
}
}

override def postStop(): Unit = activeInterpreters.foreach(_.tryAbort(AbruptTerminationException(self)))
override def postStop(): Unit = {
val ex = AbruptTerminationException(self)
activeInterpreters.foreach(_.tryAbort(ex))
newShells.foreach(s if (tryInit(s)) s.tryAbort(ex))
}
}

0 comments on commit be0c8af

Please sign in to comment.