Skip to content

Commit

Permalink
Implement proper SCTP data channel closing procedure.
Browse files Browse the repository at this point in the history
The proper closing procedure is:
1. Alice resets outgoing stream.
2. Bob receives incoming stream reset, resets his outgoing stream.
3. Alice receives incoming stream reset; channel closed!
4. Bob receives acknowledgement of reset; channel closed!

https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7

However, up until now we've been sending both an incoming and outgoing reset
from the side initiating the closing procedure, and doing nothing on the remote
side.

This means that if you call "Close" and the remote endpoint is using an old
version of WebRTC, the channel's state will be stuck at "closing" since the
remote endpoint won't send a reset. Which is already what happens when Firefox
is talking to Chrome.

This CL also fixes an issue where the DataChannel's state prematurely went to
"closed" before the closing procedure was complete. Which could result in a
new DataChannel attempting to re-use the ID and failing.

[email protected]

Bug: chromium:449934, webrtc:4453
Change-Id: Ic1ba813e46538c6c65868961aae6a9780d68a5e2
Reviewed-on: https://webrtc-review.googlesource.com/79061
Reviewed-by: Taylor Brandstetter <[email protected]>
Reviewed-by: Steve Anton <[email protected]>
Commit-Queue: Taylor Brandstetter <[email protected]>
Cr-Commit-Position: refs/heads/master@{#23478}
  • Loading branch information
Taylor Brandstetter authored and Commit Bot committed May 31, 2018
1 parent 20e8cfb commit cdd05f0
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 329 deletions.
288 changes: 121 additions & 167 deletions media/sctp/sctptransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum PreservedErrno {
#include <stdarg.h>
#include <stdio.h>

#include <algorithm>
#include <memory>
#include <sstream>

Expand Down Expand Up @@ -73,65 +74,6 @@ enum PayloadProtocolIdentifier {
PPID_TEXT_LAST = 51
};

typedef std::set<uint32_t> StreamSet;

// Returns a comma-separated, human-readable list of the stream IDs in 's'
std::string ListStreams(const StreamSet& s) {
std::stringstream result;
bool first = true;
for (StreamSet::const_iterator it = s.begin(); it != s.end(); ++it) {
if (!first) {
result << ", " << *it;
} else {
result << *it;
first = false;
}
}
return result.str();
}

// Returns a pipe-separated, human-readable list of the SCTP_STREAM_RESET
// flags in 'flags'
std::string ListFlags(int flags) {
std::stringstream result;
bool first = true;
// Skip past the first 12 chars (strlen("SCTP_STREAM_"))
#define MAKEFLAG(X) \
{ X, #X + 12 }
struct flaginfo_t {
int value;
const char* name;
} flaginfo[] = {MAKEFLAG(SCTP_STREAM_RESET_INCOMING_SSN),
MAKEFLAG(SCTP_STREAM_RESET_OUTGOING_SSN),
MAKEFLAG(SCTP_STREAM_RESET_DENIED),
MAKEFLAG(SCTP_STREAM_RESET_FAILED),
MAKEFLAG(SCTP_STREAM_CHANGE_DENIED)};
#undef MAKEFLAG
for (uint32_t i = 0; i < arraysize(flaginfo); ++i) {
if (flags & flaginfo[i].value) {
if (!first)
result << " | ";
result << flaginfo[i].name;
first = false;
}
}
return result.str();
}

// Returns a comma-separated, human-readable list of the integers in 'array'.
// All 'num_elems' of them.
std::string ListArray(const uint16_t* array, int num_elems) {
std::stringstream result;
for (int i = 0; i < num_elems; ++i) {
if (i) {
result << ", " << array[i];
} else {
result << array[i];
}
}
return result.str();
}

// Helper for logging SCTP messages.
#if defined(__GNUC__)
__attribute__((__format__(__printf__, 1, 2)))
Expand Down Expand Up @@ -472,43 +414,40 @@ bool SctpTransport::OpenStream(int sid) {
<< "Not adding data stream "
<< "with sid=" << sid << " because sid is too high.";
return false;
} else if (open_streams_.find(sid) != open_streams_.end()) {
}
auto it = stream_status_by_sid_.find(sid);
if (it == stream_status_by_sid_.end()) {
stream_status_by_sid_[sid] = StreamStatus();
return true;
}
if (it->second.is_open()) {
RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): "
<< "Not adding data stream "
<< "with sid=" << sid
<< " because stream is already open.";
return false;
} else if (queued_reset_streams_.find(sid) != queued_reset_streams_.end() ||
sent_reset_streams_.find(sid) != sent_reset_streams_.end()) {
} else {
RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): "
<< "Not adding data stream "
<< " with sid=" << sid
<< " because stream is still closing.";
return false;
}

open_streams_.insert(sid);
return true;
}

bool SctpTransport::ResetStream(int sid) {
RTC_DCHECK_RUN_ON(network_thread_);
StreamSet::iterator found = open_streams_.find(sid);
if (found == open_streams_.end()) {
RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid << "): "
<< "stream not found.";

auto it = stream_status_by_sid_.find(sid);
if (it == stream_status_by_sid_.end() || !it->second.is_open()) {
RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid
<< "): stream not open.";
return false;
} else {
RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): "
<< "Removing and queuing RE-CONFIG chunk.";
open_streams_.erase(found);
}

// SCTP won't let you have more than one stream reset pending at a time, but
// you can close multiple streams in a single reset. So, we keep an internal
// queue of streams-to-reset, and send them as one reset message in
// SendQueuedStreamResets().
queued_reset_streams_.insert(sid);
RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): "
<< "Queuing RE-CONFIG chunk.";
it->second.closure_initiated = true;

// Signal our stream-reset logic that it should try to send now, if it can.
SendQueuedStreamResets();
Expand All @@ -534,12 +473,15 @@ bool SctpTransport::SendData(const SendDataParams& params,
return false;
}

if (params.type != DMT_CONTROL &&
open_streams_.find(params.sid) == open_streams_.end()) {
RTC_LOG(LS_WARNING) << debug_name_ << "->SendData(...): "
<< "Not sending data because sid is unknown: "
<< params.sid;
return false;
if (params.type != DMT_CONTROL) {
auto it = stream_status_by_sid_.find(params.sid);
if (it == stream_status_by_sid_.end() || !it->second.is_open()) {
RTC_LOG(LS_WARNING)
<< debug_name_ << "->SendData(...): "
<< "Not sending data because sid is unknown or closing: "
<< params.sid;
return false;
}
}

// Send data using SCTP.
Expand Down Expand Up @@ -790,46 +732,63 @@ void SctpTransport::CloseSctpSocket() {

bool SctpTransport::SendQueuedStreamResets() {
RTC_DCHECK_RUN_ON(network_thread_);
if (!sent_reset_streams_.empty() || queued_reset_streams_.empty()) {

// Figure out how many streams need to be reset. We need to do this so we can
// allocate the right amount of memory for the sctp_reset_streams structure.
size_t num_streams = std::count_if(
stream_status_by_sid_.begin(), stream_status_by_sid_.end(),
[](const std::map<uint32_t, StreamStatus>::value_type& stream) {
return stream.second.need_outgoing_reset();
});
if (num_streams == 0) {
// Nothing to reset.
return true;
}

RTC_LOG(LS_VERBOSE) << "SendQueuedStreamResets[" << debug_name_
<< "]: Sending [" << ListStreams(queued_reset_streams_)
<< "], Open: [" << ListStreams(open_streams_)
<< "], Sent: [" << ListStreams(sent_reset_streams_)
<< "]";
<< "]: Resetting " << num_streams << " outgoing streams.";

const size_t num_streams = queued_reset_streams_.size();
const size_t num_bytes =
sizeof(struct sctp_reset_streams) + (num_streams * sizeof(uint16_t));

std::vector<uint8_t> reset_stream_buf(num_bytes, 0);
struct sctp_reset_streams* resetp =
reinterpret_cast<sctp_reset_streams*>(&reset_stream_buf[0]);
resetp->srs_assoc_id = SCTP_ALL_ASSOC;
resetp->srs_flags = SCTP_STREAM_RESET_INCOMING | SCTP_STREAM_RESET_OUTGOING;
resetp->srs_flags = SCTP_STREAM_RESET_OUTGOING;
resetp->srs_number_streams = rtc::checked_cast<uint16_t>(num_streams);
int result_idx = 0;
for (StreamSet::iterator it = queued_reset_streams_.begin();
it != queued_reset_streams_.end(); ++it) {
resetp->srs_stream_list[result_idx++] = *it;

for (const std::map<uint32_t, StreamStatus>::value_type& stream :
stream_status_by_sid_) {
if (!stream.second.need_outgoing_reset()) {
continue;
}
resetp->srs_stream_list[result_idx++] = stream.first;
}

int ret =
usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp,
rtc::checked_cast<socklen_t>(reset_stream_buf.size()));
if (ret < 0) {
RTC_LOG_ERRNO(LS_ERROR) << debug_name_
<< "->SendQueuedStreamResets(): "
"Failed to send a stream reset for "
<< num_streams << " streams";
// Note that usrsctp only lets us have one reset in progress at a time
// (even though multiple streams can be reset at once). If this happens,
// SendQueuedStreamResets will end up called after the current in-progress
// reset finishes, in OnStreamResetEvent.
RTC_LOG_ERRNO(LS_WARNING) << debug_name_
<< "->SendQueuedStreamResets(): "
"Failed to send a stream reset for "
<< num_streams << " streams";
return false;
}

// sent_reset_streams_ is empty, and all the queued_reset_streams_ go into
// it now.
queued_reset_streams_.swap(sent_reset_streams_);
// Since the usrsctp call completed successfully, update our stream status
// map to note that we started the outgoing reset.
for (auto it = stream_status_by_sid_.begin();
it != stream_status_by_sid_.end(); ++it) {
if (it->second.need_outgoing_reset()) {
it->second.outgoing_reset_initiated = true;
}
}
return true;
}

Expand Down Expand Up @@ -1049,78 +1008,73 @@ void SctpTransport::OnNotificationAssocChange(const sctp_assoc_change& change) {
void SctpTransport::OnStreamResetEvent(
const struct sctp_stream_reset_event* evt) {
RTC_DCHECK_RUN_ON(network_thread_);
// A stream reset always involves two RE-CONFIG chunks for us -- we always
// simultaneously reset a sid's sequence number in both directions. The
// requesting side transmits a RE-CONFIG chunk and waits for the peer to send
// one back. Both sides get this SCTP_STREAM_RESET_EVENT when they receive
// RE-CONFIGs.

// This callback indicates that a reset is complete for incoming and/or
// outgoing streams. The reset may have been initiated by us or the remote
// side.
const int num_sids = (evt->strreset_length - sizeof(*evt)) /
sizeof(evt->strreset_stream_list[0]);
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): Flags = 0x" << rtc::ToHex(evt->strreset_flags)
<< " (" << ListFlags(evt->strreset_flags) << ")";
RTC_LOG(LS_VERBOSE) << "Assoc = " << evt->strreset_assoc_id << ", Streams = ["
<< ListArray(evt->strreset_stream_list, num_sids)
<< "], Open: [" << ListStreams(open_streams_)
<< "], Q'd: [" << ListStreams(queued_reset_streams_)
<< "], Sent: [" << ListStreams(sent_reset_streams_)
<< "]";

// If both sides try to reset some streams at the same time (even if they're
// disjoint sets), we can get reset failures.

if (evt->strreset_flags & SCTP_STREAM_RESET_FAILED) {
// OK, just try again. The stream IDs sent over when the RESET_FAILED flag
// is set seem to be garbage values. Ignore them.
queued_reset_streams_.insert(sent_reset_streams_.begin(),
sent_reset_streams_.end());
sent_reset_streams_.clear();

} else if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
// Each side gets an event for each direction of a stream. That is,
// closing sid k will make each side receive INCOMING and OUTGOING reset
// events for k. As per RFC6525, Section 5, paragraph 2, each side will
// get an INCOMING event first.
for (int i = 0; i < num_sids; i++) {
const int stream_id = evt->strreset_stream_list[i];

// See if this stream ID was closed by our peer or ourselves.
StreamSet::iterator it = sent_reset_streams_.find(stream_id);

// The reset was requested locally.
if (it != sent_reset_streams_.end()) {
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): local sid " << stream_id << " acknowledged.";
sent_reset_streams_.erase(it);

} else if ((it = open_streams_.find(stream_id)) != open_streams_.end()) {
// The peer requested the reset.
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): closing sid " << stream_id;
open_streams_.erase(it);
SignalStreamClosedRemotely(stream_id);

} else if ((it = queued_reset_streams_.find(stream_id)) !=
queued_reset_streams_.end()) {
// The peer requested the reset, but there was a local reset
// queued.
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): double-sided close for sid " << stream_id;
// Both sides want the stream closed, and the peer got to send the
// RE-CONFIG first. Treat it like the local Remove(Send|Recv)Stream
// finished quickly.
queued_reset_streams_.erase(it);

} else {
// This stream is unknown. Sometimes this can be from an
// RESET_FAILED-related retransmit.
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): Unknown sid " << stream_id;
// OK, just try sending any previously sent stream resets again. The stream
// IDs sent over when the RESET_FIALED flag is set seem to be garbage
// values. Ignore them.
for (std::map<uint32_t, StreamStatus>::value_type& stream :
stream_status_by_sid_) {
stream.second.outgoing_reset_initiated = false;
}
SendQueuedStreamResets();
// TODO(deadbeef): If this happens, the entire SCTP association is in quite
// crippled state. The SCTP session should be dismantled, and the WebRTC
// connectivity errored because is clear that the distant party is not
// playing ball: malforms the transported data.
return;
}

// Loop over the received events and properly update the StreamStatus map.
for (int i = 0; i < num_sids; i++) {
const uint32_t sid = evt->strreset_stream_list[i];
auto it = stream_status_by_sid_.find(sid);
if (it == stream_status_by_sid_.end()) {
// This stream is unknown. Sometimes this can be from a
// RESET_FAILED-related retransmit.
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): Unknown sid " << sid;
continue;
}
StreamStatus& status = it->second;

if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_INCOMING_SSN(" << debug_name_
<< "): sid " << sid;
status.incoming_reset_complete = true;
// If we receive an incoming stream reset and we haven't started the
// closing procedure ourselves, this means the remote side started the
// closing procedure; fire a signal so that the relevant data channel
// can change to "closing" (we still need to reset the outgoing stream
// before it changes to "closed").
if (!status.closure_initiated) {
SignalClosingProcedureStartedRemotely(sid);
}
}
if (evt->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_OUTGOING_SSN(" << debug_name_
<< "): sid " << sid;
status.outgoing_reset_complete = true;
}

// If this reset completes the closing procedure, remove the stream from
// our map so we can consider it closed, and fire a signal such that the
// relevant DataChannel will change its state to "closed" and its ID can be
// re-used.
if (status.reset_complete()) {
stream_status_by_sid_.erase(it);
SignalClosingProcedureComplete(sid);
}
}

// Always try to send the queued RESET because this call indicates that the
// last local RESET or remote RESET has made some progress.
// Always try to send any queued resets because this call indicates that the
// last outgoing or incoming reset has made some progress.
SendQueuedStreamResets();
}

Expand Down
Loading

0 comments on commit cdd05f0

Please sign in to comment.