Skip to content

Commit

Permalink
backpressure (vapor#1979)
Browse files Browse the repository at this point in the history
* backpressure

* update HTTP request decoder w/ backpessure

* update circle

* fix copied circle ci

* add snippet

* comment out xenial tests
  • Loading branch information
tanner0101 authored May 30, 2019
1 parent 24cdfe9 commit d07a918
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 41 deletions.
31 changes: 31 additions & 0 deletions Sources/Development/routes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,36 @@ public func routes(_ r: Routes, _ c: Container) throws {
r.on(.GET, "ping", body: .stream) { req in
return "123" as StaticString
}

// ( echo -e 'POST /slow-stream HTTP/1.1\r\nContent-Length: 1000000000\r\n\r\n'; dd if=/dev/zero; ) | nc localhost 8080
r.on(.POST, "slow-stream", body: .stream) { req -> EventLoopFuture<String> in
let done = req.eventLoop.makePromise(of: String.self)

var total = 0
req.body.drain { result in
let promise = req.eventLoop.makePromise(of: Void.self)

switch result {
case .buffer(let buffer):
req.eventLoop.scheduleTask(in: .milliseconds(1000)) {
total += buffer.readableBytes
promise.succeed(())
}
case .error(let error):
done.fail(error)
case .end:
promise.succeed(())
done.succeed(total.description)
}

// manually return pre-completed future
// this should balloon in memory
// return req.eventLoop.makeSucceededFuture(())
return promise.futureResult
}

return done.futureResult
}

r.post("login") { req -> String in
let creds = try req.content.decode(Creds.self)
Expand Down Expand Up @@ -46,6 +76,7 @@ public func routes(_ r: Routes, _ c: Container) throws {
case .end:
promise.succeed("Done")
}
return req.eventLoop.makeSucceededFuture(())
}
return promise.futureResult
}
Expand Down
11 changes: 10 additions & 1 deletion Sources/Vapor/HTTP/BodyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,14 @@ public enum BodyStreamResult {
}

public protocol BodyStreamWriter {
func write(_ result: BodyStreamResult)
var eventLoop: EventLoop { get }
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?)
}

extension BodyStreamWriter {
public func write(_ result: BodyStreamResult) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}
}
8 changes: 5 additions & 3 deletions Sources/Vapor/Request/Request+Body.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ extension Request {
}
}

public func drain(_ handler: @escaping (BodyStreamResult) -> ()) {
public func drain(_ handler: @escaping (BodyStreamResult) -> EventLoopFuture<Void>) {
switch self.request.bodyStorage {
case .stream(let stream):
stream.read(handler)
stream.read { (result, promise) in
handler(result).cascade(to: promise)
}
case .collected(let buffer):
handler(.buffer(buffer))
_ = handler(.buffer(buffer))
case .none: break
}
}
Expand Down
28 changes: 14 additions & 14 deletions Sources/Vapor/Request/Request+BodyStream.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extension Request {
final class BodyStream: BodyStreamWriter {
/// Handles an incoming `HTTPChunkedStreamResult`.
typealias Handler = (BodyStreamResult) -> ()
typealias Handler = (BodyStreamResult, EventLoopPromise<Void>?) -> ()

/// If `true`, this `HTTPChunkedStream` has already sent an `end` chunk.
private(set) var isClosed: Bool
Expand All @@ -11,13 +11,12 @@ extension Request {

/// If a `handler` has not been set when `write(_:)` is called, this property
/// is used to store the waiting data.
private var buffer: [BodyStreamResult]

/// Creates a new `HTTPChunkedStream`.
///
/// - parameters:
/// - worker: `Worker` to complete futures on.
init() {
private var buffer: [(BodyStreamResult, EventLoopPromise<Void>?)]

let eventLoop: EventLoop

init(on eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.isClosed = false
self.buffer = []
}
Expand All @@ -33,8 +32,8 @@ extension Request {
/// - handler: `HTTPChunkedHandler` to use for receiving chunks from this stream.
func read(_ handler: @escaping Handler) {
self.handler = handler
for item in self.buffer {
handler(item)
for (result, promise) in self.buffer {
handler(result, promise)
}
self.buffer = []
}
Expand All @@ -49,15 +48,15 @@ extension Request {
/// - chunk: A `HTTPChunkedStreamResult` to write to the stream.
/// - returns: A `Future` that will be completed when the write was successful.
/// You must wait for this future to complete before calling `write(_:)` again.
func write(_ chunk: BodyStreamResult) {
func write(_ chunk: BodyStreamResult, promise: EventLoopPromise<Void>?) {
if case .end = chunk {
self.isClosed = true
}

if let handler = handler {
handler(chunk)
handler(chunk, promise)
} else {
self.buffer.append(chunk)
self.buffer.append((chunk, promise))
}
}

Expand All @@ -74,7 +73,7 @@ extension Request {
func consume(max: Int, on eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
let promise = eventLoop.makePromise(of: ByteBuffer.self)
var data = ByteBufferAllocator().buffer(capacity: 0)
self.read { chunk in
self.read { chunk, next in
switch chunk {
case .buffer(var buffer):
if data.readableBytes + buffer.readableBytes >= max {
Expand All @@ -85,6 +84,7 @@ extension Request {
case .error(let error): promise.fail(error)
case .end: promise.succeed(data)
}
next?.succeed(())
}
return promise.futureResult
}
Expand Down
47 changes: 41 additions & 6 deletions Sources/Vapor/Server/HTTPServerRequestDecoder.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import NIO
import NIOHTTP1

final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHandler {
final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHandler {
typealias InboundIn = HTTPServerRequestPart
typealias InboundOut = Request
typealias OutboundIn = Never

/// Tracks current HTTP server state
enum RequestState {
Expand All @@ -21,11 +22,16 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan
private let maxBodySize: Int

private let logger: Logger

var isWritable: Bool
var hasReadPending: Bool

init(maxBodySize: Int) {
self.maxBodySize = maxBodySize
self.requestState = .ready
self.logger = Logger(label: "codes.vapor.server")
self.isWritable = true
self.hasReadPending = false
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
Expand Down Expand Up @@ -58,14 +64,18 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan
case .awaitingBody(let request):
self.requestState = .awaitingEnd(request, buffer)
case .awaitingEnd(let request, let previousBuffer):
let stream = Request.BodyStream()
let stream = Request.BodyStream(on: context.eventLoop)
request.bodyStorage = .stream(stream)
context.fireChannelRead(self.wrapInboundOut(request))
stream.write(.buffer(previousBuffer))
stream.write(.buffer(buffer))
let done = stream.write(.buffer(previousBuffer)).flatMap {
stream.write(.buffer(buffer))
}
self.updateReadability(done, context: context)
self.requestState = .streamingBody(stream)
case .streamingBody(let stream):
stream.write(.buffer(buffer))
self.isWritable = false
let done = stream.write(.buffer(buffer))
self.updateReadability(done, context: context)
}
case .end(let tailHeaders):
assert(tailHeaders == nil, "Tail headers are not supported.")
Expand All @@ -77,9 +87,34 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan
request.bodyStorage = .collected(buffer)
context.fireChannelRead(self.wrapInboundOut(request))
case .streamingBody(let stream):
stream.write(.end)
let done = stream.write(.end)
self.updateReadability(done, context: context)
}
self.requestState = .ready
}
}

func read(context: ChannelHandlerContext) {
if self.isWritable {
context.read()
} else {
self.hasReadPending = true
}
}

func updateReadability(_ future: EventLoopFuture<Void>, context: ChannelHandlerContext) {
self.isWritable = false
future.whenComplete { result in
self.isWritable = true
if self.hasReadPending {
self.hasReadPending = false
context.read()
}
switch result {
case .failure(let error):
self.logger.error("Could not write body: \(error)")
case .success: break
}
}
}
}
12 changes: 8 additions & 4 deletions Sources/Vapor/Server/HTTPServerResponseEncoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,21 @@ private struct ChannelResponseBodyStream: BodyStreamWriter {
let context: ChannelHandlerContext
let handler: HTTPServerResponseEncoder
let promise: EventLoopPromise<Void>?

var eventLoop: EventLoop {
return self.context.eventLoop
}

func write(_ result: BodyStreamResult) {
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
case .end:
self.promise?.succeed(())
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: nil)
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
case .error(let error):
self.promise?.fail(error)
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: nil)
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
}
}
}
18 changes: 11 additions & 7 deletions Sources/Vapor/Utilities/FileIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public struct FileIO {
return self.readFile(at: file) { new in
var new = new
data.writeBuffer(&new)
return self.eventLoop.makeSucceededFuture(())
}.map { data }
}

Expand All @@ -70,7 +71,11 @@ public struct FileIO {
/// - chunkSize: Maximum size for the file data chunks.
/// - onRead: Closure to be called sequentially for each file data chunk.
/// - returns: `Future` that will complete when the file read is finished.
public func readFile(at path: String, chunkSize: Int = NonBlockingFileIO.defaultChunkSize, onRead: @escaping (ByteBuffer) -> Void) -> EventLoopFuture<Void> {
public func readFile(
at path: String,
chunkSize: Int = NonBlockingFileIO.defaultChunkSize,
onRead: @escaping (ByteBuffer) -> EventLoopFuture<Void>
) -> EventLoopFuture<Void> {
guard
let attributes = try? FileManager.default.attributesOfItem(atPath: path),
let fileSize = attributes[.size] as? NSNumber
Expand Down Expand Up @@ -130,13 +135,13 @@ public struct FileIO {

response.body = .init(stream: { stream in
self.read(path: path, fileSize: fileSize, chunkSize: chunkSize) { chunk in
stream.write(.buffer(chunk))
return stream.write(.buffer(chunk))
}.whenComplete { result in
switch result {
case .failure(let error):
stream.write(.error(error))
stream.write(.error(error), promise: nil)
case .success:
stream.write(.end)
stream.write(.end, promise: nil)
}
}
}, count: fileSize)
Expand All @@ -146,12 +151,11 @@ public struct FileIO {

/// Private read method. `onRead` closure uses ByteBuffer and expects future return.
/// There may be use in publicizing this in the future for reads that must be async.
private func read(path: String, fileSize: Int, chunkSize: Int, onRead: @escaping (ByteBuffer) -> ()) -> EventLoopFuture<Void> {
private func read(path: String, fileSize: Int, chunkSize: Int, onRead: @escaping (ByteBuffer) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
do {
let fd = try NIOFileHandle(path: path)
let done = self.io.readChunked(fileHandle: fd, byteCount: fileSize, chunkSize: chunkSize, allocator: allocator, eventLoop: eventLoop) { chunk in
onRead(chunk)
return self.eventLoop.makeSucceededFuture(())
return onRead(chunk)
}
done.whenComplete { _ in
try? fd.close()
Expand Down
46 changes: 40 additions & 6 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,59 @@
version: 2

jobs:
linux:
macos:
macos:
xcode: "10.2.0"
steps:
- checkout
- run: HOMEBREW_NO_AUTO_UPDATE=1 brew install [email protected]
- run: swift build
- run: swift test
macos-release:
macos:
xcode: "10.2.0"
steps:
- checkout
- run: HOMEBREW_NO_AUTO_UPDATE=1 brew install [email protected]
- run: swift build -c release
bionic:
docker:
- image: swift:5.0
- image: swift:5.0-bionic
steps:
- checkout
- run: apt-get update; apt-get install -y libssl-dev zlib1g-dev
- run: swift build
- run: swift test
linux-release:
bionic-release:
docker:
- image: swift:5.0
- image: swift:5.0-bionic
steps:
- checkout
- run: apt-get update; apt-get install -y libssl-dev zlib1g-dev
- run: swift build -c release
# xenial:
# docker:
# - image: swift:5.0-xenial
# steps:
# - checkout
# - run: apt-get update; apt-get install -y libssl-dev zlib1g-dev
# - run: swift build
# - run: swift test
# xenial-release:
# docker:
# - image: swift:5.0-xenial
# steps:
# - checkout
# - run: apt-get update; apt-get install -y libssl-dev zlib1g-dev
# - run: swift build -c release

workflows:
version: 2
tests:
jobs:
- linux
- linux-release
- macos
- macos-release
- bionic
- bionic-release
# - xenial
# - xenial-release

0 comments on commit d07a918

Please sign in to comment.