Skip to content

Commit

Permalink
=htc migrate BodyPartRenderer to GraphStage akka#20288
Browse files Browse the repository at this point in the history
  • Loading branch information
leachbj authored and johanandren committed Jun 10, 2016
1 parent 9ffdf81 commit 95cfbda
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.util.ByteString
import HttpEntity._
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }

import scala.concurrent.forkjoin.ThreadLocalRandom

Expand All @@ -29,46 +30,60 @@ private[http] object BodyPartRenderer {
boundary: String,
nioCharset: Charset,
partHeadersSizeHint: Int,
log: LoggingAdapter): PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] =
new PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] {
log: LoggingAdapter): GraphStage[FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]]] =
new GraphStage[FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]]] {
var firstBoundaryRendered = false

override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
val in: Inlet[Multipart.BodyPart] = Inlet("BodyPartRenderer.in")
val out: Outlet[Source[ChunkStreamPart, Any]] = Outlet("BodyPartRenderer.out")
override val shape: FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]] = FlowShape(in, out)

def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ())
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)

def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ())
}

def completePartRendering(entity: HttpEntity): Source[ChunkStreamPart, Any] =
entity match {
case x if x.isKnownEmpty chunkStream(r.get)
case Strict(_, data) chunkStream((r ~~ data).get)
case Default(_, _, data) bodyPartChunks(data)
case IndefiniteLength(_, data) bodyPartChunks(data)
}

renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered)
firstBoundaryRendered = true

val bodyPart = grab(in)
renderEntityContentType(r, bodyPart.entity)
renderHeaders(r, bodyPart.headers, log)

def completePartRendering(): Source[ChunkStreamPart, Any] =
bodyPart.entity match {
case x if x.isKnownEmpty chunkStream(r.get)
case Strict(_, data) chunkStream((r ~~ data).get)
case Default(_, _, data) bodyPartChunks(data)
case IndefiniteLength(_, data) bodyPartChunks(data)
push(out, completePartRendering(bodyPart.entity))
}

renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered)
firstBoundaryRendered = true
renderEntityContentType(r, bodyPart.entity)
renderHeaders(r, bodyPart.headers, log)
ctx.push(completePartRendering())
}
override def onPull(): Unit =
if (isClosed(in) && firstBoundaryRendered)
completeRendering()
else if (isClosed(in)) completeStage()
else pull(in)

override def onPull(ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = {
val finishing = ctx.isFinishing
if (finishing && firstBoundaryRendered) {
val r = new ByteStringRendering(boundary.length + 4)
renderFinalBoundary(r, boundary)
ctx.pushAndFinish(chunkStream(r.get))
} else if (finishing)
ctx.finish()
else
ctx.pull()
}
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && firstBoundaryRendered) completeRendering()

override def onUpstreamFinish(ctx: Context[Source[ChunkStreamPart, Any]]): TerminationDirective = ctx.absorbTermination()
private def completeRendering(): Unit = {
val r = new ByteStringRendering(boundary.length + 4)
renderFinalBoundary(r, boundary)
push(out, chunkStream(r.get))
completeStage()
}

setHandlers(in, out, this)
}

private def chunkStream(byteString: ByteString): Source[ChunkStreamPart, Any] =
Source.single(Chunk(byteString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ sealed trait Multipart extends jm.Multipart {
boundary: String = BodyPartRenderer.randomBoundary())(implicit log: LoggingAdapter = NoLogging): MessageEntity = {
val chunks =
parts
.transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.via(BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flatMapConcat(ConstantFun.scalaIdentityFunction)
HttpEntity.Chunked(mediaType withBoundary boundary withCharset charset, chunks)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

package akka.http.scaladsl.model

import com.typesafe.config.{ Config, ConfigFactory }
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Inside, Matchers, WordSpec }
import org.scalatest.{BeforeAndAfterAll, Inside, Matchers, WordSpec}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import akka.actor.ActorSystem
import headers._
Expand All @@ -34,6 +35,16 @@ class MultipartSpec extends WordSpec with Matchers with Inside with BeforeAndAft
MediaTypes.`multipart/mixed`,
Multipart.General.BodyPart.Strict(HttpEntity("data"), List(ETag("xzy"))))
}

"support `toEntity`" in {
val streamed = Multipart.General(
MediaTypes.`multipart/mixed`,
Source(Multipart.General.BodyPart(defaultEntity("data"), List(ETag("xzy"))) :: Nil))
val result = streamed.toEntity(boundary = "boundary")
result.contentType shouldBe MediaTypes.`multipart/mixed`.withBoundary("boundary").withCharset(HttpCharsets.`UTF-8`)
val encoding = Await.result(result.dataBytes.runWith(Sink.seq), 1.second)
encoding .map(_.utf8String).mkString shouldBe "--boundary\r\nContent-Type: text/plain; charset=UTF-8\r\nETag: \"xzy\"\r\n\r\ndata\r\n--boundary--"
}
}

"Multipart.FormData" should {
Expand Down
7 changes: 5 additions & 2 deletions project/MiMa.scala
Original file line number Diff line number Diff line change
Expand Up @@ -882,8 +882,11 @@ object MiMa extends AutoPlugin {

// #20683
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMessage.discardEntityBytes"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.discardEntityBytes")
)
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.discardEntityBytes"),

// #20288 migrate BodyPartRenderer to GraphStage
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.rendering.BodyPartRenderer.streamed")
)
)
}
}

0 comments on commit 95cfbda

Please sign in to comment.