Rework image & texture management to use concurrent message queues. (flutter#9486)
This patch reworks image decompression and collection to address misbehavior
in the edge cases described below.

The current flow for realizing a texture on the GPU from a blob of compressed
bytes is to first pass it to the IO thread for image decompression and then
upload to the GPU. The handle to the texture on the GPU is then passed back to
the UI thread so that it can be included in subsequent layer trees for
rendering. The GPU contexts on the Render & IO threads are in the same
sharegroup, so the texture ends up being visible to the Render Thread context
during rendering. This works fine and does not block the UI thread. All
references to the image are owned on the UI thread by Dart objects. When the
final reference to the image is dropped, the texture cannot be collected on the
UI thread (because it has no GPU context). Instead, it must be passed to either
the GPU or IO threads. The GPU thread is usually in the middle of a frame
workload, so we redirect the collection task to the IO thread for eventual
collection. While texture collections are usually (comparatively) fast, texture
decompression and upload are slow (on the order of frame intervals).
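
A minimal sketch of this collection hand-off (assuming the engine's
`fml::TaskRunner` for cross-thread posting; `TextureHandle` and
`ReleaseTexture` are illustrative names, not the engine's actual types):

    #include <cstdint>
    #include <utility>

    #include "flutter/fml/memory/ref_ptr.h"
    #include "flutter/fml/task_runner.h"

    // Placeholder for the real GPU-object release logic (needs a live context).
    void ReleaseTexture(uint32_t texture_id) { /* glDeleteTextures, etc. */ }

    class TextureHandle {
     public:
      TextureHandle(uint32_t texture_id, fml::RefPtr<fml::TaskRunner> io_runner)
          : texture_id_(texture_id), io_runner_(std::move(io_runner)) {}

      // Runs on the UI thread when the last Dart reference is dropped. The UI
      // thread has no GPU context, so the actual release is posted to the IO
      // thread, whose context is in the same sharegroup as the Render thread's.
      ~TextureHandle() {
        const uint32_t id = texture_id_;
        io_runner_->PostTask([id]() { ReleaseTexture(id); });
      }

     private:
      uint32_t texture_id_;
      fml::RefPtr<fml::TaskRunner> io_runner_;
    };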

For applications that end up creating (but not necessarily using) numerous
large textures in straight-line execution, it could be the case that texture
collection tasks pending on the IO task runner only run after all the image
decompressions (and uploads) are done. Put simply, the collection of the first
image could be waiting on the decompression and upload of the last image in the
queue.

This is exacerbated by two other hacks added to work around unrelated issues.
* First, creating a codec with a single image frame immediately kicks off
  decompression and upload of that frame image (even if the frame was never
  requested from the codec). This hack was added because we wanted to get rid
  of the compressed image allocation ASAP. The expectation was that codecs
  would only be created with the sole purpose of getting the decompressed image
  bytes. However, for applications that only create codecs to get image sizes
  (but never actually decompress anything; see the sketch after this list), we
  would end up replacing the compressed image allocation with a larger
  allocation (device resident, no less) for no obvious use. This issue is
  particularly insidious when you consider that the codec is usually asked for
  the native image size first, before the frame is requested at a smaller size
  (usually using a new codec with the same data but a new target size). This
  would cause the creation of a whole extra texture (at 1:1) when the caller
  was trying to “optimize” for memory use by requesting a texture of a smaller
  size.
* Second, all image collections were delayed by the unref queue by 250ms
  because of observations that the calling thread (the UI thread) was being
  descheduled unnecessarily when a task with a timeout of zero was posted from
  it (recall that a task has to be posted to the IO thread for the collection
  of each texture). 250ms is multiple frame intervals’ worth of potentially
  unnecessary textures.
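
For reference, a codec can report image dimensions without decoding any
pixels. A minimal Skia sketch of the dimensions-only use case described in the
first point above (illustrative; not the engine's actual decoder code):

    #include <memory>

    #include "third_party/skia/include/codec/SkCodec.h"
    #include "third_party/skia/include/core/SkData.h"

    // Creating the codec only parses the image header; no pixel data is
    // decompressed and no texture is uploaded.
    SkISize GetImageDimensions(sk_sp<SkData> encoded_bytes) {
      std::unique_ptr<SkCodec> codec = SkCodec::MakeFromData(encoded_bytes);
      if (!codec) {
        return SkISize::MakeEmpty();
      }
      return codec->getInfo().dimensions();
    }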

The net result of these issues is that we may end up creating textures when all
the application needs is to ask its codec for details about the image (without
necessarily accessing its bytes). Texture collection could also be delayed
behind other jobs decompressing textures on the IO thread. Also, all texture
collections are delayed by an arbitrary amount of time.

These issues make applications susceptible to OOM situations. These situations
manifest in various ways. Host memory exhaustion causes the usual OOM issues.
Device memory exhaustion seems to manifest in different ways on iOS and
Android. On Android, allocating a new texture seems to trigger an assertion (in
the driver). On iOS, the call hangs (presumably waiting for another thread to
release textures, which we won’t do because those tasks are blocked behind the
current task completing).

To address peak memory usage, the following changes have been made:
* Image decompression and upload/collection no longer happen on the same thread.
  All image decompression is now handled on a workqueue. The number of worker
  threads in this workqueue is equal to the number of processors on the device.
  These threads have a lower priority than either the UI or Render threads.
  These workers are shared between all Flutter applications in the process (see
  the first sketch after this list).
* Both the images and their codecs now report the correct allocation size to
  Dart for GC purposes. The Dart VM uses this to pick objects for collection.
  Earlier, the image allocation was assumed to be 32bpp with no mipmapping
  overhead reported. Now, the correct image size is reported and the mipmapping
  overhead is accounted for. Image codec sizes were not reported to the VM
  earlier and now are. Expect “External” VM allocations to be higher than
  previously reported and the numbers in Observatory to line up more closely
  with actual memory usage (device and host). See the second sketch after this
  list.
* Decoding images to a specific size used to decode at 1:1 and then resize to
  the requested dimensions before texture upload. This has now been reworked so
  that images are first decompressed to a smaller size supported natively by
  the codec before the final resize to the requested target size. The
  intermediate copy is now smaller and more promptly collected. Resizing also
  happens on a workqueue worker. See the third sketch after this list.
* The drain interval of the unref queue is now sub-frame-interval. I am
  hesitant to remove the delay entirely because I have not been able to
  instrument the performance overhead of doing so. That is next on my list. But
  now, multiple frame intervals’ worth of textures no longer stick around.
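
First sketch: posting a decode job to the new workqueue, using the
`fml::ConcurrentMessageLoop` API added in this patch (the decode function is a
placeholder):

    #include "flutter/fml/concurrent_message_loop.h"

    // Placeholder for the real decompression work.
    void DecodeImage() { /* decompress & resize pixels here */ }

    int main() {
      // One worker per processor by default. Workers run at a lower priority
      // than the UI or Render threads and are shared by all Flutter
      // applications in the process.
      auto loop = fml::ConcurrentMessageLoop::Create();
      auto task_runner = loop->GetTaskRunner();

      // Any idle worker picks this up; texture collection tasks on the IO
      // thread no longer queue behind it.
      task_runner->PostTask([]() { DecodeImage(); });

      // The loop terminates and joins its workers on destruction; queued
      // tasks are drained first.
      return 0;
    }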
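Second sketch: how a correct allocation size can be reported to the Dart VM for
GC purposes. The `Dart_NewWeakPersistentHandle` signature is the Dart embedder
API of this era; the surrounding code is illustrative, not the engine's actual
wrapper plumbing:

    #include "third_party/dart/runtime/include/dart_api.h"

    // Invoked by the VM once the wrapped image object becomes unreachable.
    static void ImageFinalizer(void* isolate_callback_data,
                               Dart_WeakPersistentHandle handle,
                               void* peer) {
      // Release the native image referenced by |peer| here.
    }

    // Reporting the true byte size (including mipmap overhead) lets the GC
    // weigh this object accurately when picking candidates for collection.
    void AttachImageToDartObject(Dart_Handle dart_image,
                                 void* native_image,
                                 intptr_t image_byte_size) {
      Dart_NewWeakPersistentHandle(dart_image, native_image, image_byte_size,
                                   ImageFinalizer);
    }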
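Third sketch: decompressing to a codec-supported size before the final resize
(assuming Skia's `SkCodec::getScaledDimensions` and `SkPixmap::scalePixels`;
simplified relative to the real `ImageDecoder` added in this patch):

    #include <memory>

    #include "third_party/skia/include/codec/SkCodec.h"
    #include "third_party/skia/include/core/SkBitmap.h"
    #include "third_party/skia/include/core/SkData.h"
    #include "third_party/skia/include/core/SkFilterQuality.h"

    // Decode to the nearest size the codec supports natively (rather than
    // always 1:1), then scale to the exact target dimensions. The intermediate
    // allocation stays small and is collected promptly.
    bool DecodeToTargetSize(sk_sp<SkData> data, SkISize target, SkBitmap* out) {
      std::unique_ptr<SkCodec> codec = SkCodec::MakeFromData(data);
      if (!codec) {
        return false;
      }

      const float scale =
          static_cast<float>(target.width()) / codec->getInfo().width();
      const SkISize supported = codec->getScaledDimensions(scale);

      SkBitmap intermediate;
      intermediate.allocPixels(
          codec->getInfo().makeWH(supported.width(), supported.height()));
      if (codec->getPixels(intermediate.info(), intermediate.getPixels(),
                           intermediate.rowBytes()) != SkCodec::kSuccess) {
        return false;
      }

      out->allocPixels(
          intermediate.info().makeWH(target.width(), target.height()));
      return intermediate.pixmap().scalePixels(out->pixmap(),
                                               kMediumQuality_SkFilterQuality);
    }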

The following issues have been addressed:
* flutter/flutter#34070 Since this was the first usage of the concurrent
  message loops, the number of idle wakes was determined to be too high, and
  this component has been rewritten to be simpler and to no longer use the
  existing task runner and MessageLoopImpl interface.
* Image decoding had no tests. A new `ui_unittests` harness has been added that
  sets up a GPU test environment on the host using SwiftShader. Tests have been
  added for image decompression, upload, and resizing.
* The device memory exhaustion in this benchmark has been addressed. That
  benchmark is still not viable for inclusion in any harness, however, because
  it creates 9 million codecs in straight-line execution. Because these codecs
  are destroyed in microtask callbacks, they remain referenced until those
  callbacks are executed. So now, instead of device memory exhaustion, this
  will lead to (slower) exhaustion of host memory. This is expected and working
  as intended.

This patch only addresses peak memory use and makes collection of unused images
and textures more prompt. It does NOT address memory use by images referenced
strongly by the application or framework.
chinmaygarde authored Jul 9, 2019
1 parent 1dcd5f5 commit ad582b5
Showing 48 changed files with 1,680 additions and 687 deletions.
1 change: 1 addition & 0 deletions BUILD.gn
@@ -55,6 +55,7 @@ group("flutter") {
public_deps += [
"$flutter_root/flow:flow_unittests",
"$flutter_root/fml:fml_unittests",
"$flutter_root/lib/ui:ui_unittests",
"$flutter_root/runtime:runtime_unittests",
"$flutter_root/shell/common:shell_unittests",
"$flutter_root/shell/platform/common/cpp/client_wrapper:client_wrapper_unittests",
9 changes: 9 additions & 0 deletions ci/licenses_golden/licenses_flutter
@@ -316,6 +316,8 @@ FILE: ../../../flutter/lib/ui/dart_runtime_hooks.h
FILE: ../../../flutter/lib/ui/dart_ui.cc
FILE: ../../../flutter/lib/ui/dart_ui.h
FILE: ../../../flutter/lib/ui/dart_wrapper.h
FILE: ../../../flutter/lib/ui/fixtures/DashInNooglerHat.jpg
FILE: ../../../flutter/lib/ui/fixtures/ui_test.dart
FILE: ../../../flutter/lib/ui/geometry.dart
FILE: ../../../flutter/lib/ui/hash_codes.dart
FILE: ../../../flutter/lib/ui/hooks.dart
@@ -342,6 +344,9 @@ FILE: ../../../flutter/lib/ui/painting/gradient.cc
FILE: ../../../flutter/lib/ui/painting/gradient.h
FILE: ../../../flutter/lib/ui/painting/image.cc
FILE: ../../../flutter/lib/ui/painting/image.h
FILE: ../../../flutter/lib/ui/painting/image_decoder.cc
FILE: ../../../flutter/lib/ui/painting/image_decoder.h
FILE: ../../../flutter/lib/ui/painting/image_decoder_unittests.cc
FILE: ../../../flutter/lib/ui/painting/image_encoding.cc
FILE: ../../../flutter/lib/ui/painting/image_encoding.h
FILE: ../../../flutter/lib/ui/painting/image_filter.cc
@@ -350,6 +355,8 @@ FILE: ../../../flutter/lib/ui/painting/image_shader.cc
FILE: ../../../flutter/lib/ui/painting/image_shader.h
FILE: ../../../flutter/lib/ui/painting/matrix.cc
FILE: ../../../flutter/lib/ui/painting/matrix.h
FILE: ../../../flutter/lib/ui/painting/multi_frame_codec.cc
FILE: ../../../flutter/lib/ui/painting/multi_frame_codec.h
FILE: ../../../flutter/lib/ui/painting/paint.cc
FILE: ../../../flutter/lib/ui/painting/paint.h
FILE: ../../../flutter/lib/ui/painting/path.cc
@@ -364,6 +371,8 @@ FILE: ../../../flutter/lib/ui/painting/rrect.cc
FILE: ../../../flutter/lib/ui/painting/rrect.h
FILE: ../../../flutter/lib/ui/painting/shader.cc
FILE: ../../../flutter/lib/ui/painting/shader.h
FILE: ../../../flutter/lib/ui/painting/single_frame_codec.cc
FILE: ../../../flutter/lib/ui/painting/single_frame_codec.h
FILE: ../../../flutter/lib/ui/painting/vertices.cc
FILE: ../../../flutter/lib/ui/painting/vertices.h
FILE: ../../../flutter/lib/ui/plugins.dart
112 changes: 84 additions & 28 deletions fml/concurrent_message_loop.cc
@@ -11,10 +11,14 @@

namespace fml {

ConcurrentMessageLoop::ConcurrentMessageLoop()
: worker_count_(std::max(std::thread::hardware_concurrency(), 1u)),
shutdown_latch_(worker_count_),
shutdown_(false) {
std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
size_t worker_count) {
return std::shared_ptr<ConcurrentMessageLoop>{
new ConcurrentMessageLoop(worker_count)};
}

ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
: worker_count_(std::max<size_t>(worker_count, 1ul)) {
for (size_t i = 0; i < worker_count_; ++i) {
workers_.emplace_back([i, this]() {
fml::Thread::SetCurrentThreadName(
@@ -26,45 +30,97 @@ ConcurrentMessageLoop::ConcurrentMessageLoop()

ConcurrentMessageLoop::~ConcurrentMessageLoop() {
Terminate();
shutdown_latch_.Wait();
for (auto& worker : workers_) {
worker.join();
}
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::Run() {
FML_CHECK(false);
size_t ConcurrentMessageLoop::GetWorkerCount() const {
return worker_count_;
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::Terminate() {
std::scoped_lock lock(wait_condition_mutex_);
shutdown_ = true;
wait_condition_.notify_all();
std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
}

// |fml::MessageLoopImpl|
void ConcurrentMessageLoop::WakeUp(fml::TimePoint time_point) {
// Assume that the clocks are not the same.
const auto duration = std::chrono::nanoseconds(
(time_point - fml::TimePoint::Now()).ToNanoseconds());
next_wake_ = std::chrono::high_resolution_clock::now() + duration;
wait_condition_.notify_all();
void ConcurrentMessageLoop::PostTask(fml::closure task) {
if (!task) {
return;
}

std::unique_lock lock(tasks_mutex_);

// Don't just drop tasks on the floor in case of shutdown.
if (shutdown_) {
FML_DLOG(WARNING)
<< "Tried to post a task to a shutdown concurrent message "
"loop. The task will be executed on the caller's thread.";
lock.unlock();
task();
return;
}

tasks_.push(task);

// Unlock the mutex before notifying the condition variable because that mutex
// has to be acquired on the other thread anyway. Waiting in this scope till
// it is acquired there is a pessimization.
lock.unlock();

tasks_condition_.notify_one();
}

void ConcurrentMessageLoop::WorkerMain() {
while (!shutdown_) {
std::unique_lock<std::mutex> lock(wait_condition_mutex_);
if (!shutdown_) {
wait_condition_.wait(lock);
while (true) {
std::unique_lock lock(tasks_mutex_);
tasks_condition_.wait(lock,
[&]() { return tasks_.size() > 0 || shutdown_; });

if (tasks_.size() == 0) {
// This can only be caused by shutdown.
FML_DCHECK(shutdown_);
break;
}
TRACE_EVENT0("fml", "ConcurrentWorkerWake");
RunSingleExpiredTaskNow();

auto task = tasks_.front();
tasks_.pop();

// Don't hold onto the mutex while the task is being executed as it could
// itself try to post another task to this message loop.
lock.unlock();

TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
// Execute the one task we woke up for.
task();
}
}

void ConcurrentMessageLoop::Terminate() {
std::scoped_lock lock(tasks_mutex_);
shutdown_ = true;
tasks_condition_.notify_all();
}

ConcurrentTaskRunner::ConcurrentTaskRunner(
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
: weak_loop_(std::move(weak_loop)) {}

ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;

void ConcurrentTaskRunner::PostTask(fml::closure task) {
if (!task) {
return;
}

if (auto loop = weak_loop_.lock()) {
loop->PostTask(task);
return;
}

RunExpiredTasksNow();
shutdown_latch_.CountDown();
FML_DLOG(WARNING)
<< "Tried to post to a concurrent message loop that has already died. "
"Executing the task on the callers thread.";
task();
}

} // namespace fml
64 changes: 40 additions & 24 deletions fml/concurrent_message_loop.h
@@ -5,51 +5,67 @@
#ifndef FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
#define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>

#include "flutter/fml/closure.h"
#include "flutter/fml/macros.h"
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/thread_annotations.h"

namespace fml {

class ConcurrentMessageLoop : public MessageLoopImpl {
private:
const size_t worker_count_;
std::mutex wait_condition_mutex_;
std::condition_variable wait_condition_;
std::vector<std::thread> workers_;
CountDownLatch shutdown_latch_;
std::chrono::high_resolution_clock::time_point next_wake_;
std::atomic_bool shutdown_;
class ConcurrentTaskRunner;

ConcurrentMessageLoop();
class ConcurrentMessageLoop
: public std::enable_shared_from_this<ConcurrentMessageLoop> {
public:
static std::shared_ptr<ConcurrentMessageLoop> Create(
size_t worker_count = std::thread::hardware_concurrency());

~ConcurrentMessageLoop();

// |fml::MessageLoopImpl|
void Run() override;
size_t GetWorkerCount() const;

std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner();

void Terminate();

// |fml::MessageLoopImpl|
void Terminate() override;
private:
friend ConcurrentTaskRunner;

// |fml::MessageLoopImpl|
void WakeUp(fml::TimePoint time_point) override;
size_t worker_count_ = 0;
std::vector<std::thread> workers_;
std::mutex tasks_mutex_;
std::condition_variable tasks_condition_;
std::queue<fml::closure> tasks_;
bool shutdown_ = false;

static void WorkerMain(ConcurrentMessageLoop* loop);
ConcurrentMessageLoop(size_t worker_count);

void WorkerMain();

FML_FRIEND_MAKE_REF_COUNTED(ConcurrentMessageLoop);
FML_FRIEND_REF_COUNTED_THREAD_SAFE(ConcurrentMessageLoop);
void PostTask(fml::closure task);

FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop);
};

class ConcurrentTaskRunner {
public:
ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop);

~ConcurrentTaskRunner();

void PostTask(fml::closure task);

private:
friend ConcurrentMessageLoop;

std::weak_ptr<ConcurrentMessageLoop> weak_loop_;

FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner);
};

} // namespace fml

#endif // FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
8 changes: 0 additions & 8 deletions fml/message_loop.cc
@@ -6,7 +6,6 @@

#include <utility>

#include "flutter/fml/concurrent_message_loop.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/memory/ref_ptr.h"
#include "flutter/fml/message_loop_impl.h"
@@ -44,13 +43,6 @@ MessageLoop::MessageLoop()
FML_CHECK(task_runner_);
}

MessageLoop::MessageLoop(Type)
: loop_(fml::MakeRefCounted<ConcurrentMessageLoop>()),
task_runner_(fml::MakeRefCounted<fml::TaskRunner>(loop_)) {
FML_CHECK(loop_);
FML_CHECK(task_runner_);
}

MessageLoop::~MessageLoop() = default;

void MessageLoop::Run() {
4 changes: 0 additions & 4 deletions fml/message_loop.h
@@ -18,10 +18,6 @@ class MessageLoop {
FML_EMBEDDER_ONLY
static MessageLoop& GetCurrent();

enum class Type { kConcurrent };

MessageLoop(Type type);

bool IsValid() const;

void Run();
21 changes: 17 additions & 4 deletions fml/message_loop_unittests.cc
@@ -7,6 +7,7 @@
#include <iostream>
#include <thread>

#include "flutter/fml/concurrent_message_loop.h"
#include "flutter/fml/message_loop.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
@@ -281,19 +282,31 @@ TEST(MessageLoop, TaskObserverFire) {
ASSERT_TRUE(terminated);
}

TEST(MessageLoop, CanCreateAndShutdownConcurrentMessageLoopsOverAndOver) {
for (size_t i = 0; i < 10; ++i) {
auto loop = fml::ConcurrentMessageLoop::Create(i + 1);
ASSERT_EQ(loop->GetWorkerCount(), i + 1);
}
}

TEST(MessageLoop, CanCreateConcurrentMessageLoop) {
fml::MessageLoop loop(fml::MessageLoop::Type::kConcurrent);
auto task_runner = loop.GetTaskRunner();
auto loop = fml::ConcurrentMessageLoop::Create();
auto task_runner = loop->GetTaskRunner();
const size_t kCount = 10;
fml::CountDownLatch latch(kCount);
std::mutex thread_ids_mutex;
std::set<std::thread::id> thread_ids;
for (size_t i = 0; i < kCount; ++i) {
task_runner->PostTask([&latch]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
task_runner->PostTask([&]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Ran on thread: " << std::this_thread::get_id() << std::endl;
std::scoped_lock lock(thread_ids_mutex);
thread_ids.insert(std::this_thread::get_id());
latch.CountDown();
});
}
latch.Wait();
ASSERT_GE(thread_ids.size(), 1u);
}

TEST(MessageLoop, CanSwapMessageLoopsAndPreserveThreadConfiguration) {
35 changes: 35 additions & 0 deletions fml/trace_event.h
@@ -244,6 +244,41 @@ class ScopedInstantEnd {
FML_DISALLOW_COPY_AND_ASSIGN(ScopedInstantEnd);
};

// A move-only utility object that creates a new flow with a unique ID and
// automatically ends it when it goes out of scope. When tracing using multiple
// overlapping flows, it often gets hard to make sure the flow is ended
// (especially with early returns), or easy to end/step on the wrong flow. This
// leads to corrupted or missing traces in the UI.
class TraceFlow {
public:
TraceFlow(const char* label) : label_(label), nonce_(TraceNonce()) {
TraceEventFlowBegin0("flutter", label_, nonce_);
}

~TraceFlow() { End(label_); }

TraceFlow(TraceFlow&& other) : label_(other.label_), nonce_(other.nonce_) {
other.nonce_ = 0;
}

void Step(const char* label) const {
TraceEventFlowStep0("flutter", label, nonce_);
}

void End(const char* label = nullptr) {
if (nonce_ != 0) {
TraceEventFlowEnd0("flutter", label == nullptr ? label_ : label, nonce_);
nonce_ = 0;
}
}

private:
const char* label_;
size_t nonce_;

FML_DISALLOW_COPY_AND_ASSIGN(TraceFlow);
};

} // namespace tracing
} // namespace fml

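A minimal usage sketch for the new `TraceFlow` helper above (the decode/upload
functions are hypothetical stand-ins, not part of this diff):

    #include "flutter/fml/trace_event.h"

    // Hypothetical stages of a job whose progress we want to trace.
    bool DecodePixels() { return true; }
    void UploadTexture() {}

    void DecodeAndUpload() {
      // Begins a flow with a unique nonce. The destructor ends the flow
      // automatically, including on the early-return path below.
      fml::tracing::TraceFlow flow("ImageDecode");

      if (!DecodePixels()) {
        return;  // The flow is still ended correctly here.
      }

      flow.Step("PixelsDecoded");
      UploadTexture();
    }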