forked from rclone/rclone
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read_write.go
457 lines (404 loc) · 10.4 KB
/
read_write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
package vfs
import (
"fmt"
"io"
"os"
"sync"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs/vfscache"
)
// RWFileHandle is a handle that can be open for read and write.
//
// It will be open to a temporary file which, when closed, will be
// transferred to the remote.
type RWFileHandle struct {
// read only variables
file *File
d *Dir
flags int // open flags
item *vfscache.Item // cached file item
// read write variables protected by mutex
mu sync.Mutex
offset int64 // file pointer offset
closed bool // set if handle has been closed
opened bool
writeCalled bool // if any Write() methods have been called
}
func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) {
defer log.Trace(f.Path(), "")("err=%v", &err)
// get an item to represent this from the cache
item := d.vfs.cache.Item(f.Path())
exists := f.exists() || (item.Exists() && !item.WrittenBack())
// if O_CREATE and O_EXCL are set and if path already exists, then return EEXIST
if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && exists {
return nil, EEXIST
}
fh = &RWFileHandle{
file: f,
d: d,
flags: flags,
item: item,
}
// truncate immediately if O_TRUNC is set or O_CREATE is set and file doesn't exist
if !fh.readOnly() && (fh.flags&os.O_TRUNC != 0 || (fh.flags&os.O_CREATE != 0 && !exists)) {
err = fh.Truncate(0)
if err != nil {
return nil, errors.Wrap(err, "cache open with O_TRUNC: failed to truncate")
}
// we definitely need to write back the item even if we don't write to it
item.Dirty()
}
if !fh.readOnly() {
fh.file.addWriter(fh)
}
return fh, nil
}
// readOnly returns whether flags say fh is read only
func (fh *RWFileHandle) readOnly() bool {
return (fh.flags & accessModeMask) == os.O_RDONLY
}
// writeOnly returns whether flags say fh is write only
func (fh *RWFileHandle) writeOnly() bool {
return (fh.flags & accessModeMask) == os.O_WRONLY
}
// openPending opens the file if there is a pending open
//
// call with the lock held
func (fh *RWFileHandle) openPending() (err error) {
if fh.opened {
return nil
}
defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
o := fh.file.getObject()
err = fh.item.Open(o)
if err != nil {
return errors.Wrap(err, "open RW handle failed to open cache file")
}
size := fh._size() // update size in file and read size
if fh.flags&os.O_APPEND != 0 {
fh.offset = size
fs.Debugf(fh.logPrefix(), "open at offset %d", fh.offset)
} else {
fh.offset = 0
}
fh.opened = true
fh.d.addObject(fh.file) // make sure the directory has this object in it now
return nil
}
// String converts it to printable
func (fh *RWFileHandle) String() string {
if fh == nil {
return "<nil *RWFileHandle>"
}
if fh.file == nil {
return "<nil *RWFileHandle.file>"
}
return fh.file.String() + " (rw)"
}
// Node returns the Node associated with this - satisfies Noder interface
func (fh *RWFileHandle) Node() Node {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file
}
// updateSize updates the size of the file if necessary
//
// Must be called with fh.mu held
func (fh *RWFileHandle) updateSize() {
// If read only or not opened then ignore
if fh.readOnly() || !fh.opened {
return
}
size := fh._size()
fh.file.setSize(size)
}
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
//
// Note that we leave the file around in the cache on error conditions
// to give the user a chance to recover it.
func (fh *RWFileHandle) close() (err error) {
defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
if fh.closed {
return ECLOSED
}
fh.closed = true
fh.updateSize()
if fh.opened {
err = fh.item.Close(fh.file.setObject)
fh.opened = false
}
if !fh.readOnly() {
fh.file.delWriter(fh)
}
return err
}
// Close closes the file
func (fh *RWFileHandle) Close() error {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.close()
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
func (fh *RWFileHandle) Flush() error {
fh.mu.Lock()
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush")
fh.updateSize()
fh.mu.Unlock()
return nil
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *RWFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.logPrefix(), "RWFileHandle.Release")
if fh.closed {
// Don't return an error if called twice
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.logPrefix(), "RWFileHandle.Release error: %v", err)
}
return err
}
// _size returns the size of the underlying file and also sets it in
// the owning file
//
// call with the lock held
func (fh *RWFileHandle) _size() int64 {
size, err := fh.item.GetSize()
if err != nil {
o := fh.file.getObject()
if o != nil {
size = o.Size()
} else {
fs.Errorf(fh.logPrefix(), "Couldn't read size of file")
size = 0
}
}
fh.file.setSize(size)
return size
}
// Size returns the size of the underlying file
func (fh *RWFileHandle) Size() int64 {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh._size()
}
// Stat returns info about the file
func (fh *RWFileHandle) Stat() (os.FileInfo, error) {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file, nil
}
// _readAt bytes from the file at off
//
// if release is set then it releases the mutex just before doing the IO
//
// call with lock held
func (fh *RWFileHandle) _readAt(b []byte, off int64, release bool) (n int, err error) {
defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err)
if fh.closed {
return n, ECLOSED
}
if fh.writeOnly() {
return n, EBADF
}
if off >= fh._size() {
return n, io.EOF
}
if err = fh.openPending(); err != nil {
return n, err
}
if release {
// Do the writing with fh.mu unlocked
fh.mu.Unlock()
}
n, err = fh.item.ReadAt(b, off)
if release {
fh.mu.Lock()
}
return n, err
}
// ReadAt bytes from the file at off
func (fh *RWFileHandle) ReadAt(b []byte, off int64) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh._readAt(b, off, true)
}
// Read bytes from the file
func (fh *RWFileHandle) Read(b []byte) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
n, err = fh._readAt(b, fh.offset, false)
fh.offset += int64(n)
return n, err
}
// Seek to new file position
func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if !fh.opened && offset == 0 && whence != 2 {
return 0, nil
}
if err = fh.openPending(); err != nil {
return ret, err
}
switch whence {
case io.SeekStart:
fh.offset = 0
case io.SeekEnd:
fh.offset = fh._size()
}
fh.offset += offset
// we don't check the offset - the next Read will
return fh.offset, nil
}
// _writeAt bytes to the file at off
//
// if release is set then it releases the mutex just before doing the IO
//
// call with lock held
func (fh *RWFileHandle) _writeAt(b []byte, off int64, release bool) (n int, err error) {
defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err)
if fh.closed {
return n, ECLOSED
}
if fh.readOnly() {
return n, EBADF
}
if err = fh.openPending(); err != nil {
return n, err
}
if fh.flags&os.O_APPEND != 0 {
// From open(2): Before each write(2), the file offset is
// positioned at the end of the file, as if with lseek(2).
size := fh._size()
fh.offset = size
off = fh.offset
}
fh.writeCalled = true
if release {
// Do the writing with fh.mu unlocked
fh.mu.Unlock()
}
n, err = fh.item.WriteAt(b, off)
if release {
fh.mu.Lock()
}
if err != nil {
return n, err
}
_ = fh._size()
return n, err
}
// WriteAt bytes to the file at off
func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) {
fh.mu.Lock()
n, err = fh._writeAt(b, off, true)
if fh.flags&os.O_APPEND != 0 {
fh.offset += int64(n)
}
fh.mu.Unlock()
return n, err
}
// Write bytes to the file
func (fh *RWFileHandle) Write(b []byte) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
n, err = fh._writeAt(b, fh.offset, false)
fh.offset += int64(n)
return n, err
}
// WriteString a string to the file
func (fh *RWFileHandle) WriteString(s string) (n int, err error) {
return fh.Write([]byte(s))
}
// Truncate file to given size
//
// Call with mutex held
func (fh *RWFileHandle) _truncate(size int64) (err error) {
if size == fh._size() {
return nil
}
fh.file.setSize(size)
return fh.item.Truncate(size)
}
// Truncate file to given size
func (fh *RWFileHandle) Truncate(size int64) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if err = fh.openPending(); err != nil {
return err
}
return fh._truncate(size)
}
// Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written
// data to disk.
func (fh *RWFileHandle) Sync() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if !fh.opened {
return nil
}
if fh.readOnly() {
return nil
}
return fh.item.Sync()
}
func (fh *RWFileHandle) logPrefix() string {
return fmt.Sprintf("%s(%p)", fh.file.Path(), fh)
}
// Chdir changes the current working directory to the file, which must
// be a directory.
func (fh *RWFileHandle) Chdir() error {
return ENOSYS
}
// Chmod changes the mode of the file to mode.
func (fh *RWFileHandle) Chmod(mode os.FileMode) error {
return ENOSYS
}
// Chown changes the numeric uid and gid of the named file.
func (fh *RWFileHandle) Chown(uid, gid int) error {
return ENOSYS
}
// Fd returns the integer Unix file descriptor referencing the open file.
func (fh *RWFileHandle) Fd() uintptr {
return 0xdeadbeef // FIXME
}
// Name returns the name of the file from the underlying Object.
func (fh *RWFileHandle) Name() string {
return fh.file.String()
}
// Readdir reads the contents of the directory associated with file.
func (fh *RWFileHandle) Readdir(n int) ([]os.FileInfo, error) {
return nil, ENOSYS
}
// Readdirnames reads the contents of the directory associated with file.
func (fh *RWFileHandle) Readdirnames(n int) (names []string, err error) {
return nil, ENOSYS
}