diff --git a/call/call.cc b/call/call.cc index b885e3722f..4068db9f00 100644 --- a/call/call.cc +++ b/call/call.cc @@ -58,7 +58,7 @@ #include "system_wrappers/include/cpu_info.h" #include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" -#include "video/call_stats.h" +#include "video/call_stats2.h" #include "video/send_delay_stats.h" #include "video/stats_counter.h" #include "video/video_receive_stream2.h" @@ -157,6 +157,13 @@ bool IsRtcp(const uint8_t* packet, size_t length) { return rtp_parser.RTCP(); } +TaskQueueBase* GetCurrentTaskQueueOrThread() { + TaskQueueBase* current = TaskQueueBase::Current(); + if (!current) + current = rtc::ThreadManager::Instance()->CurrentThread(); + return current; +} + } // namespace namespace internal { @@ -440,7 +447,7 @@ Call::Call(Clock* clock, task_queue_factory_(task_queue_factory), num_cpu_cores_(CpuInfo::DetectNumberOfCores()), module_process_thread_(std::move(module_process_thread)), - call_stats_(new CallStats(clock_, module_process_thread_.get())), + call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())), bitrate_allocator_(new BitrateAllocator(this)), config_(config), audio_network_state_(kNetworkDown), @@ -472,7 +479,6 @@ Call::Call(Clock* clock, module_process_thread_->RegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); - module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE); module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); } @@ -489,7 +495,6 @@ Call::~Call() { module_process_thread_->DeRegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true)); module_process_thread_->DeRegisterModule(&receive_side_cc_); - module_process_thread_->DeRegisterModule(call_stats_.get()); call_stats_->DeregisterStatsObserver(&receive_side_cc_); absl::optional first_sent_packet_ms = @@ -625,11 +630,11 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( } } - AudioSendStream* send_stream = - new AudioSendStream(clock_, config, config_.audio_state, - task_queue_factory_, module_process_thread_.get(), - transport_send_ptr_, bitrate_allocator_.get(), - event_log_, call_stats_.get(), suspended_rtp_state); + AudioSendStream* send_stream = new AudioSendStream( + clock_, config, config_.audio_state, task_queue_factory_, + module_process_thread_.get(), transport_send_ptr_, + bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(), + suspended_rtp_state); { WriteLockScoped write_lock(*send_crit_); RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == @@ -757,9 +762,9 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( VideoSendStream* send_stream = new VideoSendStream( clock_, num_cpu_cores_, module_process_thread_.get(), task_queue_factory_, - call_stats_.get(), transport_send_ptr_, bitrate_allocator_.get(), - video_send_delay_stats_.get(), event_log_, std::move(config), - std::move(encoder_config), suspended_video_send_ssrcs_, + call_stats_->AsRtcpRttStats(), transport_send_ptr_, + bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, + std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller)); { @@ -837,9 +842,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( RegisterRateObserver(); - TaskQueueBase* current = TaskQueueBase::Current(); - if (!current) - current = rtc::ThreadManager::Instance()->CurrentThread(); + TaskQueueBase* current = GetCurrentTaskQueueOrThread(); RTC_CHECK(current); VideoReceiveStream2* receive_stream = new VideoReceiveStream2( task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, @@ -918,7 +921,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( // this locked scope. receive_stream = new FlexfecReceiveStreamImpl( clock_, &video_receiver_controller_, config, recovered_packet_receiver, - call_stats_.get(), module_process_thread_.get()); + call_stats_->AsRtcpRttStats(), module_process_thread_.get()); RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == receive_rtp_config_.end()); diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 20c6b54e73..a87196111a 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -196,6 +196,7 @@ class RtpVideoSenderTestFixture { const FieldTrialBasedConfig field_trials_; RtpTransportControllerSend transport_controller_; std::unique_ptr process_thread_; + // TODO(tommi): Use internal::CallStats. CallStats call_stats_; SendStatisticsProxy stats_proxy_; RateLimiter retransmission_rate_limiter_; diff --git a/video/BUILD.gn b/video/BUILD.gn index 2e355f7e98..933f676b1f 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -14,6 +14,8 @@ rtc_library("video") { "buffered_frame_decryptor.h", "call_stats.cc", "call_stats.h", + "call_stats2.cc", + "call_stats2.h", "encoder_rtcp_feedback.cc", "encoder_rtcp_feedback.h", "quality_limitation_reason_tracker.cc", @@ -492,6 +494,7 @@ if (rtc_include_tests) { defines = [] sources = [ "buffered_frame_decryptor_unittest.cc", + "call_stats2_unittest.cc", "call_stats_unittest.cc", "cpu_scaling_tests.cc", "encoder_bitrate_adjuster_unittest.cc", diff --git a/video/call_stats2.cc b/video/call_stats2.cc new file mode 100644 index 0000000000..af0da0f702 --- /dev/null +++ b/video/call_stats2.cc @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/call_stats2.h" + +#include +#include + +#include "absl/algorithm/container.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" +#include "rtc_base/location.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "system_wrappers/include/metrics.h" + +namespace webrtc { +namespace internal { +namespace { + +void RemoveOldReports(int64_t now, std::list* reports) { + static constexpr const int64_t kRttTimeoutMs = 1500; + reports->remove_if( + [&now](CallStats::RttTime& r) { return now - r.time > kRttTimeoutMs; }); +} + +int64_t GetMaxRttMs(const std::list& reports) { + int64_t max_rtt_ms = -1; + for (const CallStats::RttTime& rtt_time : reports) + max_rtt_ms = std::max(rtt_time.rtt, max_rtt_ms); + return max_rtt_ms; +} + +int64_t GetAvgRttMs(const std::list& reports) { + RTC_DCHECK(!reports.empty()); + int64_t sum = 0; + for (std::list::const_iterator it = reports.begin(); + it != reports.end(); ++it) { + sum += it->rtt; + } + return sum / reports.size(); +} + +int64_t GetNewAvgRttMs(const std::list& reports, + int64_t prev_avg_rtt) { + if (reports.empty()) + return -1; // Reset (invalid average). + + int64_t cur_rtt_ms = GetAvgRttMs(reports); + if (prev_avg_rtt == -1) + return cur_rtt_ms; // New initial average value. + + // Weight factor to apply to the average rtt. + // We weigh the old average at 70% against the new average (30%). + constexpr const float kWeightFactor = 0.3f; + return prev_avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor; +} + +} // namespace + +CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) + : clock_(clock), + last_process_time_(clock_->TimeInMilliseconds()), + max_rtt_ms_(-1), + avg_rtt_ms_(-1), + sum_avg_rtt_ms_(0), + num_avg_rtt_(0), + time_of_first_rtt_ms_(-1), + task_queue_(task_queue) { + RTC_DCHECK(task_queue_); + process_thread_checker_.Detach(); + task_queue_->PostDelayedTask( + ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), + kUpdateIntervalMs); +} + +CallStats::~CallStats() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + RTC_DCHECK(observers_.empty()); + + task_safety_flag_->SetNotAlive(); + + UpdateHistograms(); +} + +void CallStats::RunTimer() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + + UpdateAndReport(); + + uint32_t interval = + last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds(); + + task_queue_->PostDelayedTask( + ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval); +} + +void CallStats::UpdateAndReport() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + + int64_t now = clock_->TimeInMilliseconds(); + last_process_time_ = now; + + // |avg_rtt_ms_| is allowed to be read on the construction thread since that's + // the only thread that modifies the value. + int64_t avg_rtt_ms = avg_rtt_ms_; + RemoveOldReports(now, &reports_); + max_rtt_ms_ = GetMaxRttMs(reports_); + avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms); + { + rtc::CritScope lock(&avg_rtt_ms_lock_); + avg_rtt_ms_ = avg_rtt_ms; + } + + // If there is a valid rtt, update all observers with the max rtt. + if (max_rtt_ms_ >= 0) { + RTC_DCHECK_GE(avg_rtt_ms, 0); + for (CallStatsObserver* observer : observers_) + observer->OnRttUpdate(avg_rtt_ms, max_rtt_ms_); + // Sum for Histogram of average RTT reported over the entire call. + sum_avg_rtt_ms_ += avg_rtt_ms; + ++num_avg_rtt_; + } +} + +void CallStats::RegisterStatsObserver(CallStatsObserver* observer) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + if (!absl::c_linear_search(observers_, observer)) + observers_.push_back(observer); +} + +void CallStats::DeregisterStatsObserver(CallStatsObserver* observer) { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + observers_.remove(observer); +} + +int64_t CallStats::LastProcessedRtt() const { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + // No need for locking since we're on the construction thread. + return avg_rtt_ms_; +} + +int64_t CallStats::LastProcessedRttFromProcessThread() const { + RTC_DCHECK_RUN_ON(&process_thread_checker_); + rtc::CritScope lock(&avg_rtt_ms_lock_); + return avg_rtt_ms_; +} + +void CallStats::OnRttUpdate(int64_t rtt) { + RTC_DCHECK_RUN_ON(&process_thread_checker_); + + int64_t now_ms = clock_->TimeInMilliseconds(); + task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + reports_.push_back(RttTime(rtt, now_ms)); + if (time_of_first_rtt_ms_ == -1) + time_of_first_rtt_ms_ = now_ms; + UpdateAndReport(); + })); +} + +void CallStats::UpdateHistograms() { + RTC_DCHECK_RUN_ON(&construction_thread_checker_); + + if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1) + return; + + int64_t elapsed_sec = + (clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000; + if (elapsed_sec >= metrics::kMinRunTimeInSeconds) { + int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_; + RTC_HISTOGRAM_COUNTS_10000( + "WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms); + } +} + +} // namespace internal +} // namespace webrtc diff --git a/video/call_stats2.h b/video/call_stats2.h new file mode 100644 index 0000000000..f06d33daf7 --- /dev/null +++ b/video/call_stats2.h @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef VIDEO_CALL_STATS2_H_ +#define VIDEO_CALL_STATS2_H_ + +#include +#include + +#include "modules/include/module_common_types.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "rtc_base/constructor_magic.h" +#include "rtc_base/critical_section.h" +#include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_queue.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { +namespace internal { + +class CallStats { + public: + // Time interval for updating the observers. + static constexpr int64_t kUpdateIntervalMs = 1000; + + CallStats(Clock* clock, TaskQueueBase* task_queue); + ~CallStats(); + + // Expose an RtcpRttStats implementation without inheriting from RtcpRttStats. + // That allows us to separate the threading model of how RtcpRttStats is + // used (mostly on a process thread) and how CallStats is used (mostly on + // the TQ/worker thread). Since for both cases, there is a LastProcessedRtt() + // method, this separation allows us to not need a lock for either. + RtcpRttStats* AsRtcpRttStats() { return &rtcp_rtt_stats_impl_; } + + // Registers/deregisters a new observer to receive statistics updates. + // Must be called from the construction thread. + void RegisterStatsObserver(CallStatsObserver* observer); + void DeregisterStatsObserver(CallStatsObserver* observer); + + // Expose |LastProcessedRtt()| from RtcpRttStats to the public interface, as + // it is the part of the API that is needed by direct users of CallStats. + // TODO(tommi): Threading or lifetime guarantees are not explicit in how + // CallStats is used as RtcpRttStats or how pointers are cached in a + // few different places (distributed via Call). It would be good to clarify + // from what thread/TQ calls to OnRttUpdate and LastProcessedRtt need to be + // allowed. + int64_t LastProcessedRtt() const; + + // Exposed for tests to test histogram support. + void UpdateHistogramsForTest() { UpdateHistograms(); } + + // Helper struct keeping track of the time a rtt value is reported. + struct RttTime { + RttTime(int64_t new_rtt, int64_t rtt_time) : rtt(new_rtt), time(rtt_time) {} + const int64_t rtt; + const int64_t time; + }; + + private: + // Part of the RtcpRttStats implementation. Called by RtcpRttStatsImpl. + void OnRttUpdate(int64_t rtt); + int64_t LastProcessedRttFromProcessThread() const; + + void RunTimer(); + + void UpdateAndReport(); + + // This method must only be called when the process thread is not + // running, and from the construction thread. + void UpdateHistograms(); + + class RtcpRttStatsImpl : public RtcpRttStats { + public: + explicit RtcpRttStatsImpl(CallStats* owner) : owner_(owner) { + process_thread_checker_.Detach(); + } + ~RtcpRttStatsImpl() override = default; + + private: + void OnRttUpdate(int64_t rtt) override { + RTC_DCHECK_RUN_ON(&process_thread_checker_); + owner_->OnRttUpdate(rtt); + } + + int64_t LastProcessedRtt() const override { + RTC_DCHECK_RUN_ON(&process_thread_checker_); + return owner_->LastProcessedRttFromProcessThread(); + } + + CallStats* const owner_; + SequenceChecker process_thread_checker_; + } rtcp_rtt_stats_impl_{this}; + + Clock* const clock_; + + // The last time 'Process' resulted in statistic update. + int64_t last_process_time_ RTC_GUARDED_BY(construction_thread_checker_); + // The last RTT in the statistics update (zero if there is no valid estimate). + int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_); + + // Accessed from two separate threads. + // |avg_rtt_ms_| may be read on the construction thread without a lock. + // |avg_rtt_ms_lock_| must be held elsewhere for reading. + // |avg_rtt_ms_lock_| must be held on the construction thread for writing. + int64_t avg_rtt_ms_; + + // Protects |avg_rtt_ms_|. + rtc::CriticalSection avg_rtt_ms_lock_; + + // |sum_avg_rtt_ms_|, |num_avg_rtt_| and |time_of_first_rtt_ms_| are only used + // on the ProcessThread when running. When the Process Thread is not running, + // (and only then) they can be used in UpdateHistograms(), usually called from + // the dtor. + int64_t sum_avg_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_); + int64_t num_avg_rtt_ RTC_GUARDED_BY(construction_thread_checker_); + int64_t time_of_first_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_); + + // All Rtt reports within valid time interval, oldest first. + std::list reports_ RTC_GUARDED_BY(construction_thread_checker_); + + // Observers getting stats reports. + // When attached to ProcessThread, this is read-only. In order to allow + // modification, we detach from the process thread while the observer + // list is updated, to avoid races. This allows us to not require a lock + // for the observers_ list, which makes the most common case lock free. + std::list observers_; + + SequenceChecker construction_thread_checker_; + SequenceChecker process_thread_checker_; + TaskQueueBase* const task_queue_; + + // Used to signal destruction to potentially pending tasks. + PendingTaskSafetyFlag::Pointer task_safety_flag_ = + PendingTaskSafetyFlag::Create(); + + RTC_DISALLOW_COPY_AND_ASSIGN(CallStats); +}; + +} // namespace internal +} // namespace webrtc + +#endif // VIDEO_CALL_STATS2_H_ diff --git a/video/call_stats2_unittest.cc b/video/call_stats2_unittest.cc new file mode 100644 index 0000000000..58af6fd386 --- /dev/null +++ b/video/call_stats2_unittest.cc @@ -0,0 +1,311 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "video/call_stats2.h" + +#include + +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/thread.h" +#include "system_wrappers/include/metrics.h" +#include "test/gmock.h" +#include "test/gtest.h" +#include "test/run_loop.h" + +using ::testing::AnyNumber; +using ::testing::InvokeWithoutArgs; +using ::testing::Return; + +namespace webrtc { +namespace internal { + +class MockStatsObserver : public CallStatsObserver { + public: + MockStatsObserver() {} + virtual ~MockStatsObserver() {} + + MOCK_METHOD2(OnRttUpdate, void(int64_t, int64_t)); +}; + +class CallStats2Test : public ::testing::Test { + public: + CallStats2Test() { process_thread_->Start(); } + + ~CallStats2Test() override { process_thread_->Stop(); } + + // Queues an rtt update call on the process thread. + void AsyncSimulateRttUpdate(int64_t rtt) { + RtcpRttStats* rtcp_rtt_stats = call_stats_.AsRtcpRttStats(); + process_thread_->PostTask(ToQueuedTask( + [rtcp_rtt_stats, rtt] { rtcp_rtt_stats->OnRttUpdate(rtt); })); + } + + protected: + void FlushProcessAndWorker() { + process_thread_->PostTask( + ToQueuedTask([this] { loop_.PostTask([this]() { loop_.Quit(); }); })); + loop_.Run(); + } + + test::RunLoop loop_; + std::unique_ptr process_thread_{ + ProcessThread::Create("CallStats")}; + // Note: Since rtc::Thread doesn't support injecting a Clock, we're going + // to be using a mix of the fake clock (used by CallStats) as well as the + // system clock (used by rtc::Thread). This isn't ideal and will result in + // the tests taking longer to execute in some cases than they need to. + SimulatedClock fake_clock_{12345}; + CallStats call_stats_{&fake_clock_, loop_.task_queue()}; +}; + +TEST_F(CallStats2Test, AddAndTriggerCallback) { + static constexpr const int64_t kRtt = 25; + + MockStatsObserver stats_observer; + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { loop_.Quit(); })); + + call_stats_.RegisterStatsObserver(&stats_observer); + EXPECT_EQ(-1, call_stats_.LastProcessedRtt()); + + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + + EXPECT_EQ(kRtt, call_stats_.LastProcessedRtt()); + + call_stats_.DeregisterStatsObserver(&stats_observer); +} + +TEST_F(CallStats2Test, ProcessTime) { + static constexpr const int64_t kRtt = 100; + static constexpr const int64_t kRtt2 = 80; + + MockStatsObserver stats_observer; + + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(2) + .WillOnce(InvokeWithoutArgs([this] { + // Advance clock and verify we get an update. + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + })) + .WillRepeatedly(InvokeWithoutArgs([this] { + AsyncSimulateRttUpdate(kRtt2); + // Advance clock just too little to get an update. + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1); + })); + + // In case you're reading this and wondering how this number is arrived at, + // please see comments in the ChangeRtt test that go into some detail. + static constexpr const int64_t kLastAvg = 94; + EXPECT_CALL(stats_observer, OnRttUpdate(kLastAvg, kRtt2)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { loop_.Quit(); })); + + call_stats_.RegisterStatsObserver(&stats_observer); + + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + + call_stats_.DeregisterStatsObserver(&stats_observer); +} + +// Verify all observers get correct estimates and observers can be added and +// removed. +TEST_F(CallStats2Test, MultipleObservers) { + MockStatsObserver stats_observer_1; + call_stats_.RegisterStatsObserver(&stats_observer_1); + // Add the second observer twice, there should still be only one report to the + // observer. + MockStatsObserver stats_observer_2; + call_stats_.RegisterStatsObserver(&stats_observer_2); + call_stats_.RegisterStatsObserver(&stats_observer_2); + + static constexpr const int64_t kRtt = 100; + + // Verify both observers are updated. + EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillRepeatedly(Return()); + EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([this] { loop_.Quit(); })) + .WillRepeatedly(Return()); + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + + // Deregister the second observer and verify update is only sent to the first + // observer. + call_stats_.DeregisterStatsObserver(&stats_observer_2); + + EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([this] { loop_.Quit(); })) + .WillRepeatedly(Return()); + EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0); + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + + // Deregister the first observer. + call_stats_.DeregisterStatsObserver(&stats_observer_1); + + // Now make sure we don't get any callbacks. + EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(0); + EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0); + AsyncSimulateRttUpdate(kRtt); + + // Flush the queue on the process thread to make sure we return after + // Process() has been called. + FlushProcessAndWorker(); +} + +// Verify increasing and decreasing rtt triggers callbacks with correct values. +TEST_F(CallStats2Test, ChangeRtt) { + // NOTE: This test assumes things about how old reports are removed + // inside of call_stats.cc. The threshold ms value is 1500ms, but it's not + // clear here that how the clock is advanced, affects that algorithm and + // subsequently the average reported rtt. + + MockStatsObserver stats_observer; + call_stats_.RegisterStatsObserver(&stats_observer); + + static constexpr const int64_t kFirstRtt = 100; + static constexpr const int64_t kLowRtt = kFirstRtt - 20; + static constexpr const int64_t kHighRtt = kFirstRtt + 20; + + EXPECT_CALL(stats_observer, OnRttUpdate(kFirstRtt, kFirstRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { + fake_clock_.AdvanceTimeMilliseconds(1000); + AsyncSimulateRttUpdate(kHighRtt); // Reported at T1 (1000ms). + })); + + // NOTE: This relies on the internal algorithms of call_stats.cc. + // There's a weight factor there (0.3), that weighs the previous average to + // the new one by 70%, so the number 103 in this case is arrived at like so: + // (100) / 1 * 0.7 + (100+120)/2 * 0.3 = 103 + static constexpr const int64_t kAvgRtt1 = 103; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kHighRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { + // This interacts with an internal implementation detail in call_stats + // that decays the oldest rtt value. See more below. + fake_clock_.AdvanceTimeMilliseconds(1000); + AsyncSimulateRttUpdate(kLowRtt); // Reported at T2 (2000ms). + })); + + // Increase time enough for a new update, but not too much to make the + // rtt invalid. Report a lower rtt and verify the old/high value still is sent + // in the callback. + + // Here, enough time must have passed in order to remove exactly the first + // report and nothing else (>1500ms has passed since the first rtt). + // So, this value is arrived by doing: + // (kAvgRtt1)/1 * 0.7 + (kHighRtt+kLowRtt)/2 * 0.3 = 102.1 + static constexpr const int64_t kAvgRtt2 = 102; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kHighRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { + // Advance time to make the high report invalid, the lower rtt should + // now be in the callback. + fake_clock_.AdvanceTimeMilliseconds(1000); + })); + + static constexpr const int64_t kAvgRtt3 = 95; + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt3, kLowRtt)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { loop_.Quit(); })); + + // Trigger the first rtt value and set off the chain of callbacks. + AsyncSimulateRttUpdate(kFirstRtt); // Reported at T0 (0ms). + loop_.Run(); + + call_stats_.DeregisterStatsObserver(&stats_observer); +} + +TEST_F(CallStats2Test, LastProcessedRtt) { + MockStatsObserver stats_observer; + call_stats_.RegisterStatsObserver(&stats_observer); + + static constexpr const int64_t kRttLow = 10; + static constexpr const int64_t kRttHigh = 30; + // The following two average numbers dependend on average + weight + // calculations in call_stats.cc. + static constexpr const int64_t kAvgRtt1 = 13; + static constexpr const int64_t kAvgRtt2 = 15; + + EXPECT_CALL(stats_observer, OnRttUpdate(kRttLow, kRttLow)) + .Times(1) + .WillOnce(InvokeWithoutArgs([this] { + EXPECT_EQ(kRttLow, call_stats_.LastProcessedRtt()); + // Don't advance the clock to make sure that low and high rtt values + // are associated with the same time stamp. + AsyncSimulateRttUpdate(kRttHigh); + })); + + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kRttHigh)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([this] { + EXPECT_EQ(kAvgRtt1, call_stats_.LastProcessedRtt()); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + AsyncSimulateRttUpdate(kRttLow); + AsyncSimulateRttUpdate(kRttHigh); + })) + .WillRepeatedly(Return()); + + EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kRttHigh)) + .Times(AnyNumber()) + .WillOnce(InvokeWithoutArgs([this] { + EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt()); + loop_.Quit(); + })) + .WillRepeatedly(Return()); + + // Set a first values and verify that LastProcessedRtt initially returns the + // average rtt. + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + AsyncSimulateRttUpdate(kRttLow); + loop_.Run(); + EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt()); + + call_stats_.DeregisterStatsObserver(&stats_observer); +} + +TEST_F(CallStats2Test, ProducesHistogramMetrics) { + metrics::Reset(); + static constexpr const int64_t kRtt = 123; + MockStatsObserver stats_observer; + call_stats_.RegisterStatsObserver(&stats_observer); + EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)) + .Times(AnyNumber()) + .WillRepeatedly(InvokeWithoutArgs([this] { loop_.Quit(); })); + + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * + CallStats::kUpdateIntervalMs); + AsyncSimulateRttUpdate(kRtt); + loop_.Run(); + + call_stats_.DeregisterStatsObserver(&stats_observer); + + call_stats_.UpdateHistogramsForTest(); + + EXPECT_METRIC_EQ(1, metrics::NumSamples( + "WebRTC.Video.AverageRoundTripTimeInMilliseconds")); + EXPECT_METRIC_EQ( + 1, metrics::NumEvents("WebRTC.Video.AverageRoundTripTimeInMilliseconds", + kRtt)); +} + +} // namespace internal +} // namespace webrtc diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc index 0fda7c8aee..31a0dc17d7 100644 --- a/video/receive_statistics_proxy2.cc +++ b/video/receive_statistics_proxy2.cc @@ -1022,11 +1022,11 @@ void ReceiveStatisticsProxy::OnStreamInactive() { } void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms) { - // TODO(bugs.webrtc.org/11489): This method is currently never called except - // from a unit test, GetStatsReportsDecodeTimingStats, and even then it has no - // effect. Once 11490 items in video_receive_stream2.cc have been addressed, - // we can uncomment the following: - // RTC_DCHECK_RUN_ON(&main_thread_); + RTC_DCHECK_RUN_ON(&main_thread_); + // TODO(bugs.webrtc.org/11489): Now that this method is being called, as part + // of fixing 11490, we can uncomment the below line. However, since it will + // affect stats, that change will be landed as a separate CL. + // avg_rtt_ms_ = avg_rtt_ms; } diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc index 1a9a3e8026..116cf2879b 100644 --- a/video/rtp_streams_synchronizer2.cc +++ b/video/rtp_streams_synchronizer2.cc @@ -75,6 +75,15 @@ void RtpStreamsSynchronizer::QueueTimer() { timer_running_ = true; uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) / rtc::kNumNanosecsPerMillisec; + if (delay > kSyncIntervalMs) { + // TODO(tommi): |linux_chromium_tsan_rel_ng| bot has shown a failure when + // running WebRtcBrowserTest.CallAndModifyStream, indicating that the + // underlying clock is not reliable. Possibly there's a fake clock being + // used as the tests are flaky. Look into and fix. + RTC_LOG(LS_ERROR) << "Unexpected timer value: " << delay; + delay = kSyncIntervalMs; + } + RTC_DCHECK_LE(delay, kSyncIntervalMs); task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] { if (!safety->alive()) diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index a6c3f6533b..510c2602c4 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -47,7 +47,7 @@ #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/field_trial.h" -#include "video/call_stats.h" +#include "video/call_stats2.h" #include "video/frame_dumping_decoder.h" #include "video/receive_statistics_proxy.h" @@ -203,7 +203,7 @@ VideoReceiveStream2::VideoReceiveStream2( video_receiver_(clock_, timing_.get()), rtp_video_stream_receiver_(clock_, &transport_adapter_, - call_stats, + call_stats->AsRtcpRttStats(), packet_router, &config_, rtp_receive_statistics_.get(), @@ -364,8 +364,6 @@ void VideoReceiveStream2::Start() { // Make sure we register as a stats observer *after* we've prepared the // |video_stream_decoder_|. - // TODO(webrtc:11489): Make call_stats_ not depend on ProcessThread and - // make callbacks on the worker thread (TQ). call_stats_->RegisterStatsObserver(this); // Start decoding on task queue. @@ -568,12 +566,10 @@ void VideoReceiveStream2::OnCompleteFrame( } void VideoReceiveStream2::OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) { - RTC_DCHECK_RUN_ON(&module_process_sequence_checker_); - // TODO(webrtc:11489, webrtc:11490): Once call_stats_ does not depend on - // ProcessThread, this callback should happen on the worker thread. Then we - // can share the avg_rtt_ms with ReceiveStatisticsProxy. + RTC_DCHECK_RUN_ON(&worker_sequence_checker_); frame_buffer_->UpdateRtt(max_rtt_ms); rtp_video_stream_receiver_.UpdateRtt(max_rtt_ms); + stats_proxy_.OnRttUpdate(avg_rtt_ms); } uint32_t VideoReceiveStream2::id() const { diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index 9f32c1d6e4..bbed08a7a6 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -36,7 +36,6 @@ namespace webrtc { -class CallStats; class ProcessThread; class RTPFragmentationHeader; class RtpStreamReceiverInterface; @@ -46,6 +45,8 @@ class VCMTiming; namespace internal { +class CallStats; + // Utility struct for grabbing metadata from a VideoFrame and processing it // asynchronously without needing the actual frame data. // Additionally the caller can bundle information from the current clock diff --git a/video/video_send_stream_impl_unittest.cc b/video/video_send_stream_impl_unittest.cc index 5c5ca1eaee..532e035e2b 100644 --- a/video/video_send_stream_impl_unittest.cc +++ b/video/video_send_stream_impl_unittest.cc @@ -155,6 +155,7 @@ class VideoSendStreamImplTest : public ::testing::Test { SendDelayStats send_delay_stats_; TaskQueueForTest test_queue_; std::unique_ptr process_thread_; + // TODO(tommi): Use internal::CallStats CallStats call_stats_; SendStatisticsProxy stats_proxy_; PacketRouter packet_router_;