Skip to content

Commit

Permalink
Merge pull request ionorg#49 from pion/move_send
Browse files Browse the repository at this point in the history
Move send queue and loop to sender
  • Loading branch information
jbrady42 authored Jul 29, 2020
2 parents d03a4c5 + 9fe5b72 commit f932260
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 29 deletions.
24 changes: 4 additions & 20 deletions pkg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/pion/ion-sfu/pkg/log"
"github.com/pion/ion-sfu/pkg/util"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

// Router defines a track rtp/rtcp router
Expand All @@ -16,16 +15,14 @@ type Router struct {
pub Receiver
pubLock sync.RWMutex
subs map[string]*Sender
subChans map[string]chan *rtp.Packet
subsLock sync.RWMutex
}

// NewRouter for routing rtp/rtcp packets
func NewRouter(recv Receiver) *Router {
r := &Router{
pub: recv,
subs: make(map[string]*Sender),
subChans: make(map[string]chan *rtp.Packet),
pub: recv,
subs: make(map[string]*Sender),
}

go r.start()
Expand All @@ -37,11 +34,8 @@ func NewRouter(recv Receiver) *Router {
func (r *Router) AddSub(pid string, sub *Sender) {
r.subsLock.Lock()
r.subs[pid] = sub
subChan := make(chan *rtp.Packet, 1000)
r.subChans[pid] = subChan
r.subsLock.Unlock()

go r.subWriteLoop(subChan, sub)
go r.subFeedbackLoop(sub)
}

Expand All @@ -62,7 +56,6 @@ func (r *Router) Close() {
r.subsLock.Lock()
for pid, sub := range r.subs {
sub.Close()
close(r.subChans[pid])
delete(r.subs, pid)
}
r.subsLock.Unlock()
Expand Down Expand Up @@ -95,22 +88,13 @@ func (r *Router) start() {

r.subsLock.RLock()
// Push to sub send queues
for pid := range r.subs {
r.subChans[pid] <- pkt
for _, sub := range r.subs {
sub.sendChan <- pkt
}
r.subsLock.RUnlock()
}
}

func (r *Router) subWriteLoop(ch chan *rtp.Packet, sub *Sender) {
for pkt := range ch {
if err := sub.WriteRTP(pkt); err != nil {
log.Errorf("wt.WriteRTP err=%v", err)
}
}
log.Infof("Closing sub writer")
}

// subFeedbackLoop reads rtcp packets from the sub
// and either handles them or forwards them to the pub.
func (r *Router) subFeedbackLoop(sub *Sender) {
Expand Down
31 changes: 22 additions & 9 deletions pkg/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ type SenderConfig struct {

// Sender represents a track being sent to a peer
type Sender struct {
track *webrtc.Track
stop bool
rtcpCh chan rtcp.Packet
useRemb bool
rembCh chan *rtcp.ReceiverEstimatedMaximumBitrate
target uint64
track *webrtc.Track
stop bool
rtcpCh chan rtcp.Packet
useRemb bool
rembCh chan *rtcp.ReceiverEstimatedMaximumBitrate
target uint64
sendChan chan *rtp.Packet
}

// NewSender creates a new send track instance
func NewSender(track *webrtc.Track, sender *webrtc.RTPSender) *Sender {
s := &Sender{
track: track,
rtcpCh: make(chan rtcp.Packet, maxSize),
rembCh: make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize),
track: track,
rtcpCh: make(chan rtcp.Packet, maxSize),
rembCh: make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize),
sendChan: make(chan *rtp.Packet, maxSize),
}

for _, feedback := range track.Codec().RTCPFeedback {
Expand All @@ -50,10 +52,20 @@ func NewSender(track *webrtc.Track, sender *webrtc.RTPSender) *Sender {
}

go s.receiveRTCP(sender)
go s.sendRTP()

return s
}

func (s *Sender) sendRTP() {
for pkt := range s.sendChan {
if err := s.WriteRTP(pkt); err != nil {
log.Errorf("wt.WriteRTP err=%v", err)
}
}
log.Infof("Closing send writer")
}

// ReadRTCP read rtp packet
func (s *Sender) ReadRTCP() (rtcp.Packet, error) {
rtcp, ok := <-s.rtcpCh
Expand All @@ -76,6 +88,7 @@ func (s *Sender) WriteRTP(pkt *rtp.Packet) error {
// Close track
func (s *Sender) Close() {
s.stop = true
close(s.sendChan)
}

func (s *Sender) receiveRTCP(sender *webrtc.RTPSender) {
Expand Down

0 comments on commit f932260

Please sign in to comment.