Skip to content

Commit

Permalink
improve perf for subscriptions and triggers
Browse files Browse the repository at this point in the history
Summary:
We noticed that client threads could apparently spin calling PubSub::getNext.

Looking at the `log-level debug` output while this is happening in one of our big
repos shows that the trigger assessment is very aggressively calling getNext in
a loop, closely followed by the approximately 10 subscriptions that are also
associated with this repo.  These dispatches have mostly nothing to do in
these situations; the wakeups are just due to the settle notifications that
are exponentially backing off.  So if you have a default or relatively small
settle period configured in the root, you'll see these run quite hot for a while
until the sleep interval backs off sufficiently.

To help avoid this being so busy, rather than calling getNext() in a loop we
can call the getPending() method to return the vector of all pending items
in a chunk (which only requires one lock call to get all items).

Then we can process that chunk and collect whether the settle payload appears,
dispatching the trigger or subscription just once for the batch of items.

Reviewed By: sid0

Differential Revision: D4302635

fbshipit-source-id: 7f187f0578d17f1dc2a932e1ff8337fc0e3ed514
  • Loading branch information
wez authored and Facebook Github Bot committed Dec 13, 2016
1 parent 8fbd9f0 commit 3cc6b8d
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 38 deletions.
37 changes: 14 additions & 23 deletions PubSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,30 @@ std::shared_ptr<const Publisher::Item> Publisher::Subscriber::getNext() {
return nullptr;
}

std::vector<std::shared_ptr<const Publisher::Item>>
Publisher::Subscriber::getPending() {
std::vector<std::shared_ptr<const Publisher::Item>> items;

auto rlock = publisher_->state_.rlock();
for (auto& item : rlock->items) {
if (item->serial > serial_) {
serial_ = item->serial;
items.push_back(item);
void Publisher::Subscriber::getPending(
std::vector<std::shared_ptr<const Item>>& pending) {
{
auto rlock = publisher_->state_.rlock();
for (auto& item : rlock->items) {
auto serial = item->serial;
if (serial > serial_) {
pending.emplace_back(item);
serial_ = serial;
}
}
}

return items;
}

template <typename Vec>
void moveVec(Vec& dest, Vec&& src) {
std::move(src.begin(), src.end(), std::back_inserter(dest));
}

std::vector<std::shared_ptr<const Publisher::Item>> getPending(
void getPending(
std::vector<std::shared_ptr<const Publisher::Item>>& items,
const std::shared_ptr<Publisher::Subscriber>& sub1,
const std::shared_ptr<Publisher::Subscriber>& sub2) {
std::vector<std::shared_ptr<const Publisher::Item>> items;

if (sub1) {
moveVec(items, sub1->getPending());
sub1->getPending(items);
}
if (sub2) {
moveVec(items, sub2->getPending());
sub2->getPending(items);
}

return items;
}

std::shared_ptr<Publisher::Subscriber> Publisher::subscribe(Notifier notify) {
Expand Down
5 changes: 3 additions & 2 deletions PubSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Publisher : public std::enable_shared_from_this<Publisher> {
// Returns all as yet unseen published items for this subscriber.
// Equivalent to calling getNext in a loop and sticking the results
// into a vector.
std::vector<std::shared_ptr<const Item>> getPending();
void getPending(std::vector<std::shared_ptr<const Item>>& pending);

inline uint64_t getSerial() const {
return serial_;
Expand Down Expand Up @@ -91,7 +91,8 @@ class Publisher : public std::enable_shared_from_this<Publisher> {

// Equivalent to calling getPending on up to two Subscriber and
// joining the resultant vectors together.
std::vector<std::shared_ptr<const Publisher::Item>> getPending(
void getPending(
std::vector<std::shared_ptr<const Publisher::Item>>& pending,
const std::shared_ptr<Publisher::Subscriber>& sub1,
const std::shared_ptr<Publisher::Subscriber>& sub2);
}
14 changes: 11 additions & 3 deletions cmds/trigger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ bool watchman_trigger_command::waitNoIntr() {
}

void watchman_trigger_command::run(const std::shared_ptr<w_root_t>& root) {
std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;
w_set_thread_name(
"trigger %s %s", triggername.c_str(), root->root_path.c_str());

Expand All @@ -45,10 +46,17 @@ void watchman_trigger_command::run(const std::shared_ptr<w_root_t>& root) {
break;
}
while (ping_->testAndClear()) {
while (auto item = subscriber_->getNext()) {
if (!item->payload.get_default("settled")) {
continue;
pending.clear();
subscriber_->getPending(pending);
bool seenSettle = false;
for (auto& item : pending) {
if (item->payload.get_default("settled")) {
seenSettle = true;
break;
}
}

if (seenSettle) {
if (!maybeSpawn(root)) {
continue;
}
Expand Down
24 changes: 15 additions & 9 deletions listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ static void client_thread(std::shared_ptr<watchman_client> client) {
struct watchman_event_poll pfd[2];
json_error_t jerr;
bool send_ok = true;
// Keep a persistent vector around so that we can avoid allocating
// and releasing heap memory when we collect items from the publisher
std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;

client->stm->setNonBlock(true);
w_set_thread_name(
Expand Down Expand Up @@ -178,8 +181,9 @@ static void client_thread(std::shared_ptr<watchman_client> client) {
if (pfd[1].ready) {
while (client->ping->testAndClear()) {
// Enqueue refs to pending log payloads
auto items = watchman::getPending(client->debugSub, client->errorSub);
for (auto& item : items) {
pending.clear();
getPending(pending, client->debugSub, client->errorSub);
for (auto& item : pending) {
client->enqueueResponse(json_ref(item->payload), false);
}

Expand All @@ -196,12 +200,10 @@ static void client_thread(std::shared_ptr<watchman_client> client) {
watchman::log(
watchman::DBG, "consider fan out sub ", sub->name, "\n");

while (true) {
auto item = subStream->getNext();
if (!item) {
break;
}

pending.clear();
subStream->getPending(pending);
bool seenSettle = false;
for (auto& item : pending) {
auto dumped = json_dumps(item->payload, 0);
watchman::log(
watchman::DBG,
Expand Down Expand Up @@ -250,10 +252,14 @@ static void client_thread(std::shared_ptr<watchman_client> client) {
}

if (item->payload.get_default("settled")) {
sub->processSubscription();
seenSettle = true;
continue;
}
}

if (seenSettle) {
sub->processSubscription();
}
}

for (auto& name : subsToDelete) {
Expand Down
3 changes: 2 additions & 1 deletion log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ void Log::setStdErrLoggingLevel(enum LogLevel level) {
}

void Log::doLogToStdErr() {
auto items = getPending(errorSub_, debugSub_);
std::vector<std::shared_ptr<const watchman::Publisher::Item>> items;
getPending(items, errorSub_, debugSub_);

bool fatal = false;
static w_string kFatal("fatal");
Expand Down

0 comments on commit 3cc6b8d

Please sign in to comment.