forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfd.go
151 lines (123 loc) · 3.25 KB
/
fd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package mfs
import (
"fmt"
"io"
mod "github.com/ipfs/go-ipfs/unixfs/mod"
context "context"
)
// FileDescriptor is the set of operations available on an open mfs file:
// the standard io read/write/seek/close interfaces plus truncation, size
// lookup, context-aware reads and two flavours of flushing.
type FileDescriptor interface {
	// Reading.
	io.Reader
	CtxReadFull(context.Context, []byte) (int, error)

	// Writing and resizing.
	io.Writer
	io.WriterAt
	Truncate(int64) error

	// Positioning and inspection.
	io.Seeker
	Size() (int64, error)

	// Persisting changes and releasing the descriptor.
	Sync() error
	Flush() error
	io.Closer
}
// fileDescriptor is the concrete descriptor type of this package. Its
// methods delegate file I/O to mod and publish the resulting node through
// inode.
type fileDescriptor struct {
// inode is the File this descriptor belongs to; its descriptor lock, node
// lock, dag service, name and parent are all reached through it.
inode *File
// mod performs the actual reads, writes, seeks and truncation.
mod *mod.DagModifier
// perms is compared against OpenReadOnly, OpenWriteOnly and OpenReadWrite
// to decide which operations are allowed and which lock Close releases.
perms int
// sync is passed as the fullsync argument to flushUp when Close flushes.
sync bool
// hasChanges is set by Truncate, Write and WriteAt, and cleared by Close
// once the modifier has been synced.
hasChanges bool
// closed is consulted by Close to detect a second Close call.
closed bool
}
// Size reports the size of the file this descriptor refers to, as computed
// by the underlying DagModifier.
func (fi *fileDescriptor) Size() (int64, error) {
	size, err := fi.mod.Size()
	return size, err
}
// Truncate resizes the file to size bytes. It is rejected with an error on
// descriptors opened read-only.
func (fi *fileDescriptor) Truncate(size int64) error {
	switch fi.perms {
	case OpenReadOnly:
		return fmt.Errorf("cannot call truncate on readonly file descriptor")
	default:
		// Mark the descriptor dirty before delegating so that Close knows
		// there is something to flush.
		fi.hasChanges = true
		return fi.mod.Truncate(size)
	}
}
// Write stores b in the file at the descriptor's current offset and returns
// the modifier's byte count and error. Read-only descriptors get an error
// and nothing is written.
func (fi *fileDescriptor) Write(b []byte) (int, error) {
	if readOnly := fi.perms == OpenReadOnly; readOnly {
		return 0, fmt.Errorf("cannot write on not writeable descriptor")
	}

	// The dirty flag is raised before the write is attempted.
	fi.hasChanges = true
	written, err := fi.mod.Write(b)
	return written, err
}
// Read fills b from the file starting at the descriptor's current offset.
// Write-only descriptors get an error and no data.
func (fi *fileDescriptor) Read(b []byte) (int, error) {
	switch fi.perms {
	case OpenWriteOnly:
		return 0, fmt.Errorf("cannot read on write-only descriptor")
	default:
		return fi.mod.Read(b)
	}
}
// CtxReadFull is the context-aware counterpart of Read: it hands ctx and b
// to the DagModifier's CtxReadFull and returns its result. Write-only
// descriptors get an error and no data.
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	if writeOnly := fi.perms == OpenWriteOnly; writeOnly {
		return 0, fmt.Errorf("cannot read on write-only descriptor")
	}

	read, err := fi.mod.CtxReadFull(ctx, b)
	return read, err
}
// Close releases the descriptor. If the file was modified through it, the
// modifier is synced and the resulting dag node is propagated up the
// directory structure via flushUp, using the descriptor's sync setting.
// On return the descriptor lock taken on the File is released: the read
// lock for read-only descriptors, the write lock otherwise.
//
// Close must be called at most once; a second call panics.
func (fi *fileDescriptor) Close() error {
	// Detect a double close before arranging the unlock below. A repeated
	// Close must not release a descriptor lock this descriptor no longer
	// holds, neither directly nor while the panic unwinds.
	if fi.closed {
		panic("attempted to close file descriptor twice!")
	}
	// Mark the descriptor closed up front: the lock is released on every
	// return path below, including the error ones.
	fi.closed = true

	defer func() {
		switch fi.perms {
		case OpenReadOnly:
			fi.inode.desclock.RUnlock()
		case OpenWriteOnly, OpenReadWrite:
			fi.inode.desclock.Unlock()
		}
	}()

	if !fi.hasChanges {
		return nil
	}

	if err := fi.mod.Sync(); err != nil {
		return err
	}
	fi.hasChanges = false

	// The descriptor lock is deliberately still held during flushUp; the
	// deferred unlock above runs only after flushUp has returned.
	return fi.flushUp(fi.sync)
}
// Sync hands the descriptor's current node to the parent through flushUp
// without requesting a full sync.
func (fi *fileDescriptor) Sync() error {
	const fullsync = false
	return fi.flushUp(fullsync)
}
// Flush hands the descriptor's current node to the parent through flushUp
// and requests a full sync.
func (fi *fileDescriptor) Flush() error {
	const fullsync = true
	return fi.flushUp(fullsync)
}
// flushUp fetches the current node from the modifier, adds it to the File's
// dag service, records it on the File and then notifies the parent through
// closeChild. fullsync is forwarded unchanged to closeChild.
//
// It *must* be called with the File's lock taken.
func (fi *fileDescriptor) flushUp(fullsync bool) error {
	node, err := fi.mod.GetNode()
	if err != nil {
		return err
	}

	file := fi.inode
	if err = file.dserv.Add(context.TODO(), node); err != nil {
		return err
	}

	// Update the File and snapshot its name and parent under the node lock;
	// the parent is called only after the lock has been dropped.
	file.nodelk.Lock()
	file.node = node
	parent, name := file.parent, file.name
	file.nodelk.Unlock()

	return parent.closeChild(name, node, fullsync)
}
// Seek implements io.Seeker by forwarding offset and whence to the
// underlying DagModifier and returning its result.
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
	pos, err := fi.mod.Seek(offset, whence)
	return pos, err
}
// WriteAt stores b in the file at the absolute offset at and returns the
// modifier's byte count and error. Read-only descriptors get an error and
// nothing is written.
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
	if readOnly := fi.perms == OpenReadOnly; readOnly {
		return 0, fmt.Errorf("cannot write on not writeable descriptor")
	}

	// The dirty flag is raised before the write is attempted.
	fi.hasChanges = true
	written, err := fi.mod.WriteAt(b, at)
	return written, err
}