Skip to content

Commit

Permalink
return errors, don't panic
Browse files Browse the repository at this point in the history
  • Loading branch information
otommod committed Nov 2, 2018
1 parent 36a0363 commit 20781c3
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 44 deletions.
74 changes: 30 additions & 44 deletions hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"net/http"
"net/url"
"sort"
"strconv"
"time"

"github.com/grafov/m3u8"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -29,8 +29,7 @@ var (
)

type HTTPError struct {
StatusCode int
Status string
*http.Response
}

func (e HTTPError) Error() string {
Expand All @@ -42,7 +41,7 @@ func getBestBandwidth(u *url.URL) (*m3u8.Variant, error) {
if err != nil {
return nil, err
} else if r.StatusCode != 200 {
return nil, HTTPError{r.StatusCode, r.Status}
return nil, HTTPError{r}
}

playlist, playlistType, err := m3u8.DecodeFrom(r.Body, false)
Expand Down Expand Up @@ -90,7 +89,7 @@ func HLS(u *url.URL, dst io.Writer) error {
if err != nil {
return err
} else if r.StatusCode != 200 {
return HTTPError{r.StatusCode, r.Status}
return HTTPError{r}
}

playlist, playlistType, err := DecodeFrom(r.Body, false)
Expand Down Expand Up @@ -131,19 +130,19 @@ func HLS(u *url.URL, dst io.Writer) error {
waitCh := time.After(media.TargetDuration)

segCh := make(chan io.ReadCloser)
doneCh := make(chan struct{})

go func() {
var g errgroup.Group
g.Go(func() error {
for r := range segCh {
if _, err := io.Copy(dst, r); err != nil {
log.Fatal(err)
return err
}
if err := r.Close(); err != nil {
log.Fatal(err)
return err
}
}
close(doneCh)
}()
return nil
})

for i, seg := range media.Segments {
seqID := mediaSequence + uint64(i)
Expand All @@ -153,7 +152,7 @@ func HLS(u *url.URL, dst io.Writer) error {
log.Println("skipping segment", seg.URI)
continue
} else if seqID > seenMediaSequence+1 {
log.Printf("\033[31m%d segments expired\033[m\n", seqID-seenMediaSequence-1)
log.Println("\033[31m", seqID-seenMediaSequence-1, "segments expired", "\033[m")
}
seenMediaSequence = seqID

Expand All @@ -177,53 +176,40 @@ func HLS(u *url.URL, dst io.Writer) error {
return errors.New("EXT-X-BYTERANGE offset not given")
}

// the Range header is inclusive
end := offset + seg.Limit - 1
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, end))
byterangeOffsets[seg.URI] = end + 1
}

success := false
retries := 0
retriesStart := time.Now()
retriesEnd := retriesStart.Add(media.TargetDuration)
for time.Now().Before(retriesEnd) {
err = retry(media.TargetDuration, func() error {
segData, err := http.DefaultClient.Do(req)
if err != nil || segData.StatusCode >= 500 {
// retry the request if it failed because of network or
// server issues

// retry the request if it failed due to network or server issues
if err != nil {
return err
} else if segData.StatusCode != 200 {
segData.Body.Close()
var delay time.Duration

if segData.StatusCode == 503 && segData.Header.Get("Retry-After") != "" {
header := segData.Header.Get("Retry-After")
when, err := http.ParseTime(header)
if err != nil {
after, err := strconv.ParseInt(header, 10, 64)
if err == nil && after > 0 {
delay = time.Duration(after) * time.Second
}
} else {
delay = time.Until(when)
}
} else {
retries++
delay = time.Duration(retries) * time.Second / 2
if segData.StatusCode >= 500 {
return HTTPError{segData}
} else if segData.StatusCode >= 400 {
// client error, the fault is on us
return stopRetrying{HTTPError{segData}}
}
time.Sleep(delay)
continue
}

segCh <- ActivityReadCloser{segData.Body}
success = true
break
}
if !success {
return fmt.Errorf("request failed %s", seg.URI)
return nil
})
if err != nil {
return err
}
}

close(segCh)
<-doneCh
if err := g.Wait(); err != nil {
return err
}

if media.Closed {
break
Expand Down
50 changes: 50 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package godam

import (
"net/http"
"strconv"
"time"
)

type stopRetrying struct {
error
}

func parseRetryAfterHeader(retryAfter string) time.Duration {
when, err := http.ParseTime(retryAfter)
if err != nil {
after, err := strconv.ParseInt(retryAfter, 10, 64)
if err == nil && after > 0 {
return time.Duration(after) * time.Second
}
}
return time.Until(when)
}

func retry(timeout time.Duration, f func() error) (err error) {
retriesStart := time.Now()
retriesEnd := retriesStart.Add(timeout)

for retries := 0; time.Now().Before(retriesEnd); retries++ {
var delay time.Duration

err = f()
if err != nil {
switch v := err.(type) {
case stopRetrying:
return v.error
case HTTPError:
if v.StatusCode == 503 && v.Header.Get("Retry-After") != "" {
delay = parseRetryAfterHeader(v.Header.Get("Retry-After"))
}
default:
delay = time.Duration(retries+1) * time.Second / 2
}

time.Sleep(delay)
continue
}
break
}
return err
}

0 comments on commit 20781c3

Please sign in to comment.