Skip to content

Commit

Permalink
+htc akka#16819 implement server-side request timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
sirthias authored and ktoso committed Jan 19, 2016
1 parent 7eed4a3 commit 5823486
Show file tree
Hide file tree
Showing 12 changed files with 360 additions and 27 deletions.
44 changes: 44 additions & 0 deletions akka-http-core/src/main/java/akka/http/javadsl/TimeoutAccess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.http.javadsl;

import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Function;
import scala.concurrent.duration.Duration;

/**
* Enables programmatic access to the server-side request timeout logic.
*/
public interface TimeoutAccess {

/**
* Tries to set a new timeout.
* The timeout period is measured as of the point in time that the end of the request has been received,
* which may be in the past or in the future!
* Use `Duration.Inf` to completely disable request timeout checking for this request.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
void updateTimeout(Duration timeout);

/**
* Tries to set a new timeout handler, which produces the timeout response for a
* given request. Note that the handler must produce the response synchronously and shouldn't block!
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
void updateHandler(Function<HttpRequest, HttpResponse> handler);

/**
* Tries to set a new timeout and handler at the same time.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
void update(Duration timeout, Function<HttpRequest, HttpResponse> handler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.http.javadsl.model.headers;

/**
* Model for the synthetic `Timeout-Access` header.
*/
public abstract class TimeoutAccess extends akka.http.scaladsl.model.HttpHeader {
public abstract akka.http.javadsl.TimeoutAccess timeoutAccess();

public static TimeoutAccess create(akka.http.javadsl.TimeoutAccess timeoutAccess) {
return new akka.http.scaladsl.model.headers.Timeout$minusAccess((akka.http.scaladsl.TimeoutAccess) timeoutAccess);
}
}
12 changes: 12 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ akka.http {
# Set to `infinite` to completely disable idle connection timeouts.
idle-timeout = 60 s

# Defines the default time period within which the application has to
# produce an HttpResponse for any given HttpRequest it received.
# The timeout begins to run when the *end* of the request has been
# received, so even potentially long uploads can have a short timeout.
# Set to `infinite` to completely disable request timeout checking.
#
# If this setting is not `infinite` the HTTP server layer attaches a
# `Timeout-Access` header to the request, which enables programmatic
# customization of the timeout period and timeout response for each
# request individually.
request-timeout = 20 s

# The time period within which the TCP binding process must be completed.
# Set to `infinite` to disable.
bind-timeout = 1s
Expand Down
4 changes: 4 additions & 0 deletions akka-http-core/src/main/scala/akka/http/ServerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ final case class ServerSettings(

object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") {
final case class Timeouts(idleTimeout: Duration,
requestTimeout: Duration,
bindTimeout: FiniteDuration) {
require(idleTimeout > Duration.Zero, "idleTimeout must be infinite or > 0")
require(requestTimeout > Duration.Zero, "requestTimeout must be infinite or > 0")
require(bindTimeout > Duration.Zero, "bindTimeout must be > 0")
}
implicit def timeoutsShortcut(s: ServerSettings): Timeouts = s.timeouts
Expand All @@ -55,6 +58,7 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve
c.getString("server-header").toOption.map(Server(_)),
Timeouts(
c getPotentiallyInfiniteDuration "idle-timeout",
c getPotentiallyInfiniteDuration "request-timeout",
c getFiniteDuration "bind-timeout"),
c getInt "max-connections",
c getInt "pipelining-limit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,7 @@ private object PoolSlot {
case FromConnection(OnNext(response: HttpResponse))
val requestContext = inflightRequests.head
inflightRequests = inflightRequests.tail
val (entity, whenCompleted) = response.entity match {
case x: HttpEntity.Strict x -> FastFuture.successful(())
case x: HttpEntity.Default
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData) -> whenCompleted
case x: HttpEntity.CloseDelimited
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData) -> whenCompleted
case x: HttpEntity.Chunked
val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks)
x.copy(chunks = newChunks) -> whenCompleted
}
val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity)
val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity)))
import fm.executionContext
val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ SlotEvent.RequestCompleted(slotIx)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,32 @@
package akka.http.impl.engine.server

import java.net.InetSocketAddress
import java.util.Random
import akka.stream.impl.fusing.GraphInterpreter
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ Promise, Future }
import scala.concurrent.duration.{ Deadline, FiniteDuration, Duration }
import scala.collection.immutable
import org.reactivestreams.{ Publisher, Subscriber }
import scala.util.control.NonFatal
import akka.actor.Cancellable
import akka.japi.Function
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream._
import akka.stream.io._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.http.ServerSettings
import akka.http.impl.engine.HttpConnectionTimeoutException
import akka.http.impl.engine.parsing.ParserOutput._
import akka.http.impl.engine.parsing._
import akka.http.impl.engine.rendering.{ HttpResponseRendererFactory, ResponseRenderingContext, ResponseRenderingOutput }
import akka.http.impl.engine.ws._
import akka.http.impl.util._
import akka.http.scaladsl.Http
import akka.http.scaladsl.util.FastFuture.EnhancedFuture
import akka.http.scaladsl.{ TimeoutAccess, Http }
import akka.http.scaladsl.model.headers.`Timeout-Access`
import akka.http.javadsl.model
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.impl.ConstantFun
import akka.stream.io._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.ByteString
import akka.http.scaladsl.model.ws.Message
import akka.stream.impl.fusing.SubSource

/**
* INTERNAL API
Expand All @@ -54,6 +57,7 @@ private[http] object HttpServerBluePrint {
def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter): Http.ServerLayer = {
val theStack =
userHandlerGuard(settings.pipeliningLimit) atop
requestTimeoutSupport(settings.timeouts.requestTimeout) atop
requestPreparation(settings) atop
controller(settings, log) atop
parsingRendering(settings, log) atop
Expand All @@ -78,6 +82,12 @@ private[http] object HttpServerBluePrint {
def requestPreparation(settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] =
BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings))

def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, Unit] =
timeout match {
case x: FiniteDuration BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed
case _ BidiFlow.identity
}

final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] {
val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in")
val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out")
Expand All @@ -97,6 +107,7 @@ private[http] object HttpServerBluePrint {

val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength
push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol))
case _ throw new IllegalStateException
}
}
setHandler(in, idle)
Expand Down Expand Up @@ -173,6 +184,104 @@ private[http] object HttpServerBluePrint {
.via(Flow[ResponseRenderingOutput].transform(() errorHandling(errorHandler)).named("errorLogger"))
}

class RequestTimeoutSupport(initialTimeout: FiniteDuration)
extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
private val requestIn = Inlet[HttpRequest]("requestIn")
private val requestOut = Outlet[HttpRequest]("requestOut")
private val responseIn = Inlet[HttpResponse]("responseIn")
private val responseOut = Outlet[HttpResponse]("responseOut")

override def initialAttributes = Attributes.name("RequestTimeoutSupport")

val shape = new BidiShape(requestIn, requestOut, responseIn, responseOut)

def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
var openTimeouts = immutable.Queue[TimeoutAccessImpl]()
setHandler(requestIn, new InHandler {
def onPush(): Unit = {
val request = grab(requestIn)
val (entity, requestEnd) = HttpEntity.captureTermination(request.entity)
val access = new TimeoutAccessImpl(request, initialTimeout, requestEnd,
getAsyncCallback(emitTimeoutResponse), interpreter.materializer)
openTimeouts = openTimeouts.enqueue(access)
push(requestOut, request.copy(headers = request.headers :+ `Timeout-Access`(access), entity = entity))
}
override def onUpstreamFinish() = complete(requestOut)
override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex)
def emitTimeoutResponse(response: (TimeoutAccess, HttpResponse)) =
if (openTimeouts.head eq response._1) {
emit(responseOut, response._2, () complete(responseOut))
} // else the application response arrived after we scheduled the timeout response, which is close but ok
})
// TODO: provide and use default impl for simply connecting an input and an output port as we do here
setHandler(requestOut, new OutHandler {
def onPull(): Unit = pull(requestIn)
override def onDownstreamFinish() = cancel(requestIn)
})
setHandler(responseIn, new InHandler {
def onPush(): Unit = {
openTimeouts.head.clear()
openTimeouts = openTimeouts.tail
push(responseOut, grab(responseIn))
}
override def onUpstreamFinish() = complete(responseOut)
override def onUpstreamFailure(ex: Throwable) = fail(responseOut, ex)
})
setHandler(responseOut, new OutHandler {
def onPull(): Unit = pull(responseIn)
override def onDownstreamFinish() = cancel(responseIn)
})
}
}

private class TimeoutSetup(val timeoutBase: Deadline,
val scheduledTask: Cancellable,
val timeout: Duration,
val handler: HttpRequest HttpResponse)

private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit],
trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer)
extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest HttpResponse) { self
import materializer.executionContext

set {
requestEnd.fast.map(_ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this))
}

override def apply(request: HttpRequest) = HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +
"to produce a timely response to your request.\r\nPlease try again in a short while!")

def clear(): Unit = // best effort timeout cancellation
get.fast.foreach(setup if (setup.scheduledTask ne null) setup.scheduledTask.cancel())

override def updateTimeout(timeout: Duration): Unit = update(timeout, null: HttpRequest HttpResponse)
override def updateHandler(handler: HttpRequest HttpResponse): Unit = update(null, handler)
override def update(timeout: Duration, handler: HttpRequest HttpResponse): Unit = {
val promise = Promise[TimeoutSetup]()
for (old getAndSet(promise.future).fast)
promise.success {
if ((old.scheduledTask eq null) || old.scheduledTask.cancel()) {
val newHandler = if (handler eq null) old.handler else handler
val newTimeout = if (timeout eq null) old.timeout else timeout
val newScheduling = newTimeout match {
case x: FiniteDuration schedule(old.timeoutBase + x - Deadline.now, newHandler)
case _ null // don't schedule a new timeout
}
new TimeoutSetup(old.timeoutBase, newScheduling, newTimeout, newHandler)
} else old // too late, the previously set timeout cannot be cancelled anymore
}
}
private def schedule(delay: FiniteDuration, handler: HttpRequest HttpResponse): Cancellable =
materializer.scheduleOnce(delay, new Runnable { def run() = trigger.invoke(self, handler(request)) })

import akka.http.impl.util.JavaMapping.Implicits._
/** JAVA API **/
def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
update(timeout, handler(_: HttpRequest).asScala)
def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
updateHandler(handler(_: HttpRequest).asScala)
}

class ControllerStage(settings: ServerSettings, log: LoggingAdapter)
extends GraphStage[BidiShape[RequestOutput, RequestOutput, HttpResponse, ResponseRenderingContext]] {
private val requestParsingIn = Inlet[RequestOutput]("requestParsingIn")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.http.scaladsl

import scala.concurrent.duration.Duration
import akka.http.scaladsl.model.{ HttpResponse, HttpRequest }

/**
* Enables programmatic access to the server-side request timeout logic.
*/
trait TimeoutAccess extends akka.http.javadsl.TimeoutAccess {

/**
* Tries to set a new timeout.
* The timeout period is measured as of the point in time that the end of the request has been received,
* which may be in the past or in the future!
* Use `Duration.Inf` to completely disable request timeout checking for this request.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
def updateTimeout(timeout: Duration): Unit

/**
* Tries to set a new timeout handler, which produces the timeout response for a
* given request. Note that the handler must produce the response synchronously and shouldn't block!
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
def updateHandler(handler: HttpRequest HttpResponse): Unit

/**
* Tries to set a new timeout and handler at the same time.
*
* Due to the inherent raciness it is not guaranteed that the update will be applied before
* the previously set timeout has expired!
*/
def update(timeout: Duration, handler: HttpRequest HttpResponse): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.{ japi, stream }
import akka.http.scaladsl.model.ContentType.{ NonBinary, Binary }
import akka.http.scaladsl.util.FastFuture
import akka.http.javadsl.{ model jm }
import akka.http.impl.util.StreamUtils
import akka.http.impl.util.JavaMapping.Implicits._

import scala.compat.java8.OptionConverters._
Expand Down Expand Up @@ -503,4 +504,24 @@ object HttpEntity {
private object SizeLimit {
val Disabled = -1 // any negative value will do
}

/**
* INTERNAL API
*/
private[http] def captureTermination[T <: HttpEntity](entity: T): (T, Future[Unit]) =
entity match {
case x: HttpEntity.Strict x.asInstanceOf[T] -> FastFuture.successful(())
case x: HttpEntity.Default
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] -> whenCompleted
case x: HttpEntity.Chunked
val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks)
x.copy(chunks = newChunks).asInstanceOf[T] -> whenCompleted
case x: HttpEntity.CloseDelimited
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] -> whenCompleted
case x: HttpEntity.IndefiniteLength
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] -> whenCompleted
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,13 @@ final case class `Set-Cookie`(cookie: HttpCookie) extends jm.headers.SetCookie w
protected def companion = `Set-Cookie`
}

object `Timeout-Access` extends ModeledCompanion[`Timeout-Access`]
final case class `Timeout-Access`(timeoutAccess: akka.http.scaladsl.TimeoutAccess)
extends jm.headers.TimeoutAccess with SyntheticHeader {
def renderValue[R <: Rendering](r: R): r.type = r ~~ timeoutAccess.toString
protected def companion = `Timeout-Access`
}

/**
* Model for the synthetic `Tls-Session-Info` header which carries the SSLSession of the connection
* the message carrying this header was received with.
Expand Down
Loading

0 comments on commit 5823486

Please sign in to comment.