Skip to content

Commit

Permalink
Change the first-packet-received notification in Channel.
Browse files Browse the repository at this point in the history
This changes the notification to a single std::function pointer
instead of being a sigslot::signal1<> collection.

Summary:

* Remove SignalFirstPacketReceived_, the last sigslot member variable.
  (still inherits from sigslot::has_slots<>)
* BaseChannel doesn't post to the signaling thread anymore. The only
  reason that remains for the signaling_thread_ variable, is for
  thread checking.
* Remove BaseChannel's reliance on MessageHandlerAutoCleanup
  (still inherits from MessageHandler)

RtpTransceiver is the consumer of this event. That class is also the
class that sits between the PC classes and the channel object, holding
a pointer to the channel and managing calls that come in on the
signaling thread, such as SetChannel. The responsibility of delivering
the first packet received on the signaling thread is now with
RtpTransceiver:

* RtpTransceiver always requires a ChannelManager instance. Previously
  this variable was sometimes set, but it's now required.
* Updated tests in rtp_transceiver_unittest.cc to include a
  ChannelManager as well as fix them to include call expectations for
  mock sender and receivers.

Bug: webrtc:11993, webrtc:11988
Change-Id: If49d6be157cd7599fa6fe3a42cd0a363464e3a74
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215979
Commit-Queue: Tommi <[email protected]>
Reviewed-by: Niels Moller <[email protected]>
Cr-Commit-Position: refs/heads/master@{#33853}
  • Loading branch information
Tommi authored and WebRTC LUCI CQ committed Apr 27, 2021
1 parent 87f7090 commit 99c8a80
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 68 deletions.
2 changes: 2 additions & 0 deletions pc/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ rtc_library("rtp_transceiver") {
"../rtc_base:macromagic",
"../rtc_base:refcount",
"../rtc_base:threading",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/third_party/sigslot",
]
absl_deps = [
Expand Down
23 changes: 10 additions & 13 deletions pc/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ enum {
MSG_SEND_RTCP_PACKET,
MSG_READYTOSENDDATA,
MSG_DATARECEIVED,
MSG_FIRSTPACKETRECEIVED,
};

static void SafeSetError(const std::string& message, std::string* error_desc) {
Expand Down Expand Up @@ -156,7 +155,6 @@ BaseChannel::~BaseChannel() {

// Eats any outstanding messages or packets.
alive_->SetNotAlive();
signaling_thread_->Clear(this);
// The media channel is destroyed at the end of the destructor, since it
// is a std::unique_ptr. The transport channel (rtp_transport) must outlive
// the media channel.
Expand Down Expand Up @@ -411,9 +409,11 @@ void BaseChannel::OnNetworkRouteChanged(
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
}

sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
RTC_DCHECK_RUN_ON(signaling_thread_);
return SignalFirstPacketReceived_;
void BaseChannel::SetFirstPacketReceivedCallback(
std::function<void()> callback) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(!on_first_packet_received_ || !callback);
on_first_packet_received_ = std::move(callback);
}

void BaseChannel::OnTransportReadyToSend(bool ready) {
Expand Down Expand Up @@ -490,16 +490,18 @@ bool BaseChannel::SendPacket(bool rtcp,
}

void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
RTC_DCHECK_RUN_ON(network_thread());

// Take packet time from the |parsed_packet|.
// RtpPacketReceived.arrival_time_ms = (timestamp_us + 500) / 1000;
int64_t packet_time_us = -1;
if (parsed_packet.arrival_time_ms() > 0) {
packet_time_us = parsed_packet.arrival_time_ms() * 1000;
}

if (!has_received_packet_) {
has_received_packet_ = true;
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
if (on_first_packet_received_) {
on_first_packet_received_();
on_first_packet_received_ = nullptr;
}

if (!srtp_active() && srtp_required_) {
Expand Down Expand Up @@ -830,11 +832,6 @@ void BaseChannel::OnMessage(rtc::Message* pmsg) {
delete data;
break;
}
case MSG_FIRSTPACKETRECEIVED: {
RTC_DCHECK_RUN_ON(signaling_thread_);
SignalFirstPacketReceived_(this);
break;
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions pc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,12 @@ struct CryptoParams;
// NetworkInterface.

class BaseChannel : public ChannelInterface,
public rtc::MessageHandlerAutoCleanup,
// TODO(tommi): Remove MessageHandler inheritance.
public rtc::MessageHandler,
// TODO(tommi): Remove has_slots inheritance.
public sigslot::has_slots<>,
// TODO(tommi): Consider implementing these interfaces
// via composition.
public MediaChannel::NetworkInterface,
public webrtc::RtpPacketSinkInterface {
public:
Expand Down Expand Up @@ -175,7 +179,7 @@ class BaseChannel : public ChannelInterface,
}

// Used for latency measurements.
sigslot::signal1<ChannelInterface*>& SignalFirstPacketReceived() override;
void SetFirstPacketReceivedCallback(std::function<void()> callback) override;

// From RtpTransport - public for testing only
void OnTransportReadyToSend(bool ready);
Expand Down Expand Up @@ -319,12 +323,11 @@ class BaseChannel : public ChannelInterface,
rtc::Thread* const network_thread_;
rtc::Thread* const signaling_thread_;
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> alive_;
sigslot::signal1<ChannelInterface*> SignalFirstPacketReceived_
RTC_GUARDED_BY(signaling_thread_);

const std::string content_name_;

bool has_received_packet_ = false;
std::function<void()> on_first_packet_received_
RTC_GUARDED_BY(network_thread());

// Won't be set when using raw packet transports. SDP-specific thing.
// TODO(bugs.webrtc.org/12230): Written on network thread, read on
Expand Down
3 changes: 2 additions & 1 deletion pc/channel_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class ChannelInterface {
virtual void Enable(bool enable) = 0;

// Used for latency measurements.
virtual sigslot::signal1<ChannelInterface*>& SignalFirstPacketReceived() = 0;
virtual void SetFirstPacketReceivedCallback(
std::function<void()> callback) = 0;

// Channel control
virtual bool SetLocalContent(const MediaContentDescription* content,
Expand Down
6 changes: 4 additions & 2 deletions pc/peer_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,12 @@ RTCError PeerConnection::Initialize(
if (!IsUnifiedPlan()) {
rtp_manager()->transceivers()->Add(
RtpTransceiverProxyWithInternal<RtpTransceiver>::Create(
signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO)));
signaling_thread(),
new RtpTransceiver(cricket::MEDIA_TYPE_AUDIO, channel_manager())));
rtp_manager()->transceivers()->Add(
RtpTransceiverProxyWithInternal<RtpTransceiver>::Create(
signaling_thread(), new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO)));
signaling_thread(),
new RtpTransceiver(cricket::MEDIA_TYPE_VIDEO, channel_manager())));
}

int delay_ms = configuration.report_usage_pattern_delay_ms
Expand Down
61 changes: 47 additions & 14 deletions pc/rtp_transceiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "pc/session_description.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"

namespace webrtc {
Expand Down Expand Up @@ -112,12 +113,16 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() {

} // namespace

RtpTransceiver::RtpTransceiver(cricket::MediaType media_type)
RtpTransceiver::RtpTransceiver(
cricket::MediaType media_type,
cricket::ChannelManager* channel_manager /* = nullptr*/)
: thread_(GetCurrentTaskQueueOrThread()),
unified_plan_(false),
media_type_(media_type) {
media_type_(media_type),
channel_manager_(channel_manager) {
RTC_DCHECK(media_type == cricket::MEDIA_TYPE_AUDIO ||
media_type == cricket::MEDIA_TYPE_VIDEO);
RTC_DCHECK(channel_manager_);
}

RtpTransceiver::RtpTransceiver(
Expand All @@ -136,46 +141,73 @@ RtpTransceiver::RtpTransceiver(
RTC_DCHECK(media_type_ == cricket::MEDIA_TYPE_AUDIO ||
media_type_ == cricket::MEDIA_TYPE_VIDEO);
RTC_DCHECK_EQ(sender->media_type(), receiver->media_type());
RTC_DCHECK(channel_manager_);
senders_.push_back(sender);
receivers_.push_back(receiver);
}

RtpTransceiver::~RtpTransceiver() {
// TODO(tommi): On Android, when running PeerConnectionClientTest (e.g.
// PeerConnectionClientTest#testCameraSwitch), the instance doesn't get
// deleted on `thread_`. See if we can fix that.
if (!stopped_) {
RTC_DCHECK_RUN_ON(thread_);
StopInternal();
}
}

void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) {
RTC_DCHECK_RUN_ON(thread_);
// Cannot set a non-null channel on a stopped transceiver.
if (stopped_ && channel) {
return;
}

RTC_DCHECK(channel || channel_);

RTC_LOG_THREAD_BLOCK_COUNT();

if (channel_) {
signaling_thread_safety_->SetNotAlive();
signaling_thread_safety_ = nullptr;
}

if (channel) {
RTC_DCHECK_EQ(media_type(), channel->media_type());
signaling_thread_safety_ = PendingTaskSafetyFlag::Create();
}

if (channel_) {
channel_->SignalFirstPacketReceived().disconnect(this);
}
// An alternative to this, could be to require SetChannel to be called
// on the network thread. The channel object operates for the most part
// on the network thread, as part of its initialization being on the network
// thread is required, so setting a channel object as part of the construction
// (without thread hopping) might be the more efficient thing to do than
// how SetChannel works today.
// Similarly, if the channel() accessor is limited to the network thread, that
// helps with keeping the channel implementation requirements being met and
// avoids synchronization for accessing the pointer or network related state.
channel_manager_->network_thread()->Invoke<void>(RTC_FROM_HERE, [&]() {
if (channel_) {
channel_->SetFirstPacketReceivedCallback(nullptr);
}

channel_ = channel;
channel_ = channel;

if (channel_) {
channel_->SignalFirstPacketReceived().connect(
this, &RtpTransceiver::OnFirstPacketReceived);
}
if (channel_) {
channel_->SetFirstPacketReceivedCallback(
[thread = thread_, flag = signaling_thread_safety_, this]() mutable {
thread->PostTask(ToQueuedTask(
std::move(flag), [this]() { OnFirstPacketReceived(); }));
});
}
});

for (const auto& sender : senders_) {
sender->internal()->SetMediaChannel(channel_ ? channel_->media_channel()
: nullptr);
}

RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0);
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1);

for (const auto& receiver : receivers_) {
if (!channel_) {
Expand All @@ -188,12 +220,11 @@ void RtpTransceiver::SetChannel(cricket::ChannelInterface* channel) {
receiver->internal()->SetMediaChannel(channel_ ? channel_->media_channel()
: nullptr);
}

RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(receivers_.size() * 2);
}

void RtpTransceiver::AddSender(
rtc::scoped_refptr<RtpSenderProxyWithInternal<RtpSenderInternal>> sender) {
RTC_DCHECK_RUN_ON(thread_);
RTC_DCHECK(!stopped_);
RTC_DCHECK(!unified_plan_);
RTC_DCHECK(sender);
Expand All @@ -219,6 +250,7 @@ bool RtpTransceiver::RemoveSender(RtpSenderInterface* sender) {
void RtpTransceiver::AddReceiver(
rtc::scoped_refptr<RtpReceiverProxyWithInternal<RtpReceiverInternal>>
receiver) {
RTC_DCHECK_RUN_ON(thread_);
RTC_DCHECK(!stopped_);
RTC_DCHECK(!unified_plan_);
RTC_DCHECK(receiver);
Expand Down Expand Up @@ -267,7 +299,7 @@ absl::optional<std::string> RtpTransceiver::mid() const {
return mid_;
}

void RtpTransceiver::OnFirstPacketReceived(cricket::ChannelInterface*) {
void RtpTransceiver::OnFirstPacketReceived() {
for (const auto& receiver : receivers_) {
receiver->internal()->NotifyFirstPacketReceived();
}
Expand Down Expand Up @@ -304,6 +336,7 @@ void RtpTransceiver::set_fired_direction(RtpTransceiverDirection direction) {
}

bool RtpTransceiver::stopped() const {
RTC_DCHECK_RUN_ON(thread_);
return stopped_;
}

Expand Down
11 changes: 7 additions & 4 deletions pc/rtp_transceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "pc/rtp_receiver.h"
#include "pc/rtp_sender.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread_annotations.h"

Expand Down Expand Up @@ -78,7 +79,8 @@ class RtpTransceiver final
// channel set.
// |media_type| specifies the type of RtpTransceiver (and, by transitivity,
// the type of senders, receivers, and channel). Can either by audio or video.
explicit RtpTransceiver(cricket::MediaType media_type);
RtpTransceiver(cricket::MediaType media_type,
cricket::ChannelManager* channel_manager);
// Construct a Unified Plan-style RtpTransceiver with the given sender and
// receiver. The media type will be derived from the media types of the sender
// and receiver. The sender and receiver should have the same media type.
Expand Down Expand Up @@ -232,20 +234,21 @@ class RtpTransceiver final
header_extensions_to_offer) override;

private:
void OnFirstPacketReceived(cricket::ChannelInterface* channel);
void OnFirstPacketReceived();
void StopSendingAndReceiving();

// Enforce that this object is created, used and destroyed on one thread.
const TaskQueueBase* thread_;
TaskQueueBase* const thread_;
const bool unified_plan_;
const cricket::MediaType media_type_;
rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_thread_safety_;
std::vector<rtc::scoped_refptr<RtpSenderProxyWithInternal<RtpSenderInternal>>>
senders_;
std::vector<
rtc::scoped_refptr<RtpReceiverProxyWithInternal<RtpReceiverInternal>>>
receivers_;

bool stopped_ = false;
bool stopped_ RTC_GUARDED_BY(thread_) = false;
bool stopping_ RTC_GUARDED_BY(thread_) = false;
bool is_pc_closed_ = false;
RtpTransceiverDirection direction_ = RtpTransceiverDirection::kInactive;
Expand Down
Loading

0 comments on commit 99c8a80

Please sign in to comment.