Skip to content

Commit

Permalink
Merge pull request pistacheio#405 from knowledge4igor/fix_data_races_…
Browse files Browse the repository at this point in the history
…on_client_destruction

Fix data races in Client desctruction
  • Loading branch information
dennisjenkins75 authored Jan 4, 2019
2 parents 3f29899 + 347378e commit 0a95085
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 1 deletion.
1 change: 1 addition & 0 deletions include/pistache/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ class Client {

Lock queuesLock;
std::unordered_map<std::string, MPMCQueue<std::shared_ptr<Connection::RequestData>, 2048>> requestsQueues;
bool stopProcessPequestsQueues;

RequestBuilder prepareRequest(const std::string& resource, Http::Method method);

Expand Down
7 changes: 7 additions & 0 deletions src/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,11 @@ Client::Client()
, ioIndex(0)
, queuesLock()
, requestsQueues()
, stopProcessPequestsQueues(false)
{ }

Client::~Client() {
assert(stopProcessPequestsQueues == true && "You must explicitly call shutdown method of Client object");
}

Client::Options
Expand All @@ -726,6 +728,8 @@ Client::init(const Client::Options& options) {
void
Client::shutdown() {
reactor_->shutdown();
Guard guard(queuesLock);
stopProcessPequestsQueues = true;
}

RequestBuilder
Expand Down Expand Up @@ -821,6 +825,9 @@ void
Client::processRequestQueue() {
Guard guard(queuesLock);

if (stopProcessPequestsQueues)
return;

for (auto& queues: requestsQueues) {
for (;;) {
const auto& domain = queues.first;
Expand Down
48 changes: 47 additions & 1 deletion tests/http_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ TEST(http_client_test, timeout_reject)
ASSERT_TRUE(is_reject);
}

TEST(http_client_test, one_client_with_multiple_requests_and_one_connection_per_host)
TEST(http_client_test, one_client_with_multiple_requests_and_one_connection_per_host_and_two_threads)
{
const Pistache::Address address("localhost", Pistache::Port(0));

Expand Down Expand Up @@ -248,3 +248,49 @@ TEST(http_client_test, one_client_with_multiple_requests_and_one_connection_per_

ASSERT_TRUE(response_counter == RESPONSE_SIZE);
}

TEST(http_client_test, one_client_with_multiple_requests_and_two_connections_per_host_and_one_thread)
{
const Pistache::Address address("localhost", Pistache::Port(0));

Http::Endpoint server(address);
auto flags = Tcp::Options::InstallSignalHandler | Tcp::Options::ReuseAddr;
auto server_opts = Http::Endpoint::options().flags(flags);
server.init(server_opts);
server.setHandler(Http::make_handler<HelloHandler>());
server.serveThreaded();

const std::string server_address = "localhost:" + server.getPort().toString();
std::cout << "Server address: " << server_address << "\n";

Http::Client client;
auto opts = Http::Client::options().maxConnectionsPerHost(2).threads(1);
client.init(opts);

std::vector<Async::Promise<Http::Response>> responses;
const int RESPONSE_SIZE = 6;
std::atomic<int> response_counter(0);

auto rb = client.get(server_address);
for (int i = 0; i < RESPONSE_SIZE; ++i)
{
auto response = rb.send();
response.then([&](Http::Response rsp)
{
if (rsp.code() == Http::Code::Ok)
++response_counter;
},
Async::IgnoreException);
responses.push_back(std::move(response));
}

auto sync = Async::whenAll(responses.begin(), responses.end());
Async::Barrier<std::vector<Http::Response>> barrier(sync);

barrier.wait_for(std::chrono::seconds(5));

server.shutdown();
client.shutdown();

ASSERT_TRUE(response_counter == RESPONSE_SIZE);
}

0 comments on commit 0a95085

Please sign in to comment.