Skip to content

Commit

Permalink
Support message loops whose tasks are executed concurrently. (flutter…
Browse files Browse the repository at this point in the history
…#8419)

The number of workers depends on what the platform deem appropriate for the system at runtime.
  • Loading branch information
chinmaygarde authored Apr 10, 2019
1 parent 8ae84ec commit ca1d163
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 11 deletions.
2 changes: 2 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ FILE: ../../../flutter/fml/command_line.cc
FILE: ../../../flutter/fml/command_line.h
FILE: ../../../flutter/fml/command_line_unittest.cc
FILE: ../../../flutter/fml/compiler_specific.h
FILE: ../../../flutter/fml/concurrent_message_loop.cc
FILE: ../../../flutter/fml/concurrent_message_loop.h
FILE: ../../../flutter/fml/eintr_wrapper.h
FILE: ../../../flutter/fml/file.cc
FILE: ../../../flutter/fml/file.h
Expand Down
2 changes: 2 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ source_set("fml") {
"command_line.cc",
"command_line.h",
"compiler_specific.h",
"concurrent_message_loop.cc",
"concurrent_message_loop.h",
"eintr_wrapper.h",
"file.cc",
"file.h",
Expand Down
70 changes: 70 additions & 0 deletions fml/concurrent_message_loop.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "flutter/fml/concurrent_message_loop.h"

#include <algorithm>

#include "flutter/fml/thread.h"
#include "flutter/fml/trace_event.h"

namespace fml {

ConcurrentMessageLoop::ConcurrentMessageLoop()
: worker_count_(std::max(std::thread::hardware_concurrency(), 1u)),
shutdown_latch_(worker_count_),
shutdown_(false) {
for (size_t i = 0; i < worker_count_; ++i) {
workers_.emplace_back([i, this]() {
fml::Thread::SetCurrentThreadName(
std::string{"io.flutter.worker." + std::to_string(i + 1)});
WorkerMain();
});
}
}

ConcurrentMessageLoop::~ConcurrentMessageLoop() {
Terminate();
shutdown_latch_.Wait();
for (auto& worker : workers_) {
worker.join();
}
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::Run() {
FML_CHECK(false);
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::Terminate() {
std::lock_guard<std::mutex> lock(wait_condition_mutex_);
shutdown_ = true;
wait_condition_.notify_all();
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::WakeUp(fml::TimePoint time_point) {
// Assume that the clocks are not the same.
const auto duration = std::chrono::nanoseconds(
(time_point - fml::TimePoint::Now()).ToNanoseconds());
next_wake_ = std::chrono::high_resolution_clock::now() + duration;
wait_condition_.notify_all();
}

void ConcurrentMessageLoop::WorkerMain() {
while (!shutdown_) {
std::unique_lock<std::mutex> lock(wait_condition_mutex_);
if (!shutdown_) {
wait_condition_.wait(lock);
}
TRACE_EVENT0("fml", "ConcurrentWorkerWake");
RunSingleExpiredTaskNow();
}

RunExpiredTasksNow();
shutdown_latch_.CountDown();
}

} // namespace fml
55 changes: 55 additions & 0 deletions fml/concurrent_message_loop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
#define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <thread>
#include <vector>

#include "flutter/fml/macros.h"
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/thread_annotations.h"

namespace fml {

class ConcurrentMessageLoop : public MessageLoopImpl {
private:
const size_t worker_count_;
std::mutex wait_condition_mutex_;
std::condition_variable wait_condition_;
std::vector<std::thread> workers_;
CountDownLatch shutdown_latch_;
std::chrono::high_resolution_clock::time_point next_wake_;
std::atomic_bool shutdown_;

ConcurrentMessageLoop();

~ConcurrentMessageLoop();

// |fml::MessageLoopImpl|
void Run() override;

// |fml::MessageLoopImpl|
void Terminate() override;

// |fml::MessageLoopImpl|
void WakeUp(fml::TimePoint time_point) override;

static void WorkerMain(ConcurrentMessageLoop* loop);

void WorkerMain();

FML_FRIEND_MAKE_REF_COUNTED(ConcurrentMessageLoop);
FML_FRIEND_REF_COUNTED_THREAD_SAFE(ConcurrentMessageLoop);
FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop);
};

} // namespace fml

#endif // FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
8 changes: 8 additions & 0 deletions fml/message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <utility>

#include "flutter/fml/concurrent_message_loop.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/memory/ref_ptr.h"
#include "flutter/fml/message_loop_impl.h"
Expand Down Expand Up @@ -45,6 +46,13 @@ MessageLoop::MessageLoop()
FML_CHECK(task_runner_);
}

MessageLoop::MessageLoop(Type)
: loop_(fml::MakeRefCounted<ConcurrentMessageLoop>()),
task_runner_(fml::MakeRefCounted<fml::TaskRunner>(loop_)) {
FML_CHECK(loop_);
FML_CHECK(task_runner_);
}

MessageLoop::~MessageLoop() = default;

void MessageLoop::Run() {
Expand Down
4 changes: 4 additions & 0 deletions fml/message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class MessageLoop {
FML_EMBEDDER_ONLY
static MessageLoop& GetCurrent();

enum class Type { kConcurrent };

MessageLoop(Type type);

bool IsValid() const;

void Run();
Expand Down
19 changes: 13 additions & 6 deletions fml/message_loop_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
RegisterTask(task, target_time);
}

void MessageLoopImpl::RunExpiredTasksNow() {
RunExpiredTasks();
}

void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
FML_DCHECK(callback != nullptr);
FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this)
Expand Down Expand Up @@ -112,8 +108,8 @@ void MessageLoopImpl::RegisterTask(fml::closure task,
WakeUp(delayed_tasks_.top().target_time);
}

void MessageLoopImpl::RunExpiredTasks() {
TRACE_EVENT0("fml", "MessageLoop::RunExpiredTasks");
void MessageLoopImpl::FlushTasks(FlushType type) {
TRACE_EVENT0("fml", "MessageLoop::FlushTasks");
std::vector<fml::closure> invocations;

{
Expand All @@ -131,6 +127,9 @@ void MessageLoopImpl::RunExpiredTasks() {
}
invocations.emplace_back(std::move(top.task));
delayed_tasks_.pop();
if (type == FlushType::kSingle) {
break;
}
}

WakeUp(delayed_tasks_.empty() ? fml::TimePoint::Max()
Expand All @@ -145,6 +144,14 @@ void MessageLoopImpl::RunExpiredTasks() {
}
}

void MessageLoopImpl::RunExpiredTasksNow() {
FlushTasks(FlushType::kAll);
}

void MessageLoopImpl::RunSingleExpiredTaskNow() {
FlushTasks(FlushType::kSingle);
}

MessageLoopImpl::DelayedTask::DelayedTask(size_t p_order,
fml::closure p_task,
fml::TimePoint p_target_time)
Expand Down
16 changes: 13 additions & 3 deletions fml/message_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/message_loop.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/time/time_point.h"

namespace fml {
Expand All @@ -42,10 +43,15 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {

void DoTerminate();

protected:
// Exposed for the embedder shell which allows clients to poll for events
// instead of dedicating a thread to the message loop.
friend class MessageLoop;

void RunExpiredTasksNow();

void RunSingleExpiredTaskNow();

protected:
MessageLoopImpl();

Expand Down Expand Up @@ -76,13 +82,17 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {

std::map<intptr_t, fml::closure> task_observers_;
std::mutex delayed_tasks_mutex_;
DelayedTaskQueue delayed_tasks_;
size_t order_;
DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_);
size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_);
std::atomic_bool terminated_;

void RegisterTask(fml::closure task, fml::TimePoint target_time);

void RunExpiredTasks();
enum class FlushType {
kSingle,
kAll,
};
void FlushTasks(FlushType type);

FML_DISALLOW_COPY_AND_ASSIGN(MessageLoopImpl);
};
Expand Down
17 changes: 17 additions & 0 deletions fml/message_loop_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#define FML_USED_ON_EMBEDDER

#include <iostream>
#include <thread>

#include "flutter/fml/message_loop.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
#include "flutter/fml/task_runner.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -278,3 +280,18 @@ TEST(MessageLoop, TaskObserverFire) {
ASSERT_TRUE(started);
ASSERT_TRUE(terminated);
}

TEST(MessageLoop, CanCreateConcurrentMessageLoop) {
fml::MessageLoop loop(fml::MessageLoop::Type::kConcurrent);
auto task_runner = loop.GetTaskRunner();
const size_t kCount = 10;
fml::CountDownLatch latch(kCount);
for (size_t i = 0; i < kCount; ++i) {
task_runner->PostTask([&latch]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "Ran on thread: " << std::this_thread::get_id() << std::endl;
latch.CountDown();
});
}
latch.Wait();
}
3 changes: 3 additions & 0 deletions fml/platform/darwin/message_loop_darwin.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ class MessageLoopDarwin : public MessageLoopImpl {

~MessageLoopDarwin() override;

// |fml::MessageLoopImpl|
void Run() override;

// |fml::MessageLoopImpl|
void Terminate() override;

// |fml::MessageLoopImpl|
void WakeUp(fml::TimePoint time_point) override;

static void OnTimerFire(CFRunLoopTimerRef timer, MessageLoopDarwin* loop);
Expand Down
3 changes: 3 additions & 0 deletions fml/platform/linux/message_loop_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ bool MessageLoopLinux::AddOrRemoveTimerSource(bool add) {
return ctl_result == 0;
}

// |fml::MessageLoopImpl|
void MessageLoopLinux::Run() {
running_ = true;

Expand Down Expand Up @@ -71,11 +72,13 @@ void MessageLoopLinux::Run() {
}
}

// |fml::MessageLoopImpl|
void MessageLoopLinux::Terminate() {
running_ = false;
WakeUp(fml::TimePoint::Now());
}

// |fml::MessageLoopImpl|
void MessageLoopLinux::WakeUp(fml::TimePoint time_point) {
bool result = TimerRearm(timer_fd_.get(), time_point);
FML_DCHECK(result);
Expand Down
3 changes: 3 additions & 0 deletions fml/platform/linux/message_loop_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ class MessageLoopLinux : public MessageLoopImpl {

~MessageLoopLinux() override;

// |fml::MessageLoopImpl|
void Run() override;

// |fml::MessageLoopImpl|
void Terminate() override;

// |fml::MessageLoopImpl|
void WakeUp(fml::TimePoint time_point) override;

void OnEventFired();
Expand Down
4 changes: 2 additions & 2 deletions fml/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ class Thread {

void Join();

static void SetCurrentThreadName(const std::string& name);

private:
std::unique_ptr<std::thread> thread_;
fml::RefPtr<fml::TaskRunner> task_runner_;
std::atomic_bool joined_;

static void SetCurrentThreadName(const std::string& name);

FML_DISALLOW_COPY_AND_ASSIGN(Thread);
};

Expand Down

0 comments on commit ca1d163

Please sign in to comment.