diff --git a/pkg/router.go b/pkg/router.go index 3f829e001..016fbfeb8 100644 --- a/pkg/router.go +++ b/pkg/router.go @@ -6,7 +6,6 @@ import ( "github.com/pion/ion-sfu/pkg/log" "github.com/pion/ion-sfu/pkg/util" "github.com/pion/rtcp" - "github.com/pion/rtp" ) // Router defines a track rtp/rtcp router @@ -16,16 +15,14 @@ type Router struct { pub Receiver pubLock sync.RWMutex subs map[string]*Sender - subChans map[string]chan *rtp.Packet subsLock sync.RWMutex } // NewRouter for routing rtp/rtcp packets func NewRouter(recv Receiver) *Router { r := &Router{ - pub: recv, - subs: make(map[string]*Sender), - subChans: make(map[string]chan *rtp.Packet), + pub: recv, + subs: make(map[string]*Sender), } go r.start() @@ -37,11 +34,8 @@ func NewRouter(recv Receiver) *Router { func (r *Router) AddSub(pid string, sub *Sender) { r.subsLock.Lock() r.subs[pid] = sub - subChan := make(chan *rtp.Packet, 1000) - r.subChans[pid] = subChan r.subsLock.Unlock() - go r.subWriteLoop(subChan, sub) go r.subFeedbackLoop(sub) } @@ -62,7 +56,6 @@ func (r *Router) Close() { r.subsLock.Lock() for pid, sub := range r.subs { sub.Close() - close(r.subChans[pid]) delete(r.subs, pid) } r.subsLock.Unlock() @@ -95,22 +88,13 @@ func (r *Router) start() { r.subsLock.RLock() // Push to sub send queues - for pid := range r.subs { - r.subChans[pid] <- pkt + for _, sub := range r.subs { + sub.sendChan <- pkt } r.subsLock.RUnlock() } } -func (r *Router) subWriteLoop(ch chan *rtp.Packet, sub *Sender) { - for pkt := range ch { - if err := sub.WriteRTP(pkt); err != nil { - log.Errorf("wt.WriteRTP err=%v", err) - } - } - log.Infof("Closing sub writer") -} - // subFeedbackLoop reads rtcp packets from the sub // and either handles them or forwards them to the pub. func (r *Router) subFeedbackLoop(sub *Sender) { diff --git a/pkg/sender.go b/pkg/sender.go index 682f8dd51..a568e48da 100644 --- a/pkg/sender.go +++ b/pkg/sender.go @@ -21,20 +21,22 @@ type SenderConfig struct { // Sender represents a track being sent to a peer type Sender struct { - track *webrtc.Track - stop bool - rtcpCh chan rtcp.Packet - useRemb bool - rembCh chan *rtcp.ReceiverEstimatedMaximumBitrate - target uint64 + track *webrtc.Track + stop bool + rtcpCh chan rtcp.Packet + useRemb bool + rembCh chan *rtcp.ReceiverEstimatedMaximumBitrate + target uint64 + sendChan chan *rtp.Packet } // NewSender creates a new send track instance func NewSender(track *webrtc.Track, sender *webrtc.RTPSender) *Sender { s := &Sender{ - track: track, - rtcpCh: make(chan rtcp.Packet, maxSize), - rembCh: make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize), + track: track, + rtcpCh: make(chan rtcp.Packet, maxSize), + rembCh: make(chan *rtcp.ReceiverEstimatedMaximumBitrate, maxSize), + sendChan: make(chan *rtp.Packet, maxSize), } for _, feedback := range track.Codec().RTCPFeedback { @@ -50,10 +52,20 @@ func NewSender(track *webrtc.Track, sender *webrtc.RTPSender) *Sender { } go s.receiveRTCP(sender) + go s.sendRTP() return s } +func (s *Sender) sendRTP() { + for pkt := range s.sendChan { + if err := s.WriteRTP(pkt); err != nil { + log.Errorf("wt.WriteRTP err=%v", err) + } + } + log.Infof("Closing send writer") +} + // ReadRTCP read rtp packet func (s *Sender) ReadRTCP() (rtcp.Packet, error) { rtcp, ok := <-s.rtcpCh @@ -76,6 +88,7 @@ func (s *Sender) WriteRTP(pkt *rtp.Packet) error { // Close track func (s *Sender) Close() { s.stop = true + close(s.sendChan) } func (s *Sender) receiveRTCP(sender *webrtc.RTPSender) {