Skip to content

Commit

Permalink
fix EOF not called in linear reader
Browse files Browse the repository at this point in the history
  • Loading branch information
divyam234 committed Nov 3, 2023
1 parent 6111362 commit 60cdcae
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
4 changes: 2 additions & 2 deletions services/file.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
return err
}
parts = rangedParts(parts, start, end)
r, _ := reader.NewLinearReader(c, client, parts)
r, _ := reader.NewLinearReader(c, client, parts, contentLength)
io.CopyN(w, r, contentLength)
return nil
})
Expand Down Expand Up @@ -461,7 +461,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
return
}
parts = rangedParts(parts, start, end)
r, _ := reader.NewLinearReader(c, client.Tg, parts)
r, _ := reader.NewLinearReader(c, client.Tg, parts, contentLength)
io.CopyN(w, r, contentLength)
}
}
Expand Down
50 changes: 23 additions & 27 deletions utils/reader/lr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@ import (
"context"
"fmt"
"io"
"sync"

"github.com/divyam234/teldrive/types"
"github.com/gotd/td/telegram"
"github.com/gotd/td/tg"
)

type linearReader struct {
ctx context.Context
parts []types.Part
pos int
client *telegram.Client
next func() []byte
buffer []byte
bytesread int64
chunkSize int64
i int64
mu sync.Mutex
ctx context.Context
parts []types.Part
pos int
client *telegram.Client
next func() []byte
buffer []byte
bytesread int64
chunkSize int64
i int64
contentLength int64
}

func (*linearReader) Close() error {
return nil
}

func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part) (io.ReadCloser, error) {
func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part, contentLength int64) (io.ReadCloser, error) {

r := &linearReader{
ctx: ctx,
parts: parts,
client: client,
chunkSize: int64(1024 * 1024),
ctx: ctx,
parts: parts,
client: client,
chunkSize: int64(1024 * 1024),
contentLength: contentLength,
}

r.next = r.partStream()
Expand All @@ -43,13 +43,16 @@ func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types
}

func (r *linearReader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.bytesread == r.contentLength {
return 0, io.EOF
}

if r.i >= int64(len(r.buffer)) {
r.buffer = r.next()
if len(r.buffer) == 0 && r.pos == len(r.parts)-1 {
return 0, io.EOF
if len(r.buffer) == 0 {
r.pos++
r.next = r.partStream()
}
r.i = 0
}
Expand All @@ -60,11 +63,6 @@ func (r *linearReader) Read(p []byte) (n int, err error) {

r.bytesread += int64(n)

if r.bytesread == r.parts[r.pos].Length && r.pos < len(r.parts)-1 {
r.pos++
r.next = r.partStream()
r.bytesread = 0
}
return n, nil
}

Expand All @@ -91,8 +89,6 @@ func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
}

func (r *linearReader) partStream() func() []byte {
r.mu.Lock()
defer r.mu.Unlock()

start := r.parts[r.pos].Start
end := r.parts[r.pos].End
Expand Down

0 comments on commit 60cdcae

Please sign in to comment.