forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.go
258 lines (228 loc) · 7.34 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package snapshots
import (
"bytes"
"crypto/sha256"
"io"
"sync"
"github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
const (
opNone operation = ""
opSnapshot operation = "snapshot"
opPrune operation = "prune"
opRestore operation = "restore"
chunkBufferSize = 4
)
// operation represents a Manager operation. Only one operation can be in progress at a time.
type operation string
// restoreDone represents the result of a restore operation.
type restoreDone struct {
complete bool // if true, restore completed successfully (not prematurely)
err error // if non-nil, restore errored
}
// Manager manages snapshot and restore operations for an app, making sure only a single
// long-running operation is in progress at any given time, and provides convenience methods
// mirroring the ABCI interface.
//
// Although the ABCI interface (and this manager) passes chunks as byte slices, the internal
// snapshot/restore APIs use IO streams (i.e. chan io.ReadCloser), for two reasons:
//
// 1) In the future, ABCI should support streaming. Consider e.g. InitChain during chain
// upgrades, which currently passes the entire chain state as an in-memory byte slice.
// https://github.com/tendermint/tendermint/issues/5184
//
// 2) io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
// errors via io.Pipe.CloseWithError().
type Manager struct {
store *Store
target types.Snapshotter
mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
}
// NewManager creates a new manager.
func NewManager(store *Store, target types.Snapshotter) *Manager {
return &Manager{
store: store,
target: target,
}
}
// begin starts an operation, or errors if one is in progress. It manages the mutex itself.
func (m *Manager) begin(op operation) error {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.beginLocked(op)
}
// beginLocked begins an operation while already holding the mutex.
func (m *Manager) beginLocked(op operation) error {
if op == opNone {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "can't begin a none operation")
}
if m.operation != opNone {
return sdkerrors.Wrapf(sdkerrors.ErrConflict, "a %v operation is in progress", m.operation)
}
m.operation = op
return nil
}
// end ends the current operation.
func (m *Manager) end() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.endLocked()
}
// endLocked ends the current operation while already holding the mutex.
func (m *Manager) endLocked() {
m.operation = opNone
if m.chRestore != nil {
close(m.chRestore)
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreChunkIndex = 0
}
// Create creates a snapshot and returns its metadata.
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
if m == nil {
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "no snapshot store configured")
}
err := m.begin(opSnapshot)
if err != nil {
return nil, err
}
defer m.end()
latest, err := m.store.GetLatest()
if err != nil {
return nil, sdkerrors.Wrap(err, "failed to examine latest snapshot")
}
if latest != nil && latest.Height >= height {
return nil, sdkerrors.Wrapf(sdkerrors.ErrConflict,
"a more recent snapshot already exists at height %v", latest.Height)
}
chunks, err := m.target.Snapshot(height, types.CurrentFormat)
if err != nil {
return nil, err
}
return m.store.Save(height, types.CurrentFormat, chunks)
}
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()
}
// LoadChunk loads a chunk into a byte slice, mirroring ABCI LoadChunk. It can be called
// concurrently with other operations. If the chunk does not exist, nil is returned.
func (m *Manager) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
reader, err := m.store.LoadChunk(height, format, chunk)
if err != nil {
return nil, err
}
if reader == nil {
return nil, nil
}
defer reader.Close()
return io.ReadAll(reader)
}
// Prune prunes snapshots, if no other operations are in progress.
func (m *Manager) Prune(retain uint32) (uint64, error) {
err := m.begin(opPrune)
if err != nil {
return 0, err
}
defer m.end()
return m.store.Prune(retain)
}
// Restore begins an async snapshot restoration, mirroring ABCI OfferSnapshot. Chunks must be fed
// via RestoreChunk() until the restore is complete or a chunk fails.
func (m *Manager) Restore(snapshot types.Snapshot) error {
if snapshot.Chunks == 0 {
return sdkerrors.Wrap(types.ErrInvalidMetadata, "no chunks")
}
if uint32(len(snapshot.Metadata.ChunkHashes)) != snapshot.Chunks {
return sdkerrors.Wrapf(types.ErrInvalidMetadata, "snapshot has %v chunk hashes, but %v chunks",
uint32(len(snapshot.Metadata.ChunkHashes)),
snapshot.Chunks)
}
m.mtx.Lock()
defer m.mtx.Unlock()
err := m.beginLocked(opRestore)
if err != nil {
return err
}
// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chReady := make(chan struct{}, 1)
chDone := make(chan restoreDone, 1)
go func() {
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()
// Check for any initial errors from the restore, before any chunks are fed.
select {
case done := <-chDone:
m.endLocked()
if done.err != nil {
return done.err
}
return sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
case <-chReady:
}
m.chRestore = chChunks
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreChunkIndex = 0
return nil
}
// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk.
// Chunks must be given until the restore is complete, returning true, or a chunk errors.
func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.operation != opRestore {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
}
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
}
// Check if any errors have occurred yet.
select {
case done := <-m.chRestoreDone:
m.endLocked()
if done.err != nil {
return false, done.err
}
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
default:
}
// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}
// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.restoreChunkIndex++
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
close(m.chRestore)
m.chRestore = nil
done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
return false, done.err
}
if !done.complete {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
}
return true, nil
}
return false, nil
}