diff --git a/synchronization/pipeline.h b/synchronization/pipeline.h index 6b108b9a51594..55e9fc558d115 100644 --- a/synchronization/pipeline.h +++ b/synchronization/pipeline.h @@ -34,28 +34,32 @@ class Pipeline : public ftl::RefCountedThreadSafe> { /// preparing a completed pipeline resource. class ProducerContinuation { public: - ProducerContinuation() = default; + ProducerContinuation() : trace_id_(0) {} ProducerContinuation(ProducerContinuation&& other) - : continuation_(other.continuation_) { + : continuation_(other.continuation_), trace_id_(other.trace_id_) { other.continuation_ = nullptr; + other.trace_id_ = 0; } ProducerContinuation& operator=(ProducerContinuation&& other) { std::swap(continuation_, other.continuation_); + std::swap(trace_id_, other.trace_id_); return *this; } ~ProducerContinuation() { if (continuation_) { - continuation_(nullptr); + continuation_(nullptr, trace_id_); + TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_); } } void Complete(ResourcePtr resource) { if (continuation_) { - continuation_(std::move(resource)); + continuation_(std::move(resource), trace_id_); continuation_ = nullptr; + TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_); } } @@ -63,16 +67,22 @@ class Pipeline : public ftl::RefCountedThreadSafe> { private: friend class Pipeline; + using Continuation = std::function; - std::function continuation_; + Continuation continuation_; + size_t trace_id_; - ProducerContinuation(std::function continuation) - : continuation_(continuation) {} + ProducerContinuation(Continuation continuation, size_t trace_id) + : continuation_(continuation), trace_id_(trace_id) { + TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_); + TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_); + } FTL_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation); }; - explicit Pipeline(uint32_t depth) : empty_(depth), available_(0) {} + explicit Pipeline(uint32_t depth) + : empty_(depth), available_(0), last_trace_id_(0) {} ~Pipeline() = default; @@ -83,7 +93,10 @@ class Pipeline : public ftl::RefCountedThreadSafe> { return {}; } - return {std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1)}; + return ProducerContinuation{ + std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1, + std::placeholders::_2), // continuation + ++last_trace_id_}; // trace id } using Consumer = std::function; @@ -99,11 +112,12 @@ class Pipeline : public ftl::RefCountedThreadSafe> { } ResourcePtr resource; + size_t trace_id = 0; size_t items_count = 0; { ftl::MutexLocker lock(&queue_mutex_); - resource = std::move(queue_.front()); + std::tie(resource, trace_id) = std::move(queue_.front()); queue_.pop(); items_count = queue_.size(); } @@ -115,6 +129,8 @@ class Pipeline : public ftl::RefCountedThreadSafe> { empty_.Signal(); + TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id); + return items_count > 0 ? PipelineConsumeResult::MoreAvailable : PipelineConsumeResult::Done; } @@ -123,12 +139,13 @@ class Pipeline : public ftl::RefCountedThreadSafe> { Semaphore empty_; Semaphore available_; ftl::Mutex queue_mutex_; - std::queue queue_; + std::queue> queue_; + std::atomic_size_t last_trace_id_; - void ProducerCommit(ResourcePtr resource) { + void ProducerCommit(ResourcePtr resource, size_t trace_id) { { ftl::MutexLocker lock(&queue_mutex_); - queue_.emplace(std::move(resource)); + queue_.emplace(std::move(resource), trace_id); } // Ensure the queue mutex is not held as that would be a pessimization.