Skip to content

Commit

Permalink
add queue WriteHeader/WriteTrailer modify example
Browse files Browse the repository at this point in the history
  • Loading branch information
nareix committed Oct 16, 2016
1 parent 00d8696 commit 1f76954
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
8 changes: 7 additions & 1 deletion av/pubsub/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (self *Queue) SetMaxGopCount(n int) {
return
}

func (self *Queue) WriteHeader(streams []av.CodecData) {
func (self *Queue) WriteHeader(streams []av.CodecData) error {
self.lock.Lock()

self.streams = streams
Expand All @@ -60,6 +60,12 @@ func (self *Queue) WriteHeader(streams []av.CodecData) {
self.cond.Broadcast()

self.lock.Unlock()

return nil
}

func (self *Queue) WriteTrailer() error {
return nil
}

// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
Expand Down
41 changes: 20 additions & 21 deletions examples/rtmp_server_channels/main.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package main

import (
"sync"
"fmt"
"time"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/av/pubsub"
"github.com/nareix/joy4/av/pktque"
"github.com/nareix/joy4/av/pubsub"
"github.com/nareix/joy4/format"
"github.com/nareix/joy4/format/rtmp"
"sync"
"time"
)

func init() {
format.RegisterAll()
}

type FrameDropper struct {
Interval int
n int
skipping bool
DelaySkip time.Duration
lasttime time.Time
lastpkttime time.Duration
delay time.Duration
Interval int
n int
skipping bool
DelaySkip time.Duration
lasttime time.Time
lastpkttime time.Duration
delay time.Duration
SkipInterval int
}

Expand Down Expand Up @@ -125,7 +125,7 @@ func main() {
}

demuxer := &pktque.FilterDemuxer{
Filter: filters,
Filter: filters,
Demuxer: cursor,
}

Expand All @@ -134,17 +134,16 @@ func main() {
}

server.HandlePublish = func(conn *rtmp.Conn) {
streams, _ := conn.Streams()

l.Lock()
ch := channels[conn.URL.Path]
if ch == nil {
ch = &Channel{}
ch.que = pubsub.NewQueue(streams)
ch.que = pubsub.NewQueue()
query := conn.URL.Query()
if q := query.Get("cachetime"); q != "" {
dur, _ := time.ParseDuration(q)
ch.que.SetMaxDuration(dur)
if q := query.Get("cachegop"); q != "" {
var n int
fmt.Sscanf(q, "%d", &n)
ch.que.SetMaxGopCount(n)
}
channels[conn.URL.Path] = ch
} else {
Expand All @@ -155,7 +154,7 @@ func main() {
return
}

avutil.CopyPackets(ch.que, conn)
avutil.CopyFile(ch.que, conn)

l.Lock()
delete(channels, conn.URL.Path)
Expand All @@ -176,6 +175,6 @@ func main() {
// ffplay rtmp://localhost/movie?delaytime=10s&waitkey=true
// ffplay rtmp://localhost/movie?delaytime=20s

// ffmpeg -re -i movie.flv -c copy -f flv rtmp://localhost/movie?cachetime=30s
// ffmpeg -re -i movie.flv -c copy -f flv rtmp://localhost/movie?cachetime=1m
// ffmpeg -re -i movie.flv -c copy -f flv rtmp://localhost/movie?cachegop=2
// ffmpeg -re -i movie.flv -c copy -f flv rtmp://localhost/movie?cachegop=1
}

0 comments on commit 1f76954

Please sign in to comment.