Skip to content

Commit

Permalink
Implementation of two or more threads merging for multiple platform v…
Browse files Browse the repository at this point in the history
…iews (flutter#27662)

Implementation of two or more threads merging for both lightweight multiple engines and standalone multiple engines.
  • Loading branch information
eggfly authored Aug 6, 2021
1 parent 4fef55d commit e4a8d23
Show file tree
Hide file tree
Showing 21 changed files with 969 additions and 472 deletions.
2 changes: 2 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ FILE: ../../../flutter/fml/posix_wrappers.h
FILE: ../../../flutter/fml/raster_thread_merger.cc
FILE: ../../../flutter/fml/raster_thread_merger.h
FILE: ../../../flutter/fml/raster_thread_merger_unittests.cc
FILE: ../../../flutter/fml/shared_thread_merger.cc
FILE: ../../../flutter/fml/shared_thread_merger.h
FILE: ../../../flutter/fml/size.h
FILE: ../../../flutter/fml/status.h
FILE: ../../../flutter/fml/synchronization/atomic_object.h
Expand Down
2 changes: 2 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ source_set("fml") {
"posix_wrappers.h",
"raster_thread_merger.cc",
"raster_thread_merger.h",
"shared_thread_merger.cc",
"shared_thread_merger.h",
"size.h",
"synchronization/atomic_object.h",
"synchronization/count_down_latch.cc",
Expand Down
13 changes: 10 additions & 3 deletions fml/memory/task_runner_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace fml {

TaskRunnerChecker::TaskRunnerChecker()
: initialized_queue_id_(InitTaskQueueId()),
subsumed_queue_id_(
subsumed_queue_ids_(
MessageLoopTaskQueues::GetInstance()->GetSubsumedTaskQueueId(
initialized_queue_id_)){};

Expand All @@ -17,8 +17,15 @@ TaskRunnerChecker::~TaskRunnerChecker() = default;
bool TaskRunnerChecker::RunsOnCreationTaskRunner() const {
FML_CHECK(fml::MessageLoop::IsInitializedForCurrentThread());
const auto current_queue_id = MessageLoop::GetCurrentTaskQueueId();
return RunsOnTheSameThread(current_queue_id, initialized_queue_id_) ||
RunsOnTheSameThread(current_queue_id, subsumed_queue_id_);
if (RunsOnTheSameThread(current_queue_id, initialized_queue_id_)) {
return true;
}
for (auto& subsumed : subsumed_queue_ids_) {
if (RunsOnTheSameThread(current_queue_id, subsumed)) {
return true;
}
}
return false;
};

bool TaskRunnerChecker::RunsOnTheSameThread(TaskQueueId queue_a,
Expand Down
2 changes: 1 addition & 1 deletion fml/memory/task_runner_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TaskRunnerChecker final {

private:
TaskQueueId initialized_queue_id_;
TaskQueueId subsumed_queue_id_;
std::set<TaskQueueId> subsumed_queue_ids_;

TaskQueueId InitTaskQueueId();
};
Expand Down
6 changes: 3 additions & 3 deletions fml/memory/task_runner_checker_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ TEST(TaskRunnerCheckerTests, MergedTaskRunnersRunsOnTheSameThread) {
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
const int kNumFramesMerged = 5;
const size_t kNumFramesMerged = 5;

raster_thread_merger_->MergeWithLease(kNumFramesMerged);

// merged, running on the same thread
EXPECT_EQ(TaskRunnerChecker::RunsOnTheSameThread(qid1, qid2), true);

for (int i = 0; i < kNumFramesMerged; i++) {
for (size_t i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger_->IsMerged());
raster_thread_merger_->DecrementLease();
}
Expand Down Expand Up @@ -154,7 +154,7 @@ TEST(TaskRunnerCheckerTests,
});
latch3.Wait();

fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1);
fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1, qid2);

fml::AutoResetWaitableEvent latch4;
loop2->GetTaskRunner()->PostTask([&]() {
Expand Down
4 changes: 2 additions & 2 deletions fml/memory/weak_ptr_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ TEST(TaskRunnerAffineWeakPtrTest, ShouldNotCrashIfRunningOnTheSameTaskRunner) {
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
const auto raster_thread_merger_ =
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
const int kNumFramesMerged = 5;
const size_t kNumFramesMerged = 5;

raster_thread_merger_->MergeWithLease(kNumFramesMerged);

loop2_task_start_latch.Signal();
loop2_task_finish_latch.Wait();

for (int i = 0; i < kNumFramesMerged; i++) {
for (size_t i = 0; i < kNumFramesMerged; i++) {
ASSERT_TRUE(raster_thread_merger_->IsMerged());
raster_thread_merger_->DecrementLease();
}
Expand Down
163 changes: 103 additions & 60 deletions fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#include <iostream>
#include <memory>
#include <optional>

#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/task_source.h"
#include "flutter/fml/thread_local.h"

Expand All @@ -25,7 +25,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;

namespace {

// iOS prior to version 9 prevents c++11 thread_local and __thread specefier,
// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
// having us resort to boxed enum containers.
class TaskSourceGradeHolder {
public:
Expand All @@ -41,9 +41,7 @@ FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
tls_task_source_grade;

TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg)
: owner_of(_kUnmerged),
subsumed_by(_kUnmerged),
created_for(created_for_arg) {
: subsumed_by(_kUnmerged), created_for(created_for_arg) {
wakeable = NULL;
task_observers = TaskObservers();
task_source = std::make_unique<TaskSource>(created_for);
Expand Down Expand Up @@ -76,20 +74,21 @@ void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entries_.erase(queue_id);
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entry->owner_of;
for (auto& subsumed : subsumed_set) {
queue_entries_.erase(subsumed);
}
// Erase owner queue_id at last to avoid &subsumed_set from being invalid
queue_entries_.erase(queue_id);
}

void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
auto& subsumed_set = queue_entry->owner_of;
queue_entry->task_source->ShutDown();
if (subsumed != _kUnmerged) {
for (auto& subsumed : subsumed_set) {
queue_entries_.at(subsumed)->task_source->ShutDown();
}
}
Expand Down Expand Up @@ -170,8 +169,8 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
size_t total_tasks = 0;
total_tasks += queue_entry->task_source->GetNumPendingTasks();

TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entry->owner_of;
for (auto& subsumed : subsumed_set) {
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
}
Expand Down Expand Up @@ -205,8 +204,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
observers.push_back(observer.second);
}

TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
if (subsumed != _kUnmerged) {
auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
for (auto& subsumed : subsumed_set) {
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
observers.push_back(observer.second);
}
Expand All @@ -230,22 +229,41 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
std::lock_guard guard(queue_mutex_);
auto& owner_entry = queue_entries_.at(owner);
auto& subsumed_entry = queue_entries_.at(subsumed);

if (owner_entry->owner_of == subsumed) {
auto& subsumed_set = owner_entry->owner_of;
if (subsumed_set.find(subsumed) != subsumed_set.end()) {
return true;
}

std::vector<TaskQueueId> owner_subsumed_keys = {
owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
subsumed_entry->subsumed_by};
// Won't check owner_entry->owner_of, because it may contains items when
// merged with other different queues.

for (auto key : owner_subsumed_keys) {
if (key != _kUnmerged) {
return false;
}
// Ensure owner_entry->subsumed_by being _kUnmerged
if (owner_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", owner->subsumed_by=" << owner_entry->subsumed_by;
return false;
}

owner_entry->owner_of = subsumed;
// Ensure subsumed_entry->owner_of being empty
if (!subsumed_entry->owner_of.empty()) {
FML_LOG(WARNING)
<< "Thread merging failed: subsumed_entry already owns others, owner="
<< owner << ", subsumed=" << subsumed
<< ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
return false;
}
// Ensure subsumed_entry->subsumed_by being _kUnmerged
if (subsumed_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", subsumed->subsumed_by="
<< subsumed_entry->subsumed_by;
return false;
}
// All checking is OK, set merged state.
owner_entry->owner_of.insert(subsumed);
subsumed_entry->subsumed_by = owner;

if (HasPendingTasksUnlocked(owner)) {
Expand All @@ -255,16 +273,37 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
return true;
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) {
std::lock_guard guard(queue_mutex_);
const auto& owner_entry = queue_entries_.at(owner);
const TaskQueueId subsumed = owner_entry->owner_of;
if (subsumed == _kUnmerged) {
if (owner_entry->owner_of.empty()) {
FML_LOG(WARNING)
<< "Thread unmerging failed: owner_entry doesn't own anyone, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}
if (owner_entry->subsumed_by != _kUnmerged) {
FML_LOG(WARNING)
<< "Thread unmerging failed: owner_entry was subsumed by others, owner="
<< owner << ", subsumed=" << subsumed
<< ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
return false;
}
if (queue_entries_.at(subsumed)->subsumed_by == _kUnmerged) {
FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
"subsumed by others, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}
if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
"given subsumed queue id, owner="
<< owner << ", subsumed=" << subsumed;
return false;
}

queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
owner_entry->owner_of = _kUnmerged;
owner_entry->owner_of.erase(subsumed);

if (HasPendingTasksUnlocked(owner)) {
WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
Expand All @@ -280,11 +319,14 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
TaskQueueId subsumed) const {
std::lock_guard guard(queue_mutex_);
return owner != _kUnmerged && subsumed != _kUnmerged &&
subsumed == queue_entries_.at(owner)->owner_of;
if (owner == _kUnmerged || subsumed == _kUnmerged) {
return false;
}
auto& subsumed_set = queue_entries_.at(owner)->owner_of;
return subsumed_set.find(subsumed) != subsumed_set.end();
}

TaskQueueId MessageLoopTaskQueues::GetSubsumedTaskQueueId(
std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId(
TaskQueueId owner) const {
std::lock_guard guard(queue_mutex_);
return queue_entries_.at(owner)->owner_of;
Expand Down Expand Up @@ -318,13 +360,11 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
return true;
}

const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
// this is not an owner and queue is empty.
return false;
} else {
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
}
auto& subsumed_set = entry->owner_of;
return std::any_of(
subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
});
}

fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
Expand All @@ -336,32 +376,35 @@ TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner) const {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const auto& entry = queue_entries_.at(owner);
const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
if (entry->owner_of.empty()) {
FML_CHECK(!entry->task_source->IsEmpty());
return entry->task_source->Top();
}

// Use optional for the memory of TopTask object.
std::optional<TaskSource::TopTask> top_task;

std::function<void(const TaskSource*)> top_task_updater =
[&top_task](const TaskSource* source) {
if (source && !source->IsEmpty()) {
TaskSource::TopTask other_task = source->Top();
if (!top_task.has_value() || top_task->task > other_task.task) {
top_task.emplace(other_task);
}
}
};

TaskSource* owner_tasks = entry->task_source.get();
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();

// we are owning another task queue
const bool subsumed_has_task = !subsumed_tasks->IsEmpty();
const bool owner_has_task = !owner_tasks->IsEmpty();
fml::TaskQueueId top_queue_id = owner;
if (owner_has_task && subsumed_has_task) {
const auto owner_task = owner_tasks->Top();
const auto subsumed_task = subsumed_tasks->Top();
if (owner_task.task > subsumed_task.task) {
top_queue_id = subsumed;
} else {
top_queue_id = owner;
}
} else if (owner_has_task) {
top_queue_id = owner;
} else {
top_queue_id = subsumed;
top_task_updater(owner_tasks);

for (TaskQueueId subsumed : entry->owner_of) {
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
top_task_updater(subsumed_tasks);
}
return queue_entries_.at(top_queue_id)->task_source->Top();
// At least one task at the top because PeekNextTaskUnlocked() is called after
// HasPendingTasksUnlocked()
FML_CHECK(top_task.has_value());
return top_task.value();
}

} // namespace fml
Loading

0 comments on commit e4a8d23

Please sign in to comment.