Skip to content

Commit

Permalink
Added a StreamingResponseDecoder.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmahler committed Mar 30, 2015
1 parent 8b0bba1 commit 6ac8eb1
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 0 deletions.
232 changes: 232 additions & 0 deletions 3rdparty/libprocess/src/decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <stout/foreach.hpp>
#include <stout/gzip.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>


Expand Down Expand Up @@ -469,6 +470,237 @@ class ResponseDecoder
};


// Provides a response decoder that returns 'PIPE' responses once
// the response headers are received, but before the body data
// is received. Callers are expected to read the body from the
// Pipe::Reader in the response.
//
// TODO(bmahler): Consolidate streaming and non-streaming response
// decoders.
class StreamingResponseDecoder
{
public:
StreamingResponseDecoder()
: failure(false), header(HEADER_FIELD), response(NULL)
{
settings.on_message_begin =
&StreamingResponseDecoder::on_message_begin;

#if !(HTTP_PARSER_VERSION_MAJOR >=2)
settings.on_path =
&StreamingResponseDecoder::on_path;
settings.on_fragment =
&StreamingResponseDecoder::on_fragment;
settings.on_query_string =
&StreamingResponseDecoder::on_query_string;
#endif

settings.on_url =
&StreamingResponseDecoder::on_url;
settings.on_header_field =
&StreamingResponseDecoder::on_header_field;
settings.on_header_value =
&StreamingResponseDecoder::on_header_value;
settings.on_headers_complete =
&StreamingResponseDecoder::on_headers_complete;
settings.on_body =
&StreamingResponseDecoder::on_body;
settings.on_message_complete =
&StreamingResponseDecoder::on_message_complete;

http_parser_init(&parser, HTTP_RESPONSE);

parser.data = this;
}

std::deque<http::Response*> decode(const char* data, size_t length)
{
size_t parsed = http_parser_execute(&parser, &settings, data, length);

if (parsed != length) {
// TODO(bmahler): joyent/http-parser exposes error reasons.
failure = true;

// If we're still writing the body, fail the writer!
if (writer.isSome()) {
http::Pipe::Writer writer_ = writer.get(); // Remove const.
writer_.fail("failed to decode body");
writer = None();
}
}

if (!responses.empty()) {
std::deque<http::Response*> result = responses;
responses.clear();
return result;
}

return std::deque<http::Response*>();
}

bool failed() const
{
return failure;
}

private:
static int on_message_begin(http_parser* p)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK(!decoder->failure);

decoder->header = HEADER_FIELD;
decoder->field.clear();
decoder->value.clear();

CHECK(decoder->response == NULL);
CHECK(decoder->writer.isNone());

decoder->response = new http::Response();
decoder->response->type = http::Response::PIPE;
decoder->writer = None();

return 0;
}

#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
static int on_path(http_parser* p, const char* data, size_t length)
{
return 0;
}

static int on_query_string(http_parser* p, const char* data, size_t length)
{
return 0;
}

static int on_fragment(http_parser* p, const char* data, size_t length)
{
return 0;
}
#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)

static int on_url(http_parser* p, const char* data, size_t length)
{
return 0;
}

static int on_header_field(http_parser* p, const char* data, size_t length)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK_NOTNULL(decoder->response);

if (decoder->header != HEADER_FIELD) {
decoder->response->headers[decoder->field] = decoder->value;
decoder->field.clear();
decoder->value.clear();
}

decoder->field.append(data, length);
decoder->header = HEADER_FIELD;

return 0;
}

static int on_header_value(http_parser* p, const char* data, size_t length)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK_NOTNULL(decoder->response);

decoder->value.append(data, length);
decoder->header = HEADER_VALUE;
return 0;
}

static int on_headers_complete(http_parser* p)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK_NOTNULL(decoder->response);

// Add final header.
decoder->response->headers[decoder->field] = decoder->value;
decoder->field.clear();
decoder->value.clear();

// Get the response status string.
if (http::statuses.contains(decoder->parser.status_code)) {
decoder->response->status = http::statuses[decoder->parser.status_code];
} else {
decoder->failure = true;
return 1;
}

// We cannot provide streaming gzip decompression!
Option<std::string> encoding =
decoder->response->headers.get("Content-Encoding");
if (encoding.isSome() && encoding.get() == "gzip") {
decoder->failure = true;
return 1;
}

CHECK(decoder->writer.isNone());

http::Pipe pipe;
decoder->writer = pipe.writer();
decoder->response->reader = pipe.reader();

// Send the response to the caller, but keep a Pipe::Writer for
// streaming the body content into the response.
decoder->responses.push_back(decoder->response);
decoder->response = NULL;

return 0;
}

static int on_body(http_parser* p, const char* data, size_t length)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK_SOME(decoder->writer);

http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
writer.write(std::string(data, length));

return 0;
}

static int on_message_complete(http_parser* p)
{
StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;

CHECK_SOME(decoder->writer);

http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
writer.close();

decoder->writer = None();

return 0;
}

bool failure;

http_parser parser;
http_parser_settings settings;

enum {
HEADER_FIELD,
HEADER_VALUE
} header;

std::string field;
std::string value;

http::Response* response;
Option<http::Pipe::Writer> writer;

std::deque<http::Response*> responses;
};

} // namespace process {

#endif // __DECODER_HPP__
91 changes: 91 additions & 0 deletions 3rdparty/libprocess/src/tests/decoder_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,94 @@ TEST(Decoder, Response)

delete response;
}


TEST(Decoder, StreamingResponse)
{
StreamingResponseDecoder decoder;

const string& headers =
"HTTP/1.1 200 OK\r\n"
"Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: 2\r\n"
"\r\n";

const string& body = "hi";

deque<Response*> responses = decoder.decode(headers.data(), headers.length());
ASSERT_FALSE(decoder.failed());
ASSERT_EQ(1, responses.size());

Response* response = responses[0];

EXPECT_EQ("200 OK", response->status);
EXPECT_EQ(3, response->headers.size());

ASSERT_EQ(Response::PIPE, response->type);
ASSERT_SOME(response->reader);

http::Pipe::Reader reader = response->reader.get();
Future<string> read = reader.read();
EXPECT_TRUE(read.isPending());

decoder.decode(body.data(), body.length());

// Feeding EOF to the decoder should be ok.
decoder.decode("", 0);

EXPECT_TRUE(read.isReady());
EXPECT_EQ("hi", read.get());

// Response should be complete.
read = reader.read();
EXPECT_TRUE(read.isReady());
EXPECT_EQ("", read.get());
}


TEST(Decoder, StreamingResponseFailure)
{
StreamingResponseDecoder decoder;

const string& headers =
"HTTP/1.1 200 OK\r\n"
"Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: 2\r\n"
"\r\n";

// The body is shorter than the content length!
const string& body = "1";

deque<Response*> responses = decoder.decode(headers.data(), headers.length());
ASSERT_FALSE(decoder.failed());
ASSERT_EQ(1, responses.size());

Response* response = responses[0];

EXPECT_EQ("200 OK", response->status);
EXPECT_EQ(3, response->headers.size());

ASSERT_EQ(Response::PIPE, response->type);
ASSERT_SOME(response->reader);

http::Pipe::Reader reader = response->reader.get();
Future<string> read = reader.read();
EXPECT_TRUE(read.isPending());

decoder.decode(body.data(), body.length());

EXPECT_TRUE(read.isReady());
EXPECT_EQ("1", read.get());

// Body is not yet complete.
read = reader.read();
EXPECT_TRUE(read.isPending());

// Feeding EOF to the decoder should trigger a failure!
decoder.decode("", 0);

EXPECT_TRUE(read.isFailed());
EXPECT_EQ("failed to decode body", read.failure());
}

0 comments on commit 6ac8eb1

Please sign in to comment.