diff --git a/PubSub.cpp b/PubSub.cpp index c2ddd9dd1354..1092b2a3ca77 100644 --- a/PubSub.cpp +++ b/PubSub.cpp @@ -40,39 +40,30 @@ std::shared_ptr Publisher::Subscriber::getNext() { return nullptr; } -std::vector> -Publisher::Subscriber::getPending() { - std::vector> items; - - auto rlock = publisher_->state_.rlock(); - for (auto& item : rlock->items) { - if (item->serial > serial_) { - serial_ = item->serial; - items.push_back(item); +void Publisher::Subscriber::getPending( + std::vector>& pending) { + { + auto rlock = publisher_->state_.rlock(); + for (auto& item : rlock->items) { + auto serial = item->serial; + if (serial > serial_) { + pending.emplace_back(item); + serial_ = serial; + } } } - - return items; -} - -template -void moveVec(Vec& dest, Vec&& src) { - std::move(src.begin(), src.end(), std::back_inserter(dest)); } -std::vector> getPending( +void getPending( + std::vector>& items, const std::shared_ptr& sub1, const std::shared_ptr& sub2) { - std::vector> items; - if (sub1) { - moveVec(items, sub1->getPending()); + sub1->getPending(items); } if (sub2) { - moveVec(items, sub2->getPending()); + sub2->getPending(items); } - - return items; } std::shared_ptr Publisher::subscribe(Notifier notify) { diff --git a/PubSub.h b/PubSub.h index bfe57fa216f3..59a099b7d5a9 100644 --- a/PubSub.h +++ b/PubSub.h @@ -47,7 +47,7 @@ class Publisher : public std::enable_shared_from_this { // Returns all as yet unseen published items for this subscriber. // Equivalent to calling getNext in a loop and sticking the results // into a vector. - std::vector> getPending(); + void getPending(std::vector>& pending); inline uint64_t getSerial() const { return serial_; @@ -91,7 +91,8 @@ class Publisher : public std::enable_shared_from_this { // Equivalent to calling getPending on up to two Subscriber and // joining the resultant vectors together. -std::vector> getPending( +void getPending( + std::vector>& pending, const std::shared_ptr& sub1, const std::shared_ptr& sub2); } diff --git a/cmds/trigger.cpp b/cmds/trigger.cpp index cda9cfed2823..c8356622e9a2 100644 --- a/cmds/trigger.cpp +++ b/cmds/trigger.cpp @@ -31,6 +31,7 @@ bool watchman_trigger_command::waitNoIntr() { } void watchman_trigger_command::run(const std::shared_ptr& root) { + std::vector> pending; w_set_thread_name( "trigger %s %s", triggername.c_str(), root->root_path.c_str()); @@ -45,10 +46,17 @@ void watchman_trigger_command::run(const std::shared_ptr& root) { break; } while (ping_->testAndClear()) { - while (auto item = subscriber_->getNext()) { - if (!item->payload.get_default("settled")) { - continue; + pending.clear(); + subscriber_->getPending(pending); + bool seenSettle = false; + for (auto& item : pending) { + if (item->payload.get_default("settled")) { + seenSettle = true; + break; } + } + + if (seenSettle) { if (!maybeSpawn(root)) { continue; } diff --git a/listener.cpp b/listener.cpp index d4213807066b..174304cdcad5 100644 --- a/listener.cpp +++ b/listener.cpp @@ -125,6 +125,9 @@ static void client_thread(std::shared_ptr client) { struct watchman_event_poll pfd[2]; json_error_t jerr; bool send_ok = true; + // Keep a persistent vector around so that we can avoid allocating + // and releasing heap memory when we collect items from the publisher + std::vector> pending; client->stm->setNonBlock(true); w_set_thread_name( @@ -178,8 +181,9 @@ static void client_thread(std::shared_ptr client) { if (pfd[1].ready) { while (client->ping->testAndClear()) { // Enqueue refs to pending log payloads - auto items = watchman::getPending(client->debugSub, client->errorSub); - for (auto& item : items) { + pending.clear(); + getPending(pending, client->debugSub, client->errorSub); + for (auto& item : pending) { client->enqueueResponse(json_ref(item->payload), false); } @@ -196,12 +200,10 @@ static void client_thread(std::shared_ptr client) { watchman::log( watchman::DBG, "consider fan out sub ", sub->name, "\n"); - while (true) { - auto item = subStream->getNext(); - if (!item) { - break; - } - + pending.clear(); + subStream->getPending(pending); + bool seenSettle = false; + for (auto& item : pending) { auto dumped = json_dumps(item->payload, 0); watchman::log( watchman::DBG, @@ -250,10 +252,14 @@ static void client_thread(std::shared_ptr client) { } if (item->payload.get_default("settled")) { - sub->processSubscription(); + seenSettle = true; continue; } } + + if (seenSettle) { + sub->processSubscription(); + } } for (auto& name : subsToDelete) { diff --git a/log.cpp b/log.cpp index 6a4b4d016070..58e885d11e76 100644 --- a/log.cpp +++ b/log.cpp @@ -157,7 +157,8 @@ void Log::setStdErrLoggingLevel(enum LogLevel level) { } void Log::doLogToStdErr() { - auto items = getPending(errorSub_, debugSub_); + std::vector> items; + getPending(items, errorSub_, debugSub_); bool fatal = false; static w_string kFatal("fatal");