Skip to content

Commit

Permalink
Several changes:
Browse files Browse the repository at this point in the history
    - WResource: added static resource concurrency test
    - WResource: reorganized resource/continuations concurrency
    - WResource: allow aborted requests to be handled, and allow an individual request to be continued
    - Http::Client: allow response data to be processed incrementally
  • Loading branch information
Koen Deforche committed Nov 18, 2014
1 parent eaad228 commit 637b626
Show file tree
Hide file tree
Showing 14 changed files with 388 additions and 201 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ IF(BOOST_WT_MT_FOUND)
IF(MULTI_THREADED)
MESSAGE("** Enabling multi threading.")
SET(MULTI_THREADED_BUILD true)

ADD_DEFINITIONS(-DWT_THREADED -D_REENTRANT -DBOOST_SPIRIT_THREADSAFE)
SET(WT_THREADED true)
ADD_DEFINITIONS(-D_REENTRANT -DBOOST_SPIRIT_THREADSAFE)
ELSE(MULTI_THREADED)
MESSAGE("** Disabling multi threading.")
SET(MULTI_THREADED_BUILD false)
Expand Down
1 change: 1 addition & 0 deletions WConfig.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#cmakedefine WT_NO_STD_WSTRING
#cmakedefine WT_USE_OPENGL
#cmakedefine WT_DEBUG_ENABLED
#cmakedefine WT_THREADED

#cmakedefine WT_USE_BOOST_SIGNALS
#cmakedefine WT_USE_BOOST_SIGNALS2
Expand Down
12 changes: 6 additions & 6 deletions src/Wt/Http/Response
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ public:
WT_BOSTREAM& bout() { return out(); }

private:
WResource *resource_;
WebResponse *response_;
ResponseContinuation *continuation_;
WT_BOSTREAM *out_;
bool headersCommitted_;
WResource *resource_;
WebResponse *response_;
ResponseContinuationPtr continuation_;
WT_BOSTREAM *out_;
bool headersCommitted_;

Response(WResource *resource, WebResponse *response,
ResponseContinuation *continuation);
ResponseContinuationPtr continuation);
Response(WResource *resource, WT_BOSTREAM& out);

friend class Wt::WResource;
Expand Down
14 changes: 7 additions & 7 deletions src/Wt/Http/Response.C
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ void Response::addHeader(const std::string& name, const std::string& value)

ResponseContinuation *Response::createContinuation()
{
if (!continuation_)
continuation_ = new ResponseContinuation(resource_, response_);
else
if (!continuation_) {
ResponseContinuation *c = new ResponseContinuation(resource_, response_);
continuation_ = resource_->addContinuation(c);
} else
continuation_->resource_ = resource_;

return continuation_;
return continuation_.get();
}

ResponseContinuation *Response::continuation() const
{
if (continuation_ && continuation_->resource_)
return continuation_;
return continuation_.get();
else
return 0;
}
Expand Down Expand Up @@ -126,7 +127,7 @@ WT_BOSTREAM& Response::out()
}

Response::Response(WResource *resource, WebResponse *response,
ResponseContinuation *continuation)
ResponseContinuationPtr continuation)
: resource_(resource),
response_(response),
continuation_(continuation),
Expand All @@ -137,7 +138,6 @@ Response::Response(WResource *resource, WebResponse *response,
Response::Response(WResource *resource, WT_BOSTREAM& out)
: resource_(resource),
response_(0),
continuation_(0),
out_(&out),
headersCommitted_(false)
{ }
Expand Down
31 changes: 21 additions & 10 deletions src/Wt/Http/ResponseContinuation
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
#include <Wt/WGlobal>
#include <boost/any.hpp>

#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace boost {
class recursive_mutex;
}

namespace Wt {

class WResource;
Expand Down Expand Up @@ -42,9 +49,12 @@ namespace Wt {
*
* \ingroup http
*/
class WT_API ResponseContinuation
class WT_API ResponseContinuation
: public boost::enable_shared_from_this<ResponseContinuation>
{
public:
~ResponseContinuation();

/*! \brief Set data associated with the continuation.
*
* You could do this to keep track of the state of sending the data
Expand Down Expand Up @@ -83,30 +93,31 @@ public:
*/
bool isWaitingForMoreData() const { return waiting_; }

/*
* Only useful for server-side events: force continuation.
*/
void doContinue(WebWriteEvent event);

private:
#ifdef WT_THREADED
boost::shared_ptr<boost::recursive_mutex> mutex_;
#endif

WResource *resource_;
WebResponse *response_;
boost::any data_;
bool waiting_, readyToContinue_, needsContinue_;
bool waiting_, readyToContinue_;

ResponseContinuation(WResource *resource, WebResponse *response);
ResponseContinuation(const ResponseContinuation&);
~ResponseContinuation();

void cancel();
void flagReadyToContinue(WebWriteEvent);
void cancel(bool resourceIsBeingDeleted);
void readyToContinue(WebWriteEvent writeResult);

WebResponse *response() { return response_; }

friend class Wt::WResource;
friend class Wt::WebSession;
friend class Response;
};

typedef boost::shared_ptr<ResponseContinuation> ResponseContinuationPtr;

}
}

Expand Down
129 changes: 80 additions & 49 deletions src/Wt/Http/ResponseContinuation.C
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

#include "WebRequest.h"

#ifdef WT_THREADED
#include <boost/thread.hpp>
#endif

namespace Wt {

LOGGER("Http::ResponseContinuation");
Expand All @@ -23,79 +27,106 @@ void ResponseContinuation::setData(const boost::any& data)

void ResponseContinuation::haveMoreData()
{
if (isWaitingForMoreData())
doContinue(WriteCompleted);
WResource::UseLock useLock;
WResource *resource = 0;

{
#ifdef WT_THREADED
boost::recursive_mutex::scoped_lock lock(*mutex_);
#endif // WT_THREADED

if (!useLock.use(resource_))
return;

if (waiting_) {
waiting_ = false;
if (readyToContinue_) {
readyToContinue_ = false;
resource = resource_;
resource_ = 0;
}
}
}

if (resource)
resource->doContinue(shared_from_this());
}

void ResponseContinuation::doContinue(WebWriteEvent event)
void ResponseContinuation::readyToContinue(WebWriteEvent event)
{
if (event == WriteError) {
LOG_ERROR("WriteError");
cancel();
cancel(false);
return;
}

/*
* Although we are waiting for more data, we're not yet ready to continue
* We'll remember to continue as soon as we become ready.
*/
if (waiting_ && !readyToContinue_) {
needsContinue_ = true;
return;
}
WResource::UseLock useLock;
WResource *resource = 0;

{
#ifdef WT_THREADED
boost::recursive_mutex::scoped_lock lock(*mutex_);
#endif // WT_THREADED

waiting_ = false;
needsContinue_ = false;
if (!useLock.use(resource_))
return;

// We are certain that the continuation is still "alive" because it is
// protected by a mutex, and thus a simultaneous change with
// WebResponse::flush() is not possible: ResponseContinuation::cancel(),
// called from beingDeleted() and protected by the same mutex
// will not be called while we are here.
resource_->doContinue(this);
readyToContinue_ = true;

if (!waiting_) {
readyToContinue_ = false;
resource = resource_;
resource_ = 0;
}
}

if (resource)
resource->doContinue(shared_from_this());
}

ResponseContinuation::ResponseContinuation(WResource *resource,
WebResponse *response)
: resource_(resource),
:
#ifdef WT_THREADED
mutex_(resource->mutex_),
#endif
resource_(resource),
response_(response),
waiting_(false),
readyToContinue_(false),
needsContinue_(false)
{
resource_->continuations_.push_back(this);
}
readyToContinue_(false)
{ }

void ResponseContinuation::cancel()
void ResponseContinuation::cancel(bool resourceIsBeingDeleted)
{
Http::Request request(*response_, this);
resource_->handleAbort(request);
resource_->removeContinuation(this);

response_->flush(WebResponse::ResponseDone);
WResource::UseLock useLock;
WResource *resource = 0;

{
#ifdef WT_THREADED
boost::recursive_mutex::scoped_lock lock(*mutex_);
#endif // WT_THREADED

if (resourceIsBeingDeleted) {
if (!resource_)
return;
} else if (!useLock.use(resource_))
return;

resource = resource_;
resource_ = 0;
}

delete this;
if (resource) {
Http::Request request(*response_, this);
resource->handleAbort(request);
resource->removeContinuation(shared_from_this());
response_->flush(WebResponse::ResponseDone);
}
}

void ResponseContinuation::waitForMoreData()
{
waiting_ = true;
needsContinue_ = false;
readyToContinue_ = false;
}

void ResponseContinuation::flagReadyToContinue(WebWriteEvent event)
{
if (event == WriteError) {
LOG_ERROR("WriteError");
cancel();
return;
}

readyToContinue_ = true;

if (needsContinue_)
doContinue(event);
}

ResponseContinuation::~ResponseContinuation()
Expand Down
3 changes: 1 addition & 2 deletions src/Wt/WAbstractMedia.C
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ void WAbstractMedia::setFormData(const FormData& formData)
try {
readyState_ = intToReadyState(boost::lexical_cast<int>(attributes[5]));
} catch (const std::exception& e) {
throw WException("WAbstractMedia: error parsing: " + formData.values[0]
+ ": " + e.what());
readyState_ = HaveNothing;
}
} else
throw WException("WAbstractMedia: error parsing: " + formData.values[0]);
Expand Down
2 changes: 1 addition & 1 deletion src/Wt/WBoostAny.C
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <map>
#include <boost/lexical_cast.hpp>
#include <stdio.h>

#include "Wt/WConfig.h"
#ifdef WT_THREADED
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
Expand Down
Loading

0 comments on commit 637b626

Please sign in to comment.