Skip to content

Commit

Permalink
Add the functionality to merge and unmerge MessageLoopTaskQueues (flu…
Browse files Browse the repository at this point in the history
…tter#9436)

- Add the functionality to merge and unmerge MessageLoopTaskQueues

This introduces a notion of a "owning" and "subsumed" queue ids.
Owning queue will take care of the tasks submitted to both that and it's
subsumed queue.

- The tasks submitted still maintain the queue affinity
- Same for the task observers

- Also adds MergedQueuesRunner which grabs both the locks owner
  and subsumed queues in RAII fashion.

- Also use task queue id to verify if we are running
  in the same thread.

- This is to enable merging the backed message loop task
  queues to enable dynamic thread merging in IOS.
  • Loading branch information
iskakaushik authored Jul 12, 2019
1 parent 8abe85b commit 379028a
Show file tree
Hide file tree
Showing 12 changed files with 522 additions and 41 deletions.
2 changes: 2 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h
FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc
FILE: ../../../flutter/fml/memory/weak_ptr_internal.h
FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc
FILE: ../../../flutter/fml/merged_queues_runner.cc
FILE: ../../../flutter/fml/message.cc
FILE: ../../../flutter/fml/message.h
FILE: ../../../flutter/fml/message_loop.cc
Expand All @@ -133,6 +134,7 @@ FILE: ../../../flutter/fml/message_loop_impl.h
FILE: ../../../flutter/fml/message_loop_task_queues.cc
FILE: ../../../flutter/fml/message_loop_task_queues.h
FILE: ../../../flutter/fml/message_loop_task_queues_benchmark.cc
FILE: ../../../flutter/fml/message_loop_task_queues_merge_unmerge_unittests.cc
FILE: ../../../flutter/fml/message_loop_task_queues_unittests.cc
FILE: ../../../flutter/fml/message_loop_unittests.cc
FILE: ../../../flutter/fml/message_unittests.cc
Expand Down
2 changes: 2 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ source_set("fml") {
"memory/weak_ptr.h",
"memory/weak_ptr_internal.cc",
"memory/weak_ptr_internal.h",
"merged_queues_runner.cc",
"message.cc",
"message.h",
"message_loop.cc",
Expand Down Expand Up @@ -199,6 +200,7 @@ executable("fml_unittests") {
"file_unittest.cc",
"memory/ref_counted_unittest.cc",
"memory/weak_ptr_unittest.cc",
"message_loop_task_queues_merge_unmerge_unittests.cc",
"message_loop_task_queues_unittests.cc",
"message_loop_unittests.cc",
"message_unittests.cc",
Expand Down
58 changes: 58 additions & 0 deletions fml/merged_queues_runner.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"

namespace fml {

// RAII class for managing merged locks.
class MessageLoopTaskQueues::MergedQueuesRunner {
public:
// TODO (kaushikiska): refactor mutexes out side of MessageLoopTaskQueues
// for better DI.
MergedQueuesRunner(MessageLoopTaskQueues& task_queues,
TaskQueueId owner,
MutexType type = MutexType::kTasks)
: owner_(owner),
subsumed_(task_queues_._kUnmerged),
task_queues_(task_queues),
type_(type) {
task_queues_.GetMutex(owner, type).lock();
subsumed_ = task_queues_.owner_to_subsumed_[owner];
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type).lock();
}
}

// First invokes on owner and then subsumed (if present).
void InvokeMerged(std::function<void(const TaskQueueId)> closure) {
closure(owner_);
if (isMerged(subsumed_)) {
closure(subsumed_);
}
}

~MergedQueuesRunner() {
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type_).unlock();
}
task_queues_.GetMutex(owner_, type_).unlock();
}

private:
bool isMerged(TaskQueueId queue_id) {
return queue_id != MessageLoopTaskQueues::_kUnmerged;
}

const TaskQueueId owner_;
TaskQueueId subsumed_;
MessageLoopTaskQueues& task_queues_;
const MutexType type_;

FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner);
};

} // namespace fml
4 changes: 4 additions & 0 deletions fml/message_loop_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,8 @@ void MessageLoopImpl::RunSingleExpiredTaskNow() {
FlushTasks(FlushType::kSingle);
}

TaskQueueId MessageLoopImpl::GetTaskQueueId() const {
return queue_id_;
}

} // namespace fml
2 changes: 2 additions & 0 deletions fml/message_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class MessageLoopImpl : public Wakeable,

void DoTerminate();

virtual TaskQueueId GetTaskQueueId() const;

void SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other);

protected:
Expand Down
190 changes: 170 additions & 20 deletions fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"
#include "flutter/fml/merged_queues_runner.cc"
#include "flutter/fml/message_loop_impl.h"

namespace fml {

std::mutex MessageLoopTaskQueues::creation_mutex_;
const size_t TaskQueueId::kUnmerged = ULONG_MAX;
const TaskQueueId MessageLoopTaskQueues::_kUnmerged =
TaskQueueId(TaskQueueId::kUnmerged);
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;

fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
Expand All @@ -22,7 +26,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {

TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
std::scoped_lock creation(queue_meta_mutex_);
TaskQueueId loop_id = task_queue_id_counter_;
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;

observers_mutexes_.push_back(std::make_unique<std::mutex>());
Expand All @@ -33,6 +37,9 @@ TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
delayed_tasks_.push_back(DelayedTaskQueue());
wakeables_.push_back(NULL);

owner_to_subsumed_.push_back(_kUnmerged);
subsumed_to_owner_.push_back(_kUnmerged);

return loop_id;
}

Expand All @@ -42,8 +49,9 @@ MessageLoopTaskQueues::MessageLoopTaskQueues()
MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;

void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
delayed_tasks_[queue_id] = {};
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
merged_tasks.InvokeMerged(
[&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; });
}

void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
Expand All @@ -52,39 +60,47 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
size_t order = order_++;
delayed_tasks_[queue_id].push({order, std::move(task), target_time});
WakeUp(queue_id, delayed_tasks_[queue_id].top().GetTargetTime());
TaskQueueId loop_to_wake = queue_id;
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
loop_to_wake = subsumed_to_owner_[queue_id];
}
WakeUp(loop_to_wake, delayed_tasks_[queue_id].top().GetTargetTime());
}

bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
return !delayed_tasks_[queue_id].empty();
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
return HasPendingTasksUnlocked(queue_id);
}

void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);

if (!HasPendingTasksUnlocked(queue_id)) {
return;
}

const auto now = fml::TimePoint::Now();
DelayedTaskQueue& tasks = delayed_tasks_[queue_id];

while (!tasks.empty()) {
const auto& top = tasks.top();
while (HasPendingTasksUnlocked(queue_id)) {
TaskQueueId top_queue = _kUnmerged;
const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
if (top.GetTargetTime() > now) {
break;
}
invocations.emplace_back(std::move(top.GetTask()));
tasks.pop();
delayed_tasks_[top_queue].pop();
if (type == FlushType::kSingle) {
break;
}
}

if (tasks.empty()) {
if (!HasPendingTasksUnlocked(queue_id)) {
WakeUp(queue_id, fml::TimePoint::Max());
} else {
WakeUp(queue_id, tasks.top().GetTargetTime());
WakeUp(queue_id, GetNextWakeTimeUnlocked(queue_id));
}
}

Expand All @@ -96,8 +112,14 @@ void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) {
}

size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
return delayed_tasks_[queue_id].size();
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
return 0;
}
size_t total_tasks = 0;
merged_tasks.InvokeMerged(
[&](TaskQueueId queue) { total_tasks += delayed_tasks_[queue].size(); });
return total_tasks;
}

void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
Expand All @@ -114,10 +136,14 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
}

void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers));
for (const auto& observer : task_observers_[queue_id]) {
observer.second();
}
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, queue_id, MutexType::kObservers);

merged_observers.InvokeMerged([&](TaskQueueId queue) {
for (const auto& observer : task_observers_[queue]) {
observer.second();
}
});
}

// Thread safety analysis disabled as it does not account for defered locks.
Expand All @@ -131,7 +157,7 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary)
std::mutex& t1 = GetMutex(primary, MutexType::kTasks);
std::mutex& t2 = GetMutex(secondary, MutexType::kTasks);

std::scoped_lock(o1, o2, t1, t2);
std::scoped_lock lock(o1, o2, t1, t2);

std::swap(task_observers_[primary], task_observers_[secondary]);
std::swap(delayed_tasks_[primary], delayed_tasks_[secondary]);
Expand All @@ -140,9 +166,133 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary)
void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables));
FML_CHECK(!wakeables_[queue_id]) << "Wakeable can only be set once.";
wakeables_[queue_id] = wakeable;
}

bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
// task_observers locks
std::mutex& o1 = GetMutex(owner, MutexType::kObservers);
std::mutex& o2 = GetMutex(subsumed, MutexType::kObservers);

// delayed_tasks locks
std::mutex& t1 = GetMutex(owner, MutexType::kTasks);
std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks);

std::scoped_lock lock(o1, o2, t1, t2);

if (owner == subsumed) {
return true;
}

if (owner_to_subsumed_[owner] == subsumed) {
return true;
}

std::vector<TaskQueueId> owner_subsumed_keys = {
owner_to_subsumed_[owner], owner_to_subsumed_[subsumed],
subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]};

for (auto key : owner_subsumed_keys) {
if (key != _kUnmerged) {
return false;
}
}

owner_to_subsumed_[owner] = subsumed;
subsumed_to_owner_[subsumed] = owner;

if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
}

return true;
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, owner, MutexType::kObservers);
MergedQueuesRunner merged_tasks =
MergedQueuesRunner(*this, owner, MutexType::kTasks);

const TaskQueueId subsumed = owner_to_subsumed_[owner];
if (subsumed == _kUnmerged) {
return false;
}

subsumed_to_owner_[subsumed] = _kUnmerged;
owner_to_subsumed_[owner] = _kUnmerged;

if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
}

if (HasPendingTasksUnlocked(subsumed)) {
WakeUp(subsumed, GetNextWakeTimeUnlocked(subsumed));
}

return true;
}

bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) {
MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner);
return subsumed == owner_to_subsumed_[owner] || owner == subsumed;
}

// Subsumed queues will never have pending tasks.
// Owning queues will consider both their and their subsumed tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) {
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
return false;
}

if (!delayed_tasks_[queue_id].empty()) {
return true;
}

const TaskQueueId subsumed = owner_to_subsumed_[queue_id];
if (subsumed == _kUnmerged) {
// this is not an owner and queue is empty.
return false;
} else {
return !delayed_tasks_[subsumed].empty();
}
}

fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
TaskQueueId queue_id) {
TaskQueueId tmp = _kUnmerged;
return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
}

const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner,
TaskQueueId& top_queue_id) {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const TaskQueueId subsumed = owner_to_subsumed_[owner];
if (subsumed == _kUnmerged) {
top_queue_id = owner;
return delayed_tasks_[owner].top();
}
// we are owning another task queue
const bool subsumed_has_task = !delayed_tasks_[subsumed].empty();
const bool owner_has_task = !delayed_tasks_[owner].empty();
if (owner_has_task && subsumed_has_task) {
const auto owner_task = delayed_tasks_[owner].top();
const auto subsumed_task = delayed_tasks_[subsumed].top();
if (owner_task > subsumed_task) {
top_queue_id = subsumed;
} else {
top_queue_id = owner;
}
} else if (owner_has_task) {
top_queue_id = owner;
} else {
top_queue_id = subsumed;
}
return delayed_tasks_[top_queue_id].top();
}

std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id,
MutexType type) {
std::scoped_lock lock(queue_meta_mutex_);
Expand Down
Loading

0 comments on commit 379028a

Please sign in to comment.