Skip to content

Commit

Permalink
Switch CallStats to TQ interface + callbacks on the worker thread.
Browse files Browse the repository at this point in the history
Bug: webrtc:11489
Change-Id: I08c4cd42dfa28d88ed9f0aa8c8b2cfb606bf00df
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174240
Commit-Queue: Tommi <[email protected]>
Reviewed-by: Magnus Flodman <[email protected]>
Cr-Commit-Position: refs/heads/master@{#31203}
  • Loading branch information
Tommi authored and Commit Bot committed May 10, 2020
1 parent 674b0c8 commit 822a874
Show file tree
Hide file tree
Showing 11 changed files with 690 additions and 30 deletions.
35 changes: 19 additions & 16 deletions call/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
#include "system_wrappers/include/cpu_info.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
#include "video/call_stats.h"
#include "video/call_stats2.h"
#include "video/send_delay_stats.h"
#include "video/stats_counter.h"
#include "video/video_receive_stream2.h"
Expand Down Expand Up @@ -157,6 +157,13 @@ bool IsRtcp(const uint8_t* packet, size_t length) {
return rtp_parser.RTCP();
}

TaskQueueBase* GetCurrentTaskQueueOrThread() {
TaskQueueBase* current = TaskQueueBase::Current();
if (!current)
current = rtc::ThreadManager::Instance()->CurrentThread();
return current;
}

} // namespace

namespace internal {
Expand Down Expand Up @@ -440,7 +447,7 @@ Call::Call(Clock* clock,
task_queue_factory_(task_queue_factory),
num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(std::move(module_process_thread)),
call_stats_(new CallStats(clock_, module_process_thread_.get())),
call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())),
bitrate_allocator_(new BitrateAllocator(this)),
config_(config),
audio_network_state_(kNetworkDown),
Expand Down Expand Up @@ -472,7 +479,6 @@ Call::Call(Clock* clock,

module_process_thread_->RegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE);
module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
}

Expand All @@ -489,7 +495,6 @@ Call::~Call() {
module_process_thread_->DeRegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true));
module_process_thread_->DeRegisterModule(&receive_side_cc_);
module_process_thread_->DeRegisterModule(call_stats_.get());
call_stats_->DeregisterStatsObserver(&receive_side_cc_);

absl::optional<Timestamp> first_sent_packet_ms =
Expand Down Expand Up @@ -625,11 +630,11 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
}
}

AudioSendStream* send_stream =
new AudioSendStream(clock_, config, config_.audio_state,
task_queue_factory_, module_process_thread_.get(),
transport_send_ptr_, bitrate_allocator_.get(),
event_log_, call_stats_.get(), suspended_rtp_state);
AudioSendStream* send_stream = new AudioSendStream(
clock_, config, config_.audio_state, task_queue_factory_,
module_process_thread_.get(), transport_send_ptr_,
bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
suspended_rtp_state);
{
WriteLockScoped write_lock(*send_crit_);
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
Expand Down Expand Up @@ -757,9 +762,9 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(

VideoSendStream* send_stream = new VideoSendStream(
clock_, num_cpu_cores_, module_process_thread_.get(), task_queue_factory_,
call_stats_.get(), transport_send_ptr_, bitrate_allocator_.get(),
video_send_delay_stats_.get(), event_log_, std::move(config),
std::move(encoder_config), suspended_video_send_ssrcs_,
call_stats_->AsRtcpRttStats(), transport_send_ptr_,
bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller));

{
Expand Down Expand Up @@ -837,9 +842,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(

RegisterRateObserver();

TaskQueueBase* current = TaskQueueBase::Current();
if (!current)
current = rtc::ThreadManager::Instance()->CurrentThread();
TaskQueueBase* current = GetCurrentTaskQueueOrThread();
RTC_CHECK(current);
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
Expand Down Expand Up @@ -918,7 +921,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
// this locked scope.
receive_stream = new FlexfecReceiveStreamImpl(
clock_, &video_receiver_controller_, config, recovered_packet_receiver,
call_stats_.get(), module_process_thread_.get());
call_stats_->AsRtcpRttStats(), module_process_thread_.get());

RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
Expand Down
1 change: 1 addition & 0 deletions call/rtp_video_sender_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class RtpVideoSenderTestFixture {
const FieldTrialBasedConfig field_trials_;
RtpTransportControllerSend transport_controller_;
std::unique_ptr<ProcessThread> process_thread_;
// TODO(tommi): Use internal::CallStats.
CallStats call_stats_;
SendStatisticsProxy stats_proxy_;
RateLimiter retransmission_rate_limiter_;
Expand Down
3 changes: 3 additions & 0 deletions video/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ rtc_library("video") {
"buffered_frame_decryptor.h",
"call_stats.cc",
"call_stats.h",
"call_stats2.cc",
"call_stats2.h",
"encoder_rtcp_feedback.cc",
"encoder_rtcp_feedback.h",
"quality_limitation_reason_tracker.cc",
Expand Down Expand Up @@ -492,6 +494,7 @@ if (rtc_include_tests) {
defines = []
sources = [
"buffered_frame_decryptor_unittest.cc",
"call_stats2_unittest.cc",
"call_stats_unittest.cc",
"cpu_scaling_tests.cc",
"encoder_bitrate_adjuster_unittest.cc",
Expand Down
184 changes: 184 additions & 0 deletions video/call_stats2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/

#include "video/call_stats2.h"

#include <algorithm>
#include <memory>

#include "absl/algorithm/container.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "system_wrappers/include/metrics.h"

namespace webrtc {
namespace internal {
namespace {

void RemoveOldReports(int64_t now, std::list<CallStats::RttTime>* reports) {
static constexpr const int64_t kRttTimeoutMs = 1500;
reports->remove_if(
[&now](CallStats::RttTime& r) { return now - r.time > kRttTimeoutMs; });
}

int64_t GetMaxRttMs(const std::list<CallStats::RttTime>& reports) {
int64_t max_rtt_ms = -1;
for (const CallStats::RttTime& rtt_time : reports)
max_rtt_ms = std::max(rtt_time.rtt, max_rtt_ms);
return max_rtt_ms;
}

int64_t GetAvgRttMs(const std::list<CallStats::RttTime>& reports) {
RTC_DCHECK(!reports.empty());
int64_t sum = 0;
for (std::list<CallStats::RttTime>::const_iterator it = reports.begin();
it != reports.end(); ++it) {
sum += it->rtt;
}
return sum / reports.size();
}

int64_t GetNewAvgRttMs(const std::list<CallStats::RttTime>& reports,
int64_t prev_avg_rtt) {
if (reports.empty())
return -1; // Reset (invalid average).

int64_t cur_rtt_ms = GetAvgRttMs(reports);
if (prev_avg_rtt == -1)
return cur_rtt_ms; // New initial average value.

// Weight factor to apply to the average rtt.
// We weigh the old average at 70% against the new average (30%).
constexpr const float kWeightFactor = 0.3f;
return prev_avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor;
}

} // namespace

CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue)
: clock_(clock),
last_process_time_(clock_->TimeInMilliseconds()),
max_rtt_ms_(-1),
avg_rtt_ms_(-1),
sum_avg_rtt_ms_(0),
num_avg_rtt_(0),
time_of_first_rtt_ms_(-1),
task_queue_(task_queue) {
RTC_DCHECK(task_queue_);
process_thread_checker_.Detach();
task_queue_->PostDelayedTask(
ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }),
kUpdateIntervalMs);
}

CallStats::~CallStats() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
RTC_DCHECK(observers_.empty());

task_safety_flag_->SetNotAlive();

UpdateHistograms();
}

void CallStats::RunTimer() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);

UpdateAndReport();

uint32_t interval =
last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();

task_queue_->PostDelayedTask(
ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval);
}

void CallStats::UpdateAndReport() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);

int64_t now = clock_->TimeInMilliseconds();
last_process_time_ = now;

// |avg_rtt_ms_| is allowed to be read on the construction thread since that's
// the only thread that modifies the value.
int64_t avg_rtt_ms = avg_rtt_ms_;
RemoveOldReports(now, &reports_);
max_rtt_ms_ = GetMaxRttMs(reports_);
avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms);
{
rtc::CritScope lock(&avg_rtt_ms_lock_);
avg_rtt_ms_ = avg_rtt_ms;
}

// If there is a valid rtt, update all observers with the max rtt.
if (max_rtt_ms_ >= 0) {
RTC_DCHECK_GE(avg_rtt_ms, 0);
for (CallStatsObserver* observer : observers_)
observer->OnRttUpdate(avg_rtt_ms, max_rtt_ms_);
// Sum for Histogram of average RTT reported over the entire call.
sum_avg_rtt_ms_ += avg_rtt_ms;
++num_avg_rtt_;
}
}

void CallStats::RegisterStatsObserver(CallStatsObserver* observer) {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
if (!absl::c_linear_search(observers_, observer))
observers_.push_back(observer);
}

void CallStats::DeregisterStatsObserver(CallStatsObserver* observer) {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
observers_.remove(observer);
}

int64_t CallStats::LastProcessedRtt() const {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
// No need for locking since we're on the construction thread.
return avg_rtt_ms_;
}

int64_t CallStats::LastProcessedRttFromProcessThread() const {
RTC_DCHECK_RUN_ON(&process_thread_checker_);
rtc::CritScope lock(&avg_rtt_ms_lock_);
return avg_rtt_ms_;
}

void CallStats::OnRttUpdate(int64_t rtt) {
RTC_DCHECK_RUN_ON(&process_thread_checker_);

int64_t now_ms = clock_->TimeInMilliseconds();
task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
reports_.push_back(RttTime(rtt, now_ms));
if (time_of_first_rtt_ms_ == -1)
time_of_first_rtt_ms_ = now_ms;
UpdateAndReport();
}));
}

void CallStats::UpdateHistograms() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);

if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1)
return;

int64_t elapsed_sec =
(clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000;
if (elapsed_sec >= metrics::kMinRunTimeInSeconds) {
int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_;
RTC_HISTOGRAM_COUNTS_10000(
"WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms);
}
}

} // namespace internal
} // namespace webrtc
Loading

0 comments on commit 822a874

Please sign in to comment.