Skip to content

Commit

Permalink
use hierarchical contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed May 11, 2021
1 parent e558b24 commit f086b62
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251
github.com/aler9/gortsplib v0.0.0-20210511125241-4d1c2d18319b
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251 h1:pG9MnD5MOIpqcOBccQ5L4ZAhJGBcN7LKO87ilADzIYI=
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210511125241-4d1c2d18319b h1:meeL4Le2z7zG0qO+X+2kG20/ys0MEph6RvamSYHqlAE=
github.com/aler9/gortsplib v0.0.0-20210511125241-4d1c2d18319b/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
Expand Down
5 changes: 4 additions & 1 deletion internal/hlsconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type Converter struct {

// New allocates a Converter.
func New(
ctxParent context.Context,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
readBufferCount int,
Expand All @@ -161,7 +162,7 @@ func New(
pathMan PathMan,
parent Parent) *Converter {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

c := &Converter{
hlsSegmentCount: hlsSegmentCount,
Expand Down Expand Up @@ -495,6 +496,8 @@ func (c *Converter) runInner(innerCtx context.Context) error {
return err

case <-innerCtx.Done():
c.ringBuffer.Close()
<-writerDone
return nil
}
}
Expand Down
4 changes: 3 additions & 1 deletion internal/hlsserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Server struct {

// New allocates a Server.
func New(
ctxParent context.Context,
address string,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
Expand All @@ -56,7 +57,7 @@ func New(
return nil, err
}

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

s := &Server{
hlsSegmentCount: hlsSegmentCount,
Expand Down Expand Up @@ -105,6 +106,7 @@ outer:
c, ok := s.converters[req.Path]
if !ok {
c = hlsconverter.New(
s.ctx,
s.hlsSegmentCount,
s.hlsSegmentDuration,
s.readBufferCount,
Expand Down
5 changes: 4 additions & 1 deletion internal/path/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Path struct {

// New allocates a Path.
func New(
ctxParent context.Context,
rtspAddress string,
readTimeout time.Duration,
writeTimeout time.Duration,
Expand All @@ -117,7 +118,7 @@ func New(
stats *stats.Stats,
parent Parent) *Path {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

pa := &Path{
rtspAddress: rtspAddress,
Expand Down Expand Up @@ -327,6 +328,7 @@ func (pa *Path) startExternalSource() {
if strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") {
pa.source = rtspsource.New(
pa.ctx,
pa.conf.Source,
pa.conf.SourceProtocolParsed,
pa.conf.SourceFingerprint,
Expand All @@ -340,6 +342,7 @@ func (pa *Path) startExternalSource() {

} else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
pa.source = rtmpsource.New(
pa.ctx,
pa.conf.Source,
pa.readTimeout,
pa.writeTimeout,
Expand Down
8 changes: 3 additions & 5 deletions internal/pathman/pathman.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type PathManager struct {

// New allocates a PathManager.
func New(
ctxParent context.Context,
rtspAddress string,
readTimeout time.Duration,
writeTimeout time.Duration,
Expand All @@ -76,7 +77,7 @@ func New(
stats *stats.Stats,
parent Parent) *PathManager {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

pm := &PathManager{
rtspAddress: rtspAddress,
Expand Down Expand Up @@ -251,14 +252,11 @@ outer:
}

pm.ctxCancel()

for _, pa := range pm.paths {
pa.Close()
}
}

func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) {
pm.paths[name] = path.New(
pm.ctx,
pm.rtspAddress,
pm.readTimeout,
pm.writeTimeout,
Expand Down
5 changes: 3 additions & 2 deletions internal/rtmpconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Conn struct {

// New allocates a Conn.
func New(
ctxParent context.Context,
rtspAddress string,
readTimeout time.Duration,
writeTimeout time.Duration,
Expand All @@ -95,7 +96,7 @@ func New(
pathMan PathMan,
parent Parent) *Conn {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

c := &Conn{
rtspAddress: rtspAddress,
Expand Down Expand Up @@ -157,7 +158,7 @@ func (c *Conn) run() {
defer onConnectCmd.Close()
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(c.ctx)
runErr := make(chan error)
go func() {
runErr <- c.runInner(ctx)
Expand Down
4 changes: 3 additions & 1 deletion internal/rtmpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Server struct {

// New allocates a Server.
func New(
ctxParent context.Context,
address string,
readTimeout time.Duration,
writeTimeout time.Duration,
Expand All @@ -57,7 +58,7 @@ func New(
return nil, err
}

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

s := &Server{
readTimeout: readTimeout,
Expand Down Expand Up @@ -133,6 +134,7 @@ outer:

case nconn := <-connNew:
c := rtmpconn.New(
s.ctx,
s.rtspAddress,
s.readTimeout,
s.writeTimeout,
Expand Down
16 changes: 9 additions & 7 deletions internal/rtmpsource/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ type Source struct {
}

// New allocates a Source.
func New(ur string,
func New(
ctxParent context.Context,
ur string,
readTimeout time.Duration,
writeTimeout time.Duration,
wg *sync.WaitGroup,
stats *stats.Stats,
parent Parent) *Source {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

s := &Source{
ur: ur,
Expand Down Expand Up @@ -116,14 +118,14 @@ func (s *Source) run() {
}

func (s *Source) runInner() bool {
ctx, cancel := context.WithCancel(context.Background())
innerCtx, innerCtxCancel := context.WithCancel(s.ctx)

runErr := make(chan error)
go func() {
runErr <- func() error {
s.log(logger.Debug, "connecting")

ctx2, cancel2 := context.WithTimeout(ctx, s.readTimeout)
ctx2, cancel2 := context.WithTimeout(innerCtx, s.readTimeout)
defer cancel2()

conn, err := rtmp.DialContext(ctx2, s.ur)
Expand Down Expand Up @@ -258,7 +260,7 @@ func (s *Source) runInner() bool {
conn.NetConn().Close()
return err

case <-ctx.Done():
case <-innerCtx.Done():
conn.NetConn().Close()
<-readDone
return nil
Expand All @@ -268,12 +270,12 @@ func (s *Source) runInner() bool {

select {
case err := <-runErr:
cancel()
innerCtxCancel()
s.log(logger.Info, "ERR: %s", err)
return true

case <-s.ctx.Done():
cancel()
innerCtxCancel()
<-runErr
return false
}
Expand Down
3 changes: 2 additions & 1 deletion internal/rtspserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Server struct {

// New allocates a Server.
func New(
ctxParent context.Context,
address string,
readTimeout time.Duration,
writeTimeout time.Duration,
Expand All @@ -90,7 +91,7 @@ func New(
pathMan *pathman.PathManager,
parent Parent) (*Server, error) {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

s := &Server{
readTimeout: readTimeout,
Expand Down
3 changes: 2 additions & 1 deletion internal/rtspsource/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Source struct {

// New allocates a Source.
func New(
ctxParent context.Context,
ur string,
proto *gortsplib.StreamProtocol,
fingerprint string,
Expand All @@ -60,7 +61,7 @@ func New(
stats *stats.Stats,
parent Parent) *Source {

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancel(ctxParent)

s := &Source{
ur: ur,
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (p *program) createResources(initial bool) error {

if p.pathMan == nil {
p.pathMan = pathman.New(
p.ctx,
p.conf.RTSPAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
Expand All @@ -215,6 +216,7 @@ func (p *program) createResources(initial bool) error {
if p.serverRTSPPlain == nil {
_, useUDP := p.conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]
p.serverRTSPPlain, err = rtspserver.New(
p.ctx,
p.conf.RTSPAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
Expand Down Expand Up @@ -244,6 +246,7 @@ func (p *program) createResources(initial bool) error {
p.conf.EncryptionParsed == conf.EncryptionOptional) {
if p.serverRTSPTLS == nil {
p.serverRTSPTLS, err = rtspserver.New(
p.ctx,
p.conf.RTSPSAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
Expand Down Expand Up @@ -271,6 +274,7 @@ func (p *program) createResources(initial bool) error {
if !p.conf.RTMPDisable {
if p.serverRTMP == nil {
p.serverRTMP, err = rtmpserver.New(
p.ctx,
p.conf.RTMPAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
Expand All @@ -290,6 +294,7 @@ func (p *program) createResources(initial bool) error {
if !p.conf.HLSDisable {
if p.serverHLS == nil {
p.serverHLS, err = hlsserver.New(
p.ctx,
p.conf.HLSAddress,
p.conf.HLSSegmentCount,
p.conf.HLSSegmentDuration,
Expand Down

0 comments on commit f086b62

Please sign in to comment.