forked from rclone/rclone
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread.go
506 lines (475 loc) · 13.9 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
package vfs
import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunkedreader"
"github.com/rclone/rclone/fs/hash"
)
// ReadFileHandle is an open for read file handle on a File
//
// The underlying reader is not opened when the handle is created -
// openPending opens it on first use. mu protects the mutable state
// and is also the Locker used by cond.
type ReadFileHandle struct {
baseHandle
done func(ctx context.Context, err error) // finishes the accounting transfer - set by openPending, called by close
mu sync.Mutex
cond sync.Cond // cond lock for out of sequence reads
r *accounting.Account // accounted reader - only set by openPending
size int64 // size of the object (0 for unknown length)
offset int64 // offset of the underlying reader r in the object
roffset int64 // offset of Read() calls
file *File // the File this handle reads from
hash *hash.MultiHasher // hasher fed by readAt - may be nil, and is set to nil by seek
remote string // remote name of the object, used as the subject of log messages
closed bool // set if handle has been closed
readCalled bool // set if read has been called
noSeek bool // copied from the VFS NoSeek option - seeks return ESPIPE when set
sizeUnknown bool // set if size of source is not known
opened bool // set once openPending has opened r
}
// Check interfaces
//
// These are compile time assertions that *ReadFileHandle implements
// the io interfaces listed - they have no effect at run time.
var (
_ io.Reader = (*ReadFileHandle)(nil)
_ io.ReaderAt = (*ReadFileHandle)(nil)
_ io.Seeker = (*ReadFileHandle)(nil)
_ io.Closer = (*ReadFileHandle)(nil)
)
// newReadFileHandle makes a read handle for f.
//
// Nothing is opened here - the reader is opened on demand by
// openPending. Unless the VFS NoChecksum option is set a hasher for one
// of the hash types supported by the object's Fs is created; a failure
// to create it is logged and otherwise ignored.
//
// The returned error is always nil at present.
func newReadFileHandle(f *File) (*ReadFileHandle, error) {
	var (
		hasher  *hash.MultiHasher
		hashErr error
	)
	obj := f.getObject()
	if checksum := !f.VFS().Opt.NoChecksum; checksum {
		// one hash type is enough so just pick one
		hashType := obj.Fs().Hashes().GetOne()
		hasher, hashErr = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
		if hashErr != nil {
			fs.Errorf(obj.Fs(), "newReadFileHandle hash error: %v", hashErr)
		}
	}
	fh := new(ReadFileHandle)
	fh.remote = obj.Remote()
	fh.noSeek = f.VFS().Opt.NoSeek
	fh.file = f
	fh.hash = hasher
	fh.size = nonNegative(obj.Size())
	fh.sizeUnknown = obj.Size() < 0
	// out of sequence reads wait on cond which shares the handle's mutex
	fh.cond.L = &fh.mu
	return fh, nil
}
// openPending opens the underlying reader if it hasn't been opened
// already and starts accounting for the transfer.
//
// Call with fh.mu held.
func (fh *ReadFileHandle) openPending() (err error) {
	if fh.opened {
		// nothing pending
		return nil
	}
	obj := fh.file.getObject()
	vfsOpt := &fh.file.VFS().Opt
	chunked := chunkedreader.New(context.TODO(), obj, int64(vfsOpt.ChunkSize), int64(vfsOpt.ChunkSizeLimit), vfsOpt.ChunkStreams)
	in, openErr := chunked.Open()
	if openErr != nil {
		return openErr
	}
	transfer := accounting.GlobalStats().NewTransfer(obj, nil)
	fh.done = transfer.Done
	// account the transfer and read ahead with a buffer
	fh.r = transfer.Account(context.TODO(), in).WithBuffer()
	fh.opened = true
	return nil
}
// String returns a printable description of the handle.
//
// It is safe to call on a nil handle or on a handle with no file.
func (fh *ReadFileHandle) String() string {
	if fh == nil {
		return "<nil *ReadFileHandle>"
	}
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if f := fh.file; f != nil {
		return f.String() + " (r)"
	}
	return "<nil *ReadFileHandle.file>"
}
// Node returns the File this handle was opened on - satisfies the
// Noder interface
func (fh *ReadFileHandle) Node() Node {
	fh.mu.Lock()
	f := fh.file
	fh.mu.Unlock()
	return f
}
// seek moves the underlying reader to offset.
//
// Three strategies are tried in order:
//
//  1. discard (or rewind within) the read ahead buffer - skipped if
//     reopen is set
//  2. RangeSeek on the existing chunked reader - skipped if reopen is set
//  3. close the old reader and open a new one at offset
//
// if reopen is true, then we won't attempt to use an io.Seeker interface
//
// On success fh.offset is set to offset. It returns ESPIPE if seeking
// is disabled for this handle.
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
	if fh.noSeek {
		return ESPIPE
	}
	// Seeking abandons hashing - checkHash does nothing once this is nil
	fh.hash = nil
	if !reopen {
		// try to fulfill the seek with buffer discard
		//
		// SkipBytes takes an int, so only try this when the distance
		// survives the conversion unchanged. On 32 bit platforms a long
		// seek would otherwise be truncated to a short skip, leaving the
		// reader at the wrong position while fh.offset claimed otherwise.
		skip := offset - fh.offset
		if ar := fh.r.GetAsyncReader(); ar != nil && int64(int(skip)) == skip && ar.SkipBytes(int(skip)) {
			fh.offset = offset
			return nil
		}
	}
	fh.r.StopBuffering() // stop the background reading first
	oldReader := fh.r.GetReader()
	r, ok := oldReader.(chunkedreader.ChunkedReader)
	if !ok {
		fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
		reopen = true
	}
	if !reopen {
		fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset)
		_, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1)
		if err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err)
			return err
		}
	} else {
		fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
		// close old one - a failure here is only logged as the reader
		// is being replaced anyway
		err = oldReader.Close()
		if err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err)
		}
		// re-open with a seek
		o := fh.file.getObject()
		opt := &fh.file.VFS().Opt
		r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
		_, err = r.Seek(offset, io.SeekStart)
		if err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
			return err
		}
		r, err = r.Open()
		if err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
			return err
		}
	}
	fh.r.UpdateReader(context.TODO(), r)
	fh.offset = offset
	return nil
}
// Seek sets the offset used by the next Read and returns it.
//
// whence is io.SeekStart or io.SeekEnd; any other value is treated as
// relative to the current Read offset. It returns ESPIPE if seeking
// isn't possible on this handle.
func (fh *ReadFileHandle) Seek(offset int64, whence int) (n int64, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if fh.noSeek {
		return 0, ESPIPE
	}
	// work out what the offset is relative to
	base := fh.roffset
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekEnd:
		base = fh.size
	}
	// the resulting offset isn't validated here - that is left to the
	// next Read
	fh.roffset = base + offset
	return fh.roffset, nil
}
// ReadAt reads len(p) bytes into p starting at offset off in the file,
// returning the number of bytes read (0 <= n <= len(p)) and any error
// encountered. It implements io.ReaderAt.
//
// If fewer than len(p) bytes are returned then the error is non-nil,
// being io.EOF if the end of the file was reached.
//
// ReadAt does not use or change the offset used by Read and Seek.
//
// It takes the handle's lock and hands over to readAt to do the work.
func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	n, err = fh.readAt(p, off)
	return n, err
}
// waitSequential blocks until *poff == off or until maxWait has
// passed, whichever comes first.
//
// what and remote are only used for the debug logging.
//
// Any wait here potentially holds up every seek so maxWait needs to be
// kept short.
//
// Call with cond.L locked - it is unlocked while waiting and locked
// again on return.
func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) {
	var aborted atomic.Int32
	timer := time.NewTimer(maxWait)
	finished := make(chan struct{})
	go func() {
		select {
		case <-finished:
			// the wait is over - no kick needed
			return
		case <-timer.C:
		}
		// Timed out. Take the lock so that the waiter must have reached
		// cond.Wait() before the Broadcast is sent, then flag the abort
		// and kick all the waiting goroutines.
		cond.L.Lock()
		aborted.Store(1)
		fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
		cond.Broadcast()
		cond.L.Unlock()
	}()
	for {
		if *poff == off || aborted.Load() != 0 {
			break
		}
		fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
		cond.Wait()
	}
	// let the watcher goroutine exit and tidy up the timer
	close(finished)
	timer.Stop()
	if *poff != off {
		fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off)
	}
}
// readAt is the implementation of ReadAt - call with fh.mu held.
//
// It opens the underlying reader on first use, gives a read which is
// slightly ahead of the current position a chance to become sequential
// (see waitSequential), seeks if off still isn't the current position
// and then reads len(p) bytes, retrying with a re-open on failure up
// to the configured number of low level retries.
//
// It returns io.EOF if fewer than len(p) bytes were read because the
// end of the file was reached, ECLOSED if the handle has been closed
// and ESPIPE if a seek is needed but seeking is disabled.
func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
	// defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err)
	// Check for a closed handle before opening anything. close() only
	// runs once so a reader opened after it would never be closed.
	if fh.closed {
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", ECLOSED)
		return 0, ECLOSED
	}
	err = fh.openPending() // FIXME pending open could be more efficient in the presence of seek (and retries)
	if err != nil {
		return 0, err
	}
	// fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off)
	maxBuf := 1024 * 1024
	if len(p) < maxBuf {
		maxBuf = len(p)
	}
	// Only wait if the read is a little way ahead of where we are
	if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) {
		waitSequential("read", fh.remote, &fh.cond, time.Duration(fh.file.VFS().Opt.ReadWait), &fh.offset, off)
		// fh.mu is released while waiting so the handle may have been
		// closed in the meantime. Carrying on would let the retry
		// logic below re-open a reader on a closed handle.
		if fh.closed {
			fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", ECLOSED)
			return 0, ECLOSED
		}
	}
	doSeek := off != fh.offset
	if doSeek && fh.noSeek {
		return 0, ESPIPE
	}
	var newOffset int64
	retries := 0
	reqSize := len(p)
	doReopen := false
	lowLevelRetries := fs.GetConfig(context.TODO()).LowLevelRetries
	for {
		if doSeek {
			// Are we attempting to seek beyond the end of the
			// file - if so just return EOF leaving the underlying
			// file in an unchanged state.
			if off >= fh.size {
				fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size)
				return 0, io.EOF
			}
			// Otherwise do the seek
			err = fh.seek(off, doReopen)
		} else {
			err = nil
		}
		if err == nil {
			if reqSize > 0 {
				fh.readCalled = true
			}
			n, err = io.ReadFull(fh.r, p)
			newOffset = fh.offset + int64(n)
			// if err == nil && rand.Intn(10) == 0 {
			// 	err = errors.New("random error")
			// }
			if err == nil {
				break
			} else if (err == io.ErrUnexpectedEOF || err == io.EOF) && (newOffset == fh.size || fh.sizeUnknown) {
				if fh.sizeUnknown {
					// size is now known since we have read to the end
					fh.sizeUnknown = false
					fh.size = newOffset
				}
				// Have read to end of file - reset error
				err = nil
				break
			}
		}
		if retries >= lowLevelRetries {
			break
		}
		retries++
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, lowLevelRetries, err)
		// retry from the requested offset with a fresh reader
		doSeek = true
		doReopen = true
	}
	if err != nil {
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err)
	} else {
		fh.offset = newOffset
		// fs.Debugf(fh.remote, "ReadFileHandle.Read OK")
		if fh.hash != nil {
			_, err = fh.hash.Write(p[:n])
			if err != nil {
				fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err)
				// fh.offset has moved so wake the waiters before bailing out
				fh.cond.Broadcast()
				return 0, err
			}
		}
		// If we have no error and we didn't fill the buffer, must be EOF
		if n != len(p) {
			err = io.EOF
		}
	}
	fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read
	return n, err
}
// checkHash compares the hashes of the data read with the hashes
// reported by the object, returning an error if any of them differ.
//
// Nothing is checked, and nil is returned, unless there is a hasher, a
// read has been made and the reader has reached the end of the file.
//
// Call with fh.mu held.
func (fh *ReadFileHandle) checkHash() error {
	if fh.hash == nil {
		return nil
	}
	if !fh.readCalled || fh.offset < fh.size {
		return nil
	}
	obj := fh.file.getObject()
	for ht, readSum := range fh.hash.Sums() {
		objSum, err := obj.Hash(context.TODO(), ht)
		switch {
		case err == nil:
			// compare below
		case errors.Is(err, os.ErrNotExist):
			// the file has gone away - at this point we don't care
			// any more
			continue
		default:
			return err
		}
		if !hash.Equals(readSum, objSum) {
			return fmt.Errorf("corrupted on transfer: %v hashes differ src %q vs dst %q", ht, objSum, readSum)
		}
	}
	return nil
}
// Read reads up to len(p) bytes into p from the current Read offset,
// returning the number of bytes read (0 <= n <= len(p)) and any error
// encountered. It implements io.Reader.
//
// The Read offset is advanced by the number of bytes read. It is
// separate from the offset of the underlying reader and can be
// changed with Seek.
//
// If the size of the file is known and the Read offset is at or past
// the end then 0, io.EOF is returned without reading anything.
func (fh *ReadFileHandle) Read(p []byte) (n int, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	atEnd := !fh.sizeUnknown && fh.roffset >= fh.size
	if atEnd {
		return 0, io.EOF
	}
	n, err = fh.readAt(p, fh.roffset)
	fh.roffset += int64(n)
	return n, err
}
// close marks the handle as closed, returning ECLOSED if it has been
// closed already.
//
// If the underlying reader was opened it is closed and then the hash
// is checked. The accounting transfer is finished with whichever
// error, if any, is being returned.
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) close() error {
	if fh.closed {
		return ECLOSED
	}
	fh.closed = true
	if !fh.opened {
		// never opened so there is nothing to tidy up
		return nil
	}
	var result error
	defer func() {
		fh.done(context.TODO(), result)
	}()
	// Close first so that we have hashes
	if result = fh.r.Close(); result != nil {
		return result
	}
	// Now check the hash
	result = fh.checkHash()
	return result
}
// Close closes the file handle. It implements io.Closer.
//
// It returns ECLOSED if the handle has already been closed.
func (fh *ReadFileHandle) Close() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	err := fh.close()
	return err
}
// Flush is called each time the file or directory is closed.
//
// As several file descriptors can refer to one opened file, Flush may
// be called more than once. It doesn't close anything - it just checks
// the hash if the reader has been opened, logging and returning any
// error from the check.
func (fh *ReadFileHandle) Flush() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if !fh.opened {
		// nothing has been read so there is nothing to check
		return nil
	}
	err := fh.checkHash()
	if err != nil {
		fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err)
	}
	return err
}
// Release is called when we are finished with the file handle
//
// It closes the handle unless it was never opened or has been closed
// already. Any error from closing is logged as well as returned since
// Release isn't called directly from userspace and the kernel ignores
// the error.
func (fh *ReadFileHandle) Release() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	switch {
	case !fh.opened:
		return nil
	case fh.closed:
		fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do")
		return nil
	}
	fs.Debugf(fh.remote, "ReadFileHandle.Release closing")
	if err := fh.close(); err != nil {
		fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err)
		return err
	}
	return nil
}
// Name returns the name of the file, as given by the String method of
// the File this handle was opened on.
func (fh *ReadFileHandle) Name() string {
	f := fh.file
	return f.String()
}
// Size returns the size of the underlying file as currently known to
// the handle
func (fh *ReadFileHandle) Size() int64 {
	fh.mu.Lock()
	size := fh.size
	fh.mu.Unlock()
	return size
}
// Stat returns info about the file
//
// The File the handle was opened on is returned as the os.FileInfo and
// the error is always nil.
func (fh *ReadFileHandle) Stat() (os.FileInfo, error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	var info os.FileInfo = fh.file
	return info, nil
}