Skip to content

Commit

Permalink
Support protocol 2, DataPackets
Browse files Browse the repository at this point in the history
  • Loading branch information
davidzhao committed Apr 28, 2021
1 parent 62cdcdd commit 0e8dabf
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ constructor(
}
}

fun handleSignalResponse(response: LivekitRtc.SignalResponse) {
private fun handleSignalResponse(response: LivekitRtc.SignalResponse) {
if (!isConnected) {
// Only handle joins if not connected.
if (response.hasJoin()) {
Expand Down Expand Up @@ -307,7 +307,7 @@ constructor(
const val SD_TYPE_ANSWER = "answer"
const val SD_TYPE_OFFER = "offer"
const val SD_TYPE_PRANSWER = "pranswer"
const val PROTOCOL_VERSION = 1;
const val PROTOCOL_VERSION = 2;

private fun iceServer(url: String) =
PeerConnection.IceServer.builder(url).createIceServer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ constructor(
val client: RTCClient,
private val pctFactory: PeerConnectionTransport.Factory,
@Named(InjectionNames.DISPATCHER_IO) ioDispatcher: CoroutineDispatcher,
) : RTCClient.Listener {
) : RTCClient.Listener, DataChannel.Observer {

var listener: Listener? = null
var rtcConnected: Boolean = false
Expand All @@ -41,7 +41,8 @@ constructor(
private val subscriberObserver = SubscriberTransportObserver(this)
internal lateinit var publisher: PeerConnectionTransport
private lateinit var subscriber: PeerConnectionTransport
private lateinit var privateDataChannel: DataChannel
internal var reliableDataChannel: DataChannel? = null
internal var lossyDataChannel: DataChannel? = null

private val coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
init {
Expand Down Expand Up @@ -115,10 +116,13 @@ constructor(
fun onUpdateSpeakers(speakers: List<LivekitRtc.SpeakerInfo>)
fun onDisconnect(reason: String)
fun onFailToConnect(error: Exception)
fun onUserPacket(packet: LivekitRtc.UserPacket, kind: LivekitRtc.DataPacket.Kind)
}

companion object {
private const val PRIVATE_DATA_CHANNEL_LABEL = "_private"
private const val RELIABLE_DATA_CHANNEL_LABEL = "_reliable"
private const val LOSSY_DATA_CHANNEL_LABEL = "_lossy"
internal const val MAX_DATA_PACKET_SIZE = 15000

private val OFFER_CONSTRAINTS = MediaConstraints().apply {
with(mandatory) {
Expand All @@ -136,6 +140,8 @@ constructor(
}
}

//---------------------------------- RTCClient.Listener --------------------------------------//

override fun onJoin(info: LivekitRtc.JoinResponse) {
val iceServers = mutableListOf<PeerConnection.IceServer>()
for(serverInfo in info.iceServersList){
Expand Down Expand Up @@ -170,10 +176,21 @@ constructor(
publisher = pctFactory.create(rtcConfig, publisherObserver)
subscriber = pctFactory.create(rtcConfig, subscriberObserver)

privateDataChannel = publisher.peerConnection.createDataChannel(
PRIVATE_DATA_CHANNEL_LABEL,
DataChannel.Init()
val reliableInit = DataChannel.Init()
reliableInit.ordered = true
reliableDataChannel = publisher.peerConnection.createDataChannel(
RELIABLE_DATA_CHANNEL_LABEL,
reliableInit
)
reliableDataChannel!!.registerObserver(this)
val lossyInit = DataChannel.Init()
lossyInit.ordered = true
lossyInit.maxRetransmits = 1
lossyDataChannel = publisher.peerConnection.createDataChannel(
LOSSY_DATA_CHANNEL_LABEL,
lossyInit
)

coroutineScope.launch {
val sdpOffer =
when (val outcome = publisher.peerConnection.createOffer(OFFER_CONSTRAINTS)) {
Expand Down Expand Up @@ -302,4 +319,31 @@ constructor(
override fun onError(error: Exception) {
listener?.onFailToConnect(error)
}

//--------------------------------- DataChannel.Observer ------------------------------------//

override fun onBufferedAmountChange(previousAmount: Long) {
}

override fun onStateChange() {
}

override fun onMessage(buffer: DataChannel.Buffer?) {
if (buffer == null) {
return
}
val dp = LivekitRtc.DataPacket.parseFrom(buffer.data)
when (dp.valueCase) {
LivekitRtc.DataPacket.ValueCase.SPEAKER -> {
listener?.onUpdateSpeakers(dp.speaker.speakersList)
}
LivekitRtc.DataPacket.ValueCase.USER -> {
listener?.onUserPacket(dp.user, dp.kind)
}
LivekitRtc.DataPacket.ValueCase.VALUE_NOT_SET,
null -> {
Timber.v { "invalid value for data packet" }
}
}
}
}
26 changes: 13 additions & 13 deletions livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,17 @@ constructor(
handleSpeakerUpdate(speakers)
}

/**
* @suppress
*/
override fun onUserPacket(packet: LivekitRtc.UserPacket, kind: LivekitRtc.DataPacket.Kind) {
val participant = remoteParticipants[packet.participantSid] ?: return
val data = packet.payload.toByteArray()

listener?.onDataReceived(data, participant, this)
participant.listener?.onDataReceived(data, participant)
}

/**
* @suppress
*/
Expand Down Expand Up @@ -331,17 +342,6 @@ constructor(
listener?.onTrackUnsubscribed(track, publication, participant, this)
}

/**
* @suppress
*/
override fun onDataReceived(
data: ByteBuffer,
dataTrack: DataTrack,
participant: RemoteParticipant
) {
listener?.onDataReceived(data, dataTrack, participant, this)
}

/**
* @suppress
* // TODO(@dl): can this be moved out of Room/SDK?
Expand Down Expand Up @@ -440,9 +440,9 @@ interface RoomListener {
fun onTrackUnsubscribed(track: Track, publications: TrackPublication, participant: RemoteParticipant, room: Room) {}

/**
* Message received over a [DataTrack]
* Received data published by another participant
*/
fun onDataReceived(data: ByteBuffer, dataTrack: DataTrack, participant: RemoteParticipant, room: Room) {}
fun onDataReceived(data: ByteArray, participant: RemoteParticipant, room: Room) {}
}

sealed class RoomException(message: String? = null, cause: Throwable? = null) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package io.livekit.android.room.participant

import android.content.Context
import com.github.ajalt.timberkt.Timber
import com.google.protobuf.ByteString
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import io.livekit.android.room.RTCEngine
import io.livekit.android.room.track.*
import livekit.LivekitModels
import livekit.LivekitRtc
import org.webrtc.*
import java.nio.ByteBuffer

class LocalParticipant
@AssistedInject
Expand Down Expand Up @@ -107,39 +110,6 @@ internal constructor(
publishListener?.onPublishSuccess(publication)
}

suspend fun publishDataTrack(
track: LocalDataTrack,
publishListener: PublishListener? = null
) {
if (localTrackPublications.any { it.track == track }) {
publishListener?.onPublishFailure(TrackException.PublishException("Track has already been published"))
return
}

// data track cid isn't ready until peer connection creates it, so we'll use name
val cid = track.name
val trackInfo =
engine.addTrack(cid = cid, name = track.name, track.kind)
val publication = LocalTrackPublication(trackInfo, track, this)

val config = DataChannel.Init().apply {
ordered = track.options.ordered
maxRetransmitTimeMs = track.options.maxRetransmitTimeMs
maxRetransmits = track.options.maxRetransmits
}

val dataChannel = engine.publisher.peerConnection.createDataChannel(track.name, config)
if (dataChannel == null) {
publishListener?.onPublishFailure(TrackException.PublishException("could not create data channel"))
return
}
track.dataChannel = dataChannel
track.updateConfig(config)
addTrackPublication(publication)

publishListener?.onPublishSuccess(publication)
}

fun unpublishTrack(track: Track) {
val publication = localTrackPublications.firstOrNull { it.track == track }
if (publication === null) {
Expand All @@ -156,9 +126,47 @@ internal constructor(
LivekitModels.TrackType.AUDIO -> audioTracks.remove(sid)
LivekitModels.TrackType.VIDEO -> videoTracks.remove(sid)
LivekitModels.TrackType.DATA -> dataTracks.remove(sid)
else -> {}
}
}

/**
* Publish a new data payload to the room. Data will be forwarded to each participant in the room.
* Each payload must not exceed 15k in size
*
* @param data payload to send
* @param reliability for delivery guarantee, use RELIABLE. for fastest delivery without guarantee, use LOSSY
*/
fun publishData(data: ByteArray, reliability: DataPublishReliability) {
if (data.size > RTCEngine.MAX_DATA_PACKET_SIZE) {
throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE)
}

val kind = when (reliability) {
DataPublishReliability.RELIABLE -> LivekitRtc.DataPacket.Kind.RELIABLE
DataPublishReliability.LOSSY -> LivekitRtc.DataPacket.Kind.LOSSY
}
val channel = when (reliability) {
DataPublishReliability.RELIABLE -> engine.reliableDataChannel
DataPublishReliability.LOSSY -> engine.lossyDataChannel
} ?: throw TrackException.PublishException("data channel not established")

val userPacket = LivekitRtc.UserPacket.newBuilder().
setPayload(ByteString.copyFrom(data)).
setParticipantSid(sid).
build()
val dataPacket = LivekitRtc.DataPacket.newBuilder().
setUser(userPacket).
setKind(kind).
build()
val buf = DataChannel.Buffer(
ByteBuffer.wrap(dataPacket.toByteArray()),
true,
)

channel.send(buf)
}

override fun updateFromInfo(info: LivekitModels.ParticipantInfo) {
super.updateFromInfo(info)

Expand All @@ -175,11 +183,11 @@ internal constructor(
track: T,
sid: String
) where T : MediaTrack {
val senders = engine.publisher?.peerConnection?.senders ?: return
val senders = engine.publisher.peerConnection.senders ?: return
for (sender in senders) {
val t = sender.track() ?: continue
if (t == track.rtcTrack) {
engine.publisher?.peerConnection?.removeTrack(sender)
engine.publisher.peerConnection.removeTrack(sender)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,7 @@ interface ParticipantListener {
}

/**
* Data was received on a data track
* Received data published by another participant
*/
fun onDataReceived(
data: ByteBuffer,
dataTrack: DataTrack,
participant: RemoteParticipant
) {
}
fun onDataReceived(data: ByteArray, participant: RemoteParticipant) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ class RemoteParticipant(
}

override fun onMessage(buffer: DataChannel.Buffer) {
internalListener?.onDataReceived(buffer.data, track, this@RemoteParticipant)
listener?.onDataReceived(buffer.data, track, this@RemoteParticipant)

}
})
internalListener?.onTrackSubscribed(track, publication, participant = this)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
package io.livekit.android.room.track

data class LocalTrackPublicationOptions(val placeholder: Unit)

enum class DataPublishReliability {
RELIABLE,
LOSSY,
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,12 @@ open class Track(
internal set
var kind = kind
internal set
var state: State = State.NONE
var sid: String? = null
internal set

enum class State {
ENDED, LIVE, NONE;
}

open fun stop() {
// subclasses override to provide stop behavior
}

companion object {
fun stateFromRTCMediaTrackState(trackState: MediaStreamTrack.State): State {
return when (trackState) {
MediaStreamTrack.State.ENDED -> State.ENDED
MediaStreamTrack.State.LIVE -> State.LIVE
}
}

fun stateFromRTCDataChannelState(dataChannelState: DataChannel.State): State {
return when (dataChannelState) {
DataChannel.State.CONNECTING,
DataChannel.State.OPEN -> {
State.LIVE
}
DataChannel.State.CLOSING,
DataChannel.State.CLOSED -> {
State.ENDED
}
}
}
}

}

sealed class TrackException(message: String? = null, cause: Throwable? = null) :
Expand Down

0 comments on commit 0e8dabf

Please sign in to comment.