Skip to content

Commit

Permalink
Implement Unified Plan (lynckia#1602)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored May 27, 2021
1 parent da4c767 commit 279d4d8
Show file tree
Hide file tree
Showing 86 changed files with 8,467 additions and 3,098 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
build/
licode_config.js
bw_distributor_config.js
extras/basic_example/node_modules
extras/basic_example/public/erizo.js
extras/basic_example/public/erizo.js.map
Expand Down
31 changes: 28 additions & 3 deletions erizo/src/erizo/MediaDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ struct DataPacket {

DataPacket(int comp_, const char *data_, int length_, packetType type_, uint64_t received_time_ms_) :
comp{comp_}, length{length_}, type{type_}, priority{HIGH_PRIORITY}, received_time_ms{received_time_ms_},
is_keyframe{false}, ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1}, is_padding{false} {
is_keyframe{false}, ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1},
rid{"0"}, is_padding{false} {
memcpy(data, data_, length_);
}

DataPacket(int comp_, const char *data_, int length_, packetType type_) :
comp{comp_}, length{length_}, type{type_}, priority{HIGH_PRIORITY},
received_time_ms{ClockUtils::timePointToMs(clock::now())}, is_keyframe{false},
ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1}, is_padding{false} {
ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1}, rid{"0"}, is_padding{false} {
memcpy(data, data_, length_);
}

DataPacket(int comp_, const unsigned char *data_, int length_) :
comp{comp_}, length{length_}, type{VIDEO_PACKET}, priority{HIGH_PRIORITY},
received_time_ms{ClockUtils::timePointToMs(clock::now())}, is_keyframe{false},
ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1}, is_padding{false} {
ending_of_layer_frame{false}, picture_id{-1}, tl0_pic_idx{-1}, rid{"0"}, is_padding{false} {
memcpy(data, data_, length_);
}

Expand Down Expand Up @@ -78,6 +79,8 @@ struct DataPacket {
int picture_id;
int tl0_pic_idx;
std::string codec;
std::string mid;
std::string rid;
unsigned int clock_rate = 0;
bool is_padding;
};
Expand Down Expand Up @@ -225,10 +228,32 @@ class MediaSource: public virtual Monitor {
}
video_source_ssrc_list_[0] = ssrc;
}
void addVideoSourceSSRC(uint32_t ssrc) {
boost::mutex::scoped_lock lock(monitor_mutex_);
video_source_ssrc_list_.push_back(ssrc);
}
void setVideoSourceSSRC(uint32_t ssrc, int position) {
boost::mutex::scoped_lock lock(monitor_mutex_);
while (video_source_ssrc_list_.size() - 1 <= (uint)position) {
video_source_ssrc_list_.push_back(0);
}
video_source_ssrc_list_[position] = ssrc;
}
std::vector<uint32_t> getVideoSourceSSRCList() {
boost::mutex::scoped_lock lock(monitor_mutex_);
return video_source_ssrc_list_; // return by copy to avoid concurrent access
}
int getVideoSourceSSRCPositionInList(uint32_t ssrc) {
boost::mutex::scoped_lock lock(monitor_mutex_);
auto found_ssrc = std::find_if(video_source_ssrc_list_.begin(), video_source_ssrc_list_.end(),
[ssrc](uint32_t known_ssrc) {
return known_ssrc == ssrc;
});
if (found_ssrc != video_source_ssrc_list_.end()) {
return std::distance(video_source_ssrc_list_.begin(), found_ssrc);
}
return -1;
}
void setVideoSourceSSRCList(const std::vector<uint32_t>& new_ssrc_list) {
boost::mutex::scoped_lock lock(monitor_mutex_);
video_source_ssrc_list_ = new_ssrc_list;
Expand Down
95 changes: 63 additions & 32 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
const std::string& media_stream_id,
const std::string& media_stream_label,
bool is_publisher,
int session_version,
std::string priority = "default",
bool has_audio,
bool has_video,
std::string priority,
std::vector<std::string> handler_order,
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic) :
audio_enabled_{false}, video_enabled_{false},
audio_enabled_{has_audio},
video_enabled_{has_video},
media_stream_event_listener_{nullptr},
connection_{std::move(connection)},
stream_id_{media_stream_id},
Expand All @@ -78,8 +80,7 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
random_generator_{random_device_()},
target_padding_bitrate_{0},
periodic_keyframes_requested_{false},
periodic_keyframe_interval_{0},
session_version_{session_version} {
periodic_keyframe_interval_{0} {
if (is_publisher) {
setVideoSinkSSRC(kDefaultVideoSinkSSRC);
setAudioSinkSSRC(kDefaultAudioSinkSSRC);
Expand Down Expand Up @@ -211,7 +212,7 @@ bool MediaStream::isSinkSSRC(uint32_t ssrc) {
return isVideoSinkSSRC(ssrc) || isAudioSinkSSRC(ssrc);
}

bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version_negotiated = -1) {
bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp) {
ELOG_DEBUG("%s message: setting remote SDP to Stream, sending: %d, initialized: %d",
toLog(), sending_, pipeline_initialized_);
if (!sending_) {
Expand All @@ -233,11 +234,12 @@ bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version
if (!stream_found) {
return true;
}
} else if (!isPublisher() && !ready_) {
// This Stream has not any sender associated yet.
return true;
}

if (!isPublisher() && session_version_negotiated >= 0 && session_version_ > session_version_negotiated) {
ELOG_WARN("%s message: too old session version, session_version_: %d, negotiated_session_version: %d",
toLog(), session_version_, session_version_negotiated);
if (remote_sdp_) {
return true;
}

Expand Down Expand Up @@ -274,9 +276,6 @@ bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version
setAudioSourceSSRC(kDefaultAudioSinkSSRC);
}

audio_enabled_ = remote_sdp_->hasAudio;
video_enabled_ = remote_sdp_->hasVideo;

rtcp_processor_->addSourceSsrc(getAudioSourceSSRC());
std::for_each(video_source_ssrc_list_.begin(), video_source_ssrc_list_.end(), [this] (uint32_t new_ssrc){
rtcp_processor_->addSourceSsrc(new_ssrc);
Expand Down Expand Up @@ -501,25 +500,25 @@ int MediaStream::deliverVideoData_(std::shared_ptr<DataPacket> video_packet) {

int MediaStream::deliverFeedback_(std::shared_ptr<DataPacket> fb_packet) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(fb_packet->data);
uint32_t recvSSRC = chead->getSourceSSRC();
uint32_t recv_ssrc = chead->getSourceSSRC();
if (chead->isREMB()) {
for (uint8_t index = 0; index < chead->getREMBNumSSRC(); index++) {
uint32_t ssrc = chead->getREMBFeedSSRC(index);
if (isVideoSourceSSRC(ssrc)) {
recvSSRC = ssrc;
recv_ssrc = ssrc;
break;
}
}
}
if (isVideoSourceSSRC(recvSSRC)) {
if (isVideoSourceSSRC(recv_ssrc)) {
fb_packet->type = VIDEO_PACKET;
sendPacketAsync(fb_packet);
} else if (isAudioSourceSSRC(recvSSRC)) {
} else if (isAudioSourceSSRC(recv_ssrc)) {
fb_packet->type = AUDIO_PACKET;
sendPacketAsync(fb_packet);
} else {
ELOG_DEBUG("%s deliverFeedback unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
toLog(), recv_ssrc, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
}
return fb_packet->length;
}
Expand Down Expand Up @@ -572,12 +571,44 @@ void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, T
char* buf = packet->data;
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
if (!chead->isRtcp()) {
uint32_t recvSSRC = head->getSSRC();
if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
if (!chead->isFeedback()) {
uint32_t recv_ssrc;
if (chead->isRtcp()) {
recv_ssrc = chead->getSSRC();
} else {
recv_ssrc = head->getSSRC();
}

// Mid information is only sent at the beginning of the sessions or when the SSRC changes.
if (stream_ptr->isVideoSourceSSRC(recv_ssrc)) {
packet->type = VIDEO_PACKET;
} else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
packet->mid = stream_ptr->video_mid_;
} else if (stream_ptr->isAudioSourceSSRC(recv_ssrc)) {
packet->type = AUDIO_PACKET;
packet->mid = stream_ptr->audio_mid_;
} else if (packet->mid == stream_ptr->audio_mid_) {
packet->type = AUDIO_PACKET;
stream_ptr->setAudioSourceSSRC(recv_ssrc);
}

if (packet->type == VIDEO_PACKET) {
if (packet->rid != "0") {
// We learn SSRCs from the packets that are received as they are not
// sent through the SDP anymore with Simulcast.
if (!stream_ptr->isVideoSourceSSRC(recv_ssrc) && packet->mid == stream_ptr->video_mid_) {
// We must preserve the order set by RID
stream_ptr->setVideoSourceSSRC(recv_ssrc, stoi(packet->rid) - 1);
}
} else {
// RID = 0 is used when no RID information is sent in the packet.
int position = stream_ptr->getVideoSourceSSRCPositionInList(recv_ssrc) + 1;
if (position > 0) {
packet->rid = std::to_string(position);
} else {
ELOG_WARN("%s message: No SSRC and no RID found for video packet", stream_ptr->toLog());
return;
}
}
}
}

Expand All @@ -593,13 +624,13 @@ void MediaStream::read(std::shared_ptr<DataPacket> packet) {
// PROCESS RTCP
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
uint32_t recvSSRC = 0;
uint32_t recv_ssrc = 0;
auto video_sink = video_sink_.lock();
auto audio_sink = audio_sink_.lock();
if (!chead->isRtcp()) {
recvSSRC = head->getSSRC();
recv_ssrc = head->getSSRC();
} else if (chead->packettype == RTCP_Sender_PT || chead->packettype == RTCP_SDES_PT) { // Sender Report
recvSSRC = chead->getSSRC();
recv_ssrc = chead->getSSRC();
}
// DELIVER FEEDBACK (RR, FEEDBACK PACKETS)
if (chead->isFeedback()) {
Expand All @@ -612,35 +643,35 @@ void MediaStream::read(std::shared_ptr<DataPacket> packet) {
if (bundle_) {
// Check incoming SSRC
// Deliver data
if (isVideoSourceSSRC(recvSSRC) && video_sink) {
if (isVideoSourceSSRC(recv_ssrc) && video_sink) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
parseIncomingExtensionId(buf, len, VIDEO_PACKET);
video_sink->deliverVideoData(std::move(packet));
} else if (isAudioSourceSSRC(recvSSRC) && audio_sink) {
} else if (isAudioSourceSSRC(recv_ssrc) && audio_sink) {
parseIncomingPayloadType(buf, len, AUDIO_PACKET);
parseIncomingExtensionId(buf, len, AUDIO_PACKET);
audio_sink->deliverAudioData(std::move(packet));
} else {
ELOG_DEBUG("%s read video unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
toLog(), recv_ssrc, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
}
} else {
if (packet->type == AUDIO_PACKET && audio_sink) {
parseIncomingPayloadType(buf, len, AUDIO_PACKET);
parseIncomingExtensionId(buf, len, AUDIO_PACKET);
// Firefox does not send SSRC in SDP
if (getAudioSourceSSRC() == 0) {
ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recvSSRC);
setAudioSourceSSRC(recvSSRC);
ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recv_ssrc);
setAudioSourceSSRC(recv_ssrc);
}
audio_sink->deliverAudioData(std::move(packet));
} else if (packet->type == VIDEO_PACKET && video_sink) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
parseIncomingExtensionId(buf, len, VIDEO_PACKET);
// Firefox does not send SSRC in SDP
if (getVideoSourceSSRC() == 0) {
ELOG_DEBUG("%s discoveredVideoSourceSSRC:%u", toLog(), recvSSRC);
setVideoSourceSSRC(recvSSRC);
ELOG_DEBUG("%s discoveredVideoSourceSSRC:%u", toLog(), recv_ssrc);
setVideoSourceSSRC(recv_ssrc);
}
// change ssrc for RTP packets, don't touch here if RTCP
video_sink->deliverVideoData(std::move(packet));
Expand Down Expand Up @@ -894,7 +925,7 @@ void MediaStream::changeDeliverExtensionId(DataPacket *dp, packetType type) {
void MediaStream::changeDeliverPayloadType(DataPacket *dp, packetType type) {
RtpHeader* h = reinterpret_cast<RtpHeader*>(dp->data);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(dp->data);
if (!chead->isRtcp()) {
if (!chead->isRtcp() && remote_sdp_) {
int internalPT = h->getPayloadType();
int externalPT = internalPT;
if (type == AUDIO_PACKET) {
Expand Down
17 changes: 12 additions & 5 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,14 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

public:
typedef typename Handler::Context Context;
bool audio_enabled_;
bool video_enabled_;

/**
* Constructor.
* Constructs an empty MediaStream without any configuration.
*/
MediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id, const std::string& media_stream_label,
bool is_publisher, int session_version, const std::string priority,
bool is_publisher, bool has_audio, bool has_video, const std::string priority = "default",
std::vector<std::string> handler_order = {},
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic = {});
/**
Expand All @@ -94,7 +92,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
void setVideoBitrate(uint32_t bitrate) { video_bitrate_ = bitrate; }
void setMaxVideoBW(uint32_t max_video_bw);
void syncClose();
bool setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version_negotiated);
bool setRemoteSdp(std::shared_ptr<SdpInfo> sdp);

/**
* Sends a PLI Packet
Expand Down Expand Up @@ -190,6 +188,8 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
Pipeline::Ptr getPipeline() { return pipeline_; }
bool isPublisher() { return is_publisher_; }
void setBitrateFromMaxQualityLayer(uint64_t bitrate) { bitrate_from_max_quality_layer_ = bitrate; }
bool hasAudio() { return audio_enabled_; }
bool hasVideo() { return video_enabled_; }
void setBitrateForLayer(int temporal_layer, int spatial_layer, uint64_t bitrate);
uint64_t getBitrateForLayer(int spatial_layer, int temporal_layer);
uint64_t getBitrateForHigherTemporalInSpatialLayer(int spatial_layer);
Expand All @@ -201,6 +201,10 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

virtual PublisherInfo getPublisherInfo() { return publisher_info_; }

void setVideoMid(std::string mid) { video_mid_ = mid; }
std::string getVideoMid() { return video_mid_; }
void setAudioMid(std::string mid) { audio_mid_ = mid; }
std::string getAudioMid() { return audio_mid_; }
private:
void sendPacket(std::shared_ptr<DataPacket> packet);
int deliverAudioData_(std::shared_ptr<DataPacket> audio_packet) override;
Expand All @@ -218,6 +222,8 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

private:
boost::mutex event_listener_mutex_;
bool audio_enabled_;
bool video_enabled_;
boost::mutex layer_bitrates_mutex_;
boost::mutex priority_mutex_;
MediaStreamEventListener* media_stream_event_listener_;
Expand Down Expand Up @@ -272,8 +278,9 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
uint64_t target_padding_bitrate_;
bool periodic_keyframes_requested_;
uint32_t periodic_keyframe_interval_;
int session_version_;
PublisherInfo publisher_info_;
std::string audio_mid_;
std::string video_mid_;

protected:
std::shared_ptr<SdpInfo> remote_sdp_;
Expand Down
6 changes: 4 additions & 2 deletions erizo/src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ namespace erizo {
}
std::map<std::string, std::shared_ptr<MediaSink>>::iterator it;
RtpHeader* rhead = reinterpret_cast<RtpHeader*>(video_packet->data);
uint32_t ssrc = head->isRtcp() ? head->getSSRC() : rhead->getSSRC();
uint32_t ssrc_offset = translateAndMaybeAdaptForSimulcast(ssrc);
// We create a controlled offset to keep having multiple SSRCs in the
// subscribers.
uint32_t ssrc_offset = std::max(stoi(video_packet->rid) - 1, 0);

for (it = subscribers_.begin(); it != subscribers_.end(); ++it) {
if ((*it).second != nullptr) {
uint32_t base_ssrc = (*it).second->getVideoSinkSSRC();
Expand Down
Loading

0 comments on commit 279d4d8

Please sign in to comment.