forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.go
163 lines (147 loc) · 3.26 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package snapshots
import (
"io"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
// ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a
// sequence of io.ReadClosers via a channel.
type ChunkWriter struct {
ch chan<- io.ReadCloser
pipe *io.PipeWriter
chunkSize uint64
written uint64
closed bool
}
// NewChunkWriter creates a new ChunkWriter. If chunkSize is 0, no chunking will be done.
func NewChunkWriter(ch chan<- io.ReadCloser, chunkSize uint64) *ChunkWriter {
return &ChunkWriter{
ch: ch,
chunkSize: chunkSize,
}
}
// chunk creates a new chunk.
func (w *ChunkWriter) chunk() error {
if w.pipe != nil {
err := w.pipe.Close()
if err != nil {
return err
}
}
pr, pw := io.Pipe()
w.ch <- pr
w.pipe = pw
w.written = 0
return nil
}
// Close implements io.Closer.
func (w *ChunkWriter) Close() error {
if !w.closed {
w.closed = true
close(w.ch)
var err error
if w.pipe != nil {
err = w.pipe.Close()
}
return err
}
return nil
}
// CloseWithError closes the writer and sends an error to the reader.
func (w *ChunkWriter) CloseWithError(err error) {
if !w.closed {
w.closed = true
close(w.ch)
if w.pipe != nil {
w.pipe.CloseWithError(err)
}
}
}
// Write implements io.Writer.
func (w *ChunkWriter) Write(data []byte) (int, error) {
if w.closed {
return 0, sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot write to closed ChunkWriter")
}
nTotal := 0
for len(data) > 0 {
if w.pipe == nil || (w.written >= w.chunkSize && w.chunkSize > 0) {
err := w.chunk()
if err != nil {
return nTotal, err
}
}
var writeSize uint64
if w.chunkSize == 0 {
writeSize = uint64(len(data))
} else {
writeSize = w.chunkSize - w.written
}
if writeSize > uint64(len(data)) {
writeSize = uint64(len(data))
}
n, err := w.pipe.Write(data[:writeSize])
w.written += uint64(n)
nTotal += n
if err != nil {
return nTotal, err
}
data = data[writeSize:]
}
return nTotal, nil
}
// ChunkReader reads chunks from a channel of io.ReadClosers and outputs them as an io.Reader
type ChunkReader struct {
ch <-chan io.ReadCloser
reader io.ReadCloser
}
// NewChunkReader creates a new ChunkReader.
func NewChunkReader(ch <-chan io.ReadCloser) *ChunkReader {
return &ChunkReader{ch: ch}
}
// next fetches the next chunk from the channel, or returns io.EOF if there are no more chunks.
func (r *ChunkReader) next() error {
reader, ok := <-r.ch
if !ok {
return io.EOF
}
r.reader = reader
return nil
}
// Close implements io.ReadCloser.
func (r *ChunkReader) Close() error {
var err error
if r.reader != nil {
err = r.reader.Close()
r.reader = nil
}
for reader := range r.ch {
if e := reader.Close(); e != nil && err == nil {
err = e
}
}
return err
}
// Read implements io.Reader.
func (r *ChunkReader) Read(p []byte) (int, error) {
if r.reader == nil {
err := r.next()
if err != nil {
return 0, err
}
}
n, err := r.reader.Read(p)
if err == io.EOF {
err = r.reader.Close()
r.reader = nil
if err != nil {
return 0, err
}
return r.Read(p)
}
return n, err
}
// DrainChunks drains and closes all remaining chunks from a chunk channel.
func DrainChunks(chunks <-chan io.ReadCloser) {
for chunk := range chunks {
_ = chunk.Close()
}
}