From 5823486ccbb850d827107b85c96ce17b22a9846e Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 14 Jan 2016 17:34:35 +0100 Subject: [PATCH] +htc #16819 implement server-side request timeouts --- .../java/akka/http/javadsl/TimeoutAccess.java | 44 ++++++ .../javadsl/model/headers/TimeoutAccess.java | 16 +++ .../src/main/resources/reference.conf | 12 ++ .../main/scala/akka/http/ServerSettings.scala | 4 + .../http/impl/engine/client/PoolSlot.scala | 13 +- .../engine/server/HttpServerBluePrint.scala | 131 ++++++++++++++++-- .../akka/http/scaladsl/TimeoutAccess.scala | 42 ++++++ .../akka/http/scaladsl/model/HttpEntity.scala | 21 +++ .../http/scaladsl/model/headers/headers.scala | 7 + .../impl/engine/server/HttpServerSpec.scala | 90 +++++++++++- .../akka/http/scaladsl/ClientServerSpec.scala | 2 +- .../scala/akka/stream/scaladsl/BidiFlow.scala | 5 + 12 files changed, 360 insertions(+), 27 deletions(-) create mode 100644 akka-http-core/src/main/java/akka/http/javadsl/TimeoutAccess.java create mode 100644 akka-http-core/src/main/java/akka/http/javadsl/model/headers/TimeoutAccess.java create mode 100644 akka-http-core/src/main/scala/akka/http/scaladsl/TimeoutAccess.scala diff --git a/akka-http-core/src/main/java/akka/http/javadsl/TimeoutAccess.java b/akka-http-core/src/main/java/akka/http/javadsl/TimeoutAccess.java new file mode 100644 index 00000000000..fe182a40195 --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/TimeoutAccess.java @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.javadsl; + +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.Function; +import scala.concurrent.duration.Duration; + +/** + * Enables programmatic access to the server-side request timeout logic. + */ +public interface TimeoutAccess { + + /** + * Tries to set a new timeout. + * The timeout period is measured as of the point in time that the end of the request has been received, + * which may be in the past or in the future! + * Use `Duration.Inf` to completely disable request timeout checking for this request. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + void updateTimeout(Duration timeout); + + /** + * Tries to set a new timeout handler, which produces the timeout response for a + * given request. Note that the handler must produce the response synchronously and shouldn't block! + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + void updateHandler(Function handler); + + /** + * Tries to set a new timeout and handler at the same time. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + void update(Duration timeout, Function handler); +} diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/headers/TimeoutAccess.java b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/TimeoutAccess.java new file mode 100644 index 00000000000..eb0e9de9285 --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/TimeoutAccess.java @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.javadsl.model.headers; + +/** + * Model for the synthetic `Timeout-Access` header. + */ +public abstract class TimeoutAccess extends akka.http.scaladsl.model.HttpHeader { + public abstract akka.http.javadsl.TimeoutAccess timeoutAccess(); + + public static TimeoutAccess create(akka.http.javadsl.TimeoutAccess timeoutAccess) { + return new akka.http.scaladsl.model.headers.Timeout$minusAccess((akka.http.scaladsl.TimeoutAccess) timeoutAccess); + } +} diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 36cd3eb332c..6eb274c636c 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -18,6 +18,18 @@ akka.http { # Set to `infinite` to completely disable idle connection timeouts. idle-timeout = 60 s + # Defines the default time period within which the application has to + # produce an HttpResponse for any given HttpRequest it received. + # The timeout begins to run when the *end* of the request has been + # received, so even potentially long uploads can have a short timeout. + # Set to `infinite` to completely disable request timeout checking. + # + # If this setting is not `infinite` the HTTP server layer attaches a + # `Timeout-Access` header to the request, which enables programmatic + # customization of the timeout period and timeout response for each + # request individually. + request-timeout = 20 s + # The time period within which the TCP binding process must be completed. # Set to `infinite` to disable. bind-timeout = 1s diff --git a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala index fd2a887b4d8..7bde5f2c896 100644 --- a/akka-http-core/src/main/scala/akka/http/ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ServerSettings.scala @@ -46,7 +46,10 @@ final case class ServerSettings( object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") { final case class Timeouts(idleTimeout: Duration, + requestTimeout: Duration, bindTimeout: FiniteDuration) { + require(idleTimeout > Duration.Zero, "idleTimeout must be infinite or > 0") + require(requestTimeout > Duration.Zero, "requestTimeout must be infinite or > 0") require(bindTimeout > Duration.Zero, "bindTimeout must be > 0") } implicit def timeoutsShortcut(s: ServerSettings): Timeouts = s.timeouts @@ -55,6 +58,7 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve c.getString("server-header").toOption.map(Server(_)), Timeouts( c getPotentiallyInfiniteDuration "idle-timeout", + c getPotentiallyInfiniteDuration "request-timeout", c getFiniteDuration "bind-timeout"), c getInt "max-connections", c getInt "pipelining-limit", diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 1e6ee2ce25a..6c06b1abb08 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -163,18 +163,7 @@ private object PoolSlot { case FromConnection(OnNext(response: HttpResponse)) ⇒ val requestContext = inflightRequests.head inflightRequests = inflightRequests.tail - val (entity, whenCompleted) = response.entity match { - case x: HttpEntity.Strict ⇒ x -> FastFuture.successful(()) - case x: HttpEntity.Default ⇒ - val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) - x.copy(data = newData) -> whenCompleted - case x: HttpEntity.CloseDelimited ⇒ - val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) - x.copy(data = newData) -> whenCompleted - case x: HttpEntity.Chunked ⇒ - val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks) - x.copy(chunks = newChunks) -> whenCompleted - } + val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity) val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity))) import fm.executionContext val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx))) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 970e0db8658..5b5a3d293e5 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -5,12 +5,19 @@ package akka.http.impl.engine.server import java.net.InetSocketAddress -import java.util.Random -import akka.stream.impl.fusing.GraphInterpreter +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.{ Promise, Future } +import scala.concurrent.duration.{ Deadline, FiniteDuration, Duration } import scala.collection.immutable -import org.reactivestreams.{ Publisher, Subscriber } import scala.util.control.NonFatal +import akka.actor.Cancellable +import akka.japi.Function import akka.event.LoggingAdapter +import akka.util.ByteString +import akka.stream._ +import akka.stream.io._ +import akka.stream.scaladsl._ +import akka.stream.stage._ import akka.http.ServerSettings import akka.http.impl.engine.HttpConnectionTimeoutException import akka.http.impl.engine.parsing.ParserOutput._ @@ -18,16 +25,12 @@ import akka.http.impl.engine.parsing._ import akka.http.impl.engine.rendering.{ HttpResponseRendererFactory, ResponseRenderingContext, ResponseRenderingOutput } import akka.http.impl.engine.ws._ import akka.http.impl.util._ -import akka.http.scaladsl.Http +import akka.http.scaladsl.util.FastFuture.EnhancedFuture +import akka.http.scaladsl.{ TimeoutAccess, Http } +import akka.http.scaladsl.model.headers.`Timeout-Access` +import akka.http.javadsl.model import akka.http.scaladsl.model._ -import akka.stream._ -import akka.stream.impl.ConstantFun -import akka.stream.io._ -import akka.stream.scaladsl._ -import akka.stream.stage._ -import akka.util.ByteString import akka.http.scaladsl.model.ws.Message -import akka.stream.impl.fusing.SubSource /** * INTERNAL API @@ -54,6 +57,7 @@ private[http] object HttpServerBluePrint { def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter): Http.ServerLayer = { val theStack = userHandlerGuard(settings.pipeliningLimit) atop + requestTimeoutSupport(settings.timeouts.requestTimeout) atop requestPreparation(settings) atop controller(settings, log) atop parsingRendering(settings, log) atop @@ -78,6 +82,12 @@ private[http] object HttpServerBluePrint { def requestPreparation(settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] = BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings)) + def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, Unit] = + timeout match { + case x: FiniteDuration ⇒ BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed + case _ ⇒ BidiFlow.identity + } + final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] { val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in") val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out") @@ -97,6 +107,7 @@ private[http] object HttpServerBluePrint { val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)) + case _ ⇒ throw new IllegalStateException } } setHandler(in, idle) @@ -173,6 +184,104 @@ private[http] object HttpServerBluePrint { .via(Flow[ResponseRenderingOutput].transform(() ⇒ errorHandling(errorHandler)).named("errorLogger")) } + class RequestTimeoutSupport(initialTimeout: FiniteDuration) + extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { + private val requestIn = Inlet[HttpRequest]("requestIn") + private val requestOut = Outlet[HttpRequest]("requestOut") + private val responseIn = Inlet[HttpResponse]("responseIn") + private val responseOut = Outlet[HttpResponse]("responseOut") + + override def initialAttributes = Attributes.name("RequestTimeoutSupport") + + val shape = new BidiShape(requestIn, requestOut, responseIn, responseOut) + + def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { + var openTimeouts = immutable.Queue[TimeoutAccessImpl]() + setHandler(requestIn, new InHandler { + def onPush(): Unit = { + val request = grab(requestIn) + val (entity, requestEnd) = HttpEntity.captureTermination(request.entity) + val access = new TimeoutAccessImpl(request, initialTimeout, requestEnd, + getAsyncCallback(emitTimeoutResponse), interpreter.materializer) + openTimeouts = openTimeouts.enqueue(access) + push(requestOut, request.copy(headers = request.headers :+ `Timeout-Access`(access), entity = entity)) + } + override def onUpstreamFinish() = complete(requestOut) + override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex) + def emitTimeoutResponse(response: (TimeoutAccess, HttpResponse)) = + if (openTimeouts.head eq response._1) { + emit(responseOut, response._2, () ⇒ complete(responseOut)) + } // else the application response arrived after we scheduled the timeout response, which is close but ok + }) + // TODO: provide and use default impl for simply connecting an input and an output port as we do here + setHandler(requestOut, new OutHandler { + def onPull(): Unit = pull(requestIn) + override def onDownstreamFinish() = cancel(requestIn) + }) + setHandler(responseIn, new InHandler { + def onPush(): Unit = { + openTimeouts.head.clear() + openTimeouts = openTimeouts.tail + push(responseOut, grab(responseIn)) + } + override def onUpstreamFinish() = complete(responseOut) + override def onUpstreamFailure(ex: Throwable) = fail(responseOut, ex) + }) + setHandler(responseOut, new OutHandler { + def onPull(): Unit = pull(responseIn) + override def onDownstreamFinish() = cancel(responseIn) + }) + } + } + + private class TimeoutSetup(val timeoutBase: Deadline, + val scheduledTask: Cancellable, + val timeout: Duration, + val handler: HttpRequest ⇒ HttpResponse) + + private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit], + trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) + extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ + import materializer.executionContext + + set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this)) + } + + override def apply(request: HttpRequest) = HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " + + "to produce a timely response to your request.\r\nPlease try again in a short while!") + + def clear(): Unit = // best effort timeout cancellation + get.fast.foreach(setup ⇒ if (setup.scheduledTask ne null) setup.scheduledTask.cancel()) + + override def updateTimeout(timeout: Duration): Unit = update(timeout, null: HttpRequest ⇒ HttpResponse) + override def updateHandler(handler: HttpRequest ⇒ HttpResponse): Unit = update(null, handler) + override def update(timeout: Duration, handler: HttpRequest ⇒ HttpResponse): Unit = { + val promise = Promise[TimeoutSetup]() + for (old ← getAndSet(promise.future).fast) + promise.success { + if ((old.scheduledTask eq null) || old.scheduledTask.cancel()) { + val newHandler = if (handler eq null) old.handler else handler + val newTimeout = if (timeout eq null) old.timeout else timeout + val newScheduling = newTimeout match { + case x: FiniteDuration ⇒ schedule(old.timeoutBase + x - Deadline.now, newHandler) + case _ ⇒ null // don't schedule a new timeout + } + new TimeoutSetup(old.timeoutBase, newScheduling, newTimeout, newHandler) + } else old // too late, the previously set timeout cannot be cancelled anymore + } + } + private def schedule(delay: FiniteDuration, handler: HttpRequest ⇒ HttpResponse): Cancellable = + materializer.scheduleOnce(delay, new Runnable { def run() = trigger.invoke(self, handler(request)) }) + + import akka.http.impl.util.JavaMapping.Implicits._ + /** JAVA API **/ + def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit = + update(timeout, handler(_: HttpRequest).asScala) + def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit = + updateHandler(handler(_: HttpRequest).asScala) + } + class ControllerStage(settings: ServerSettings, log: LoggingAdapter) extends GraphStage[BidiShape[RequestOutput, RequestOutput, HttpResponse, ResponseRenderingContext]] { private val requestParsingIn = Inlet[RequestOutput]("requestParsingIn") diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/TimeoutAccess.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/TimeoutAccess.scala new file mode 100644 index 00000000000..776ce602607 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/TimeoutAccess.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.scaladsl + +import scala.concurrent.duration.Duration +import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } + +/** + * Enables programmatic access to the server-side request timeout logic. + */ +trait TimeoutAccess extends akka.http.javadsl.TimeoutAccess { + + /** + * Tries to set a new timeout. + * The timeout period is measured as of the point in time that the end of the request has been received, + * which may be in the past or in the future! + * Use `Duration.Inf` to completely disable request timeout checking for this request. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + def updateTimeout(timeout: Duration): Unit + + /** + * Tries to set a new timeout handler, which produces the timeout response for a + * given request. Note that the handler must produce the response synchronously and shouldn't block! + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + def updateHandler(handler: HttpRequest ⇒ HttpResponse): Unit + + /** + * Tries to set a new timeout and handler at the same time. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + */ + def update(timeout: Duration, handler: HttpRequest ⇒ HttpResponse): Unit +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 161292239c3..2c9b87913a5 100755 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -21,6 +21,7 @@ import akka.{ japi, stream } import akka.http.scaladsl.model.ContentType.{ NonBinary, Binary } import akka.http.scaladsl.util.FastFuture import akka.http.javadsl.{ model ⇒ jm } +import akka.http.impl.util.StreamUtils import akka.http.impl.util.JavaMapping.Implicits._ import scala.compat.java8.OptionConverters._ @@ -503,4 +504,24 @@ object HttpEntity { private object SizeLimit { val Disabled = -1 // any negative value will do } + + /** + * INTERNAL API + */ + private[http] def captureTermination[T <: HttpEntity](entity: T): (T, Future[Unit]) = + entity match { + case x: HttpEntity.Strict ⇒ x.asInstanceOf[T] -> FastFuture.successful(()) + case x: HttpEntity.Default ⇒ + val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) + x.copy(data = newData).asInstanceOf[T] -> whenCompleted + case x: HttpEntity.Chunked ⇒ + val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks) + x.copy(chunks = newChunks).asInstanceOf[T] -> whenCompleted + case x: HttpEntity.CloseDelimited ⇒ + val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) + x.copy(data = newData).asInstanceOf[T] -> whenCompleted + case x: HttpEntity.IndefiniteLength ⇒ + val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) + x.copy(data = newData).asInstanceOf[T] -> whenCompleted + } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 335306b8546..121e8c27edd 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -795,6 +795,13 @@ final case class `Set-Cookie`(cookie: HttpCookie) extends jm.headers.SetCookie w protected def companion = `Set-Cookie` } +object `Timeout-Access` extends ModeledCompanion[`Timeout-Access`] +final case class `Timeout-Access`(timeoutAccess: akka.http.scaladsl.TimeoutAccess) + extends jm.headers.TimeoutAccess with SyntheticHeader { + def renderValue[R <: Rendering](r: R): r.type = r ~~ timeoutAccess.toString + protected def companion = `Timeout-Access` +} + /** * Model for the synthetic `Tls-Session-Info` header which carries the SSLSession of the connection * the message carrying this header was received with. diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 007a5469a0c..b580a5b000c 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -22,7 +22,10 @@ import HttpEntity._ import MediaTypes._ import HttpMethods._ -class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside { spec ⇒ +class HttpServerSpec extends AkkaSpec( + """akka.loggers = [] + akka.loglevel = OFF + akka.http.server.request-timeout = infinite""") with Inside { spec ⇒ implicit val materializer = ActorMaterializer() "The server implementation" should { @@ -698,6 +701,82 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") request.headers should contain(`Remote-Address`(RemoteAddress(theAddress, Some(8080)))) } + "support request timeouts" which { + + "are defined via the config" in new RequestTimeoutTestSetup(10.millis) { + send("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n") + expectRequest().header[`Timeout-Access`] shouldBe defined + expectResponseWithWipedDate( + """HTTP/1.1 503 Service Unavailable + |Server: akka-http/test + |Date: XXXX + |Content-Type: text/plain; charset=UTF-8 + |Content-Length: 105 + | + |The server was not able to produce a timely response to your request. + |Please try again in a short while!""") + } + + "are programmatically increased (not expiring)" in new RequestTimeoutTestSetup(10.millis) { + send("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n") + expectRequest().header[`Timeout-Access`].foreach(_.timeoutAccess.updateTimeout(50.millis)) + netOut.expectNoBytes(30.millis) + responses.sendNext(HttpResponse()) + expectResponseWithWipedDate( + """HTTP/1.1 200 OK + |Server: akka-http/test + |Date: XXXX + |Content-Length: 0 + | + |""") + } + + "are programmatically increased (expiring)" in new RequestTimeoutTestSetup(10.millis) { + send("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n") + expectRequest().header[`Timeout-Access`].foreach(_.timeoutAccess.updateTimeout(50.millis)) + netOut.expectNoBytes(30.millis) + expectResponseWithWipedDate( + """HTTP/1.1 503 Service Unavailable + |Server: akka-http/test + |Date: XXXX + |Content-Type: text/plain; charset=UTF-8 + |Content-Length: 105 + | + |The server was not able to produce a timely response to your request. + |Please try again in a short while!""") + } + + "are programmatically decreased" in new RequestTimeoutTestSetup(50.millis) { + send("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n") + expectRequest().header[`Timeout-Access`].foreach(_.timeoutAccess.updateTimeout(10.millis)) + val mark = System.nanoTime() + expectResponseWithWipedDate( + """HTTP/1.1 503 Service Unavailable + |Server: akka-http/test + |Date: XXXX + |Content-Type: text/plain; charset=UTF-8 + |Content-Length: 105 + | + |The server was not able to produce a timely response to your request. + |Please try again in a short while!""") + (System.nanoTime() - mark) should be < (40 * 1000000L) + } + + "have a programmatically set timeout handler" in new RequestTimeoutTestSetup(10.millis) { + send("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n") + val timeoutResponse = HttpResponse(StatusCodes.InternalServerError, entity = "OOPS!") + expectRequest().header[`Timeout-Access`].foreach(_.timeoutAccess.updateHandler(_ ⇒ timeoutResponse)) + expectResponseWithWipedDate( + """HTTP/1.1 500 Internal Server Error + |Server: akka-http/test + |Date: XXXX + |Content-Type: text/plain; charset=UTF-8 + |Content-Length: 5 + | + |OOPS!""") + } + } + "add `Connection: close` to early responses" in new TestSetup { send("""POST / HTTP/1.1 |Host: example.com @@ -723,8 +802,7 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") netOut.expectComplete() } - def isDefinedVia = afterWord("is defined via") - "support request length verification" which isDefinedVia { + "support request length verification" which afterWord("is defined via") { class LengthVerificationTest(maxContentLength: Int) extends TestSetup(maxContentLength) { val entityBase = "0123456789ABCD" @@ -912,4 +990,10 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") else s.copy(parserSettings = s.parserSettings.copy(maxContentLength = maxContentLength)) } } + class RequestTimeoutTestSetup(requestTimeout: Duration) extends TestSetup { + override def settings = { + val s = super.settings + s.copy(timeouts = s.timeouts.copy(requestTimeout = requestTimeout)) + } + } } diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 3d45b3ac219..c7b412c4e79 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -35,7 +35,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit akka.stdout-loglevel = ERROR windows-connection-abort-workaround-enabled = auto akka.log-dead-letters = OFF - """) + akka.http.server.request-timeout = infinite""") implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher implicit val materializer = ActorMaterializer() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 9b652997e86..8e35c49844c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -155,6 +155,11 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu } object BidiFlow { + private[this] val _identity: BidiFlow[Any, Any, Any, Any, Unit] = + BidiFlow.fromFlows(Flow[Any], Flow[Any]) + + def identity[A, B]: BidiFlow[A, A, B, B, Unit] = _identity.asInstanceOf[BidiFlow[A, A, B, B, Unit]] + /** * A graph with the shape of a flow logically is a flow, this method makes * it so also in type.