Skip to content

Commit

Permalink
fix wal log's snapshots that persists raftpb.ConfState
Browse files Browse the repository at this point in the history
Signed-off-by: CrazyMax <[email protected]>
  • Loading branch information
crazy-max committed Mar 7, 2022
1 parent b9e99e6 commit 4f4bdf9
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 39 deletions.
1 change: 1 addition & 0 deletions cmd/swarm-rafttool/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func decryptRaftData(swarmdir, outdir, unlockKey string) error {
if snap != nil {
walsnap.Index = snap.Metadata.Index
walsnap.Term = snap.Metadata.Term
walsnap.ConfState = &snap.Metadata.ConfState
}

walDir := filepath.Join(outdir, "wal-decrypted")
Expand Down
1 change: 1 addition & 0 deletions cmd/swarm-rafttool/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func writeFakeRaftData(t *testing.T, stateDir string, snapshot *raftpb.Snapshot,

wsn.Index = snapshot.Metadata.Index
wsn.Term = snapshot.Metadata.Term
wsn.ConfState = &snapshot.Metadata.ConfState
}

var entries []raftpb.Entry
Expand Down
1 change: 1 addition & 0 deletions cmd/swarm-rafttool/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func loadData(swarmdir, unlockKey string) (*storage.WALData, *raftpb.Snapshot, e
if snapshot != nil {
walsnap.Index = snapshot.Metadata.Index
walsnap.Term = snapshot.Metadata.Term
walsnap.ConfState = &snapshot.Metadata.ConfState
}

wal, walData, err := storage.ReadRepairWAL(context.Background(), walDir, walsnap, walFactory)
Expand Down
4 changes: 2 additions & 2 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
}
}

return nil, nil
return &api.UpdateTaskStatusResponse{}, nil
}

func (d *Dispatcher) UpdateVolumeStatus(ctx context.Context, r *api.UpdateVolumeStatusRequest) (*api.UpdateVolumeStatusResponse, error) {
Expand Down Expand Up @@ -721,7 +721,7 @@ func (d *Dispatcher) UpdateVolumeStatus(ctx context.Context, r *api.UpdateVolume
d.unpublishedVolumesLock.Unlock()

// we won't kick off a batch here, we'll just wait for the timer.
return nil, nil
return &api.UpdateVolumeStatusResponse{}, nil
}

func (d *Dispatcher) processUpdates(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)
n.raftNode = raft.RestartNode(n.Config)

return nil
}
Expand Down
7 changes: 2 additions & 5 deletions manager/state/raft/storage/snapwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.uber.org/zap"
)

// This package wraps the go.etcd.io/etcd/server/v3/api/snap package, and encrypts
Expand Down Expand Up @@ -87,9 +86,8 @@ func NewSnapFactory(encrypter encryption.Encrypter, decrypter encryption.Decrypt

// NewSnapshotter returns a new Snapshotter with the given encrypters and decrypters
func (sc snapCryptor) New(dirpath string) Snapshotter {
lg, _ := zap.NewProduction()
return &wrappedSnap{
Snapshotter: snap.New(lg, dirpath),
Snapshotter: snap.New(nil, dirpath),
encrypter: sc.encrypter,
decrypter: sc.decrypter,
}
Expand All @@ -98,8 +96,7 @@ func (sc snapCryptor) New(dirpath string) Snapshotter {
type originalSnap struct{}

func (o originalSnap) New(dirpath string) Snapshotter {
lg, _ := zap.NewProduction()
return snap.New(lg, dirpath)
return snap.New(nil, dirpath)
}

// OriginalSnap is the original `snap` package as an implementation of the SnapFactory interface
Expand Down
6 changes: 4 additions & 2 deletions manager/state/raft/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (e *EncryptedRaftLogger) BootstrapFromDisk(ctx context.Context, oldEncrypti
if snapshot != nil {
walsnap.Index = snapshot.Metadata.Index
walsnap.Term = snapshot.Metadata.Term
walsnap.ConfState = &snapshot.Metadata.ConfState
}

if !wal.Exist(walDir) {
Expand Down Expand Up @@ -197,8 +198,9 @@ func (e *EncryptedRaftLogger) RotateEncryptionKey(newKey []byte) {
func (e *EncryptedRaftLogger) SaveSnapshot(snapshot raftpb.Snapshot) error {

walsnap := walpb.Snapshot{
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
Index: snapshot.Metadata.Index,
Term: snapshot.Metadata.Term,
ConfState: &snapshot.Metadata.ConfState,
}

e.encoderMu.RLock()
Expand Down
14 changes: 8 additions & 6 deletions manager/state/raft/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestBootstrapFromDisk(t *testing.T) {
require.NoError(t, err)

// everything should be saved with "key1"
_, entries, _ := makeWALData(0, 0)
_, entries, _ := makeWALData(0, 0, &confState)
err = logger.SaveEntries(raftpb.HardState{}, entries)
require.NoError(t, err)
logger.Close(context.Background())
Expand All @@ -45,7 +45,7 @@ func TestBootstrapFromDisk(t *testing.T) {
snapshot := fakeSnapshotData
err = logger.SaveSnapshot(snapshot)
require.NoError(t, err)
_, entries, _ = makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term)
_, entries, _ = makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState)
err = logger.SaveEntries(raftpb.HardState{}, entries)
require.NoError(t, err)
logger.Close(context.Background())
Expand All @@ -62,7 +62,7 @@ func TestBootstrapFromDisk(t *testing.T) {
require.Equal(t, entries, waldata.Entries)

// start writing more wals and rotate in the middle
_, entries, _ = makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term)
_, entries, _ = makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState)
err = logger.SaveEntries(raftpb.HardState{}, entries[:1])
require.NoError(t, err)
logger.RotateEncryptionKey([]byte("key2"))
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestRaftLoggerRace(t *testing.T) {
err = logger.BootstrapNew([]byte("metadata"))
require.NoError(t, err)

_, entries, _ := makeWALData(fakeSnapshotData.Metadata.Index, fakeSnapshotData.Metadata.Term)
_, entries, _ := makeWALData(fakeSnapshotData.Metadata.Index, fakeSnapshotData.Metadata.Term, &fakeSnapshotData.Metadata.ConfState)

done1 := make(chan error)
done2 := make(chan error)
Expand Down Expand Up @@ -153,10 +153,10 @@ func TestMigrateToV3EncryptedForm(t *testing.T) {
require.NoError(t, os.MkdirAll(snapDir, 0o755))
require.NoError(t, snapFactory.New(snapDir).SaveSnap(snapshot))

_, entries, _ := makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term)
_, entries, _ := makeWALData(snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState)
walWriter, err := walFactory.Create(walDir, []byte("metadata"))
require.NoError(t, err)
require.NoError(t, walWriter.SaveSnapshot(walpb.Snapshot{Index: snapshot.Metadata.Index, Term: snapshot.Metadata.Term}))
require.NoError(t, walWriter.SaveSnapshot(walpb.Snapshot{Index: snapshot.Metadata.Index, Term: snapshot.Metadata.Term, ConfState: &snapshot.Metadata.ConfState}))
require.NoError(t, walWriter.Save(raftpb.HardState{}, entries))
require.NoError(t, walWriter.Close())
return entries
Expand All @@ -179,9 +179,11 @@ func TestMigrateToV3EncryptedForm(t *testing.T) {
v3Snapshot := fakeSnapshotData
v3Snapshot.Metadata.Index += 100
v3Snapshot.Metadata.Term += 10
v3Snapshot.Metadata.ConfState = confState
v3EncryptedSnapshot := fakeSnapshotData
v3EncryptedSnapshot.Metadata.Index += 200
v3EncryptedSnapshot.Metadata.Term += 20
v3EncryptedSnapshot.Metadata.ConfState = confState

encoder, decoders := encryption.Defaults(dek, false)
walFactory := NewWALFactory(encoder, decoders)
Expand Down
16 changes: 5 additions & 11 deletions manager/state/raft/storage/walwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"
)

// This package wraps the go.etcd.io/etcd/server/v3/storage/wal package, and encrypts
Expand Down Expand Up @@ -103,8 +102,7 @@ func NewWALFactory(encrypter encryption.Encrypter, decrypter encryption.Decrypte

// Create returns a new WAL object with the given encrypters and decrypters.
func (wc walCryptor) Create(dirpath string, metadata []byte) (WAL, error) {
lg, _ := zap.NewProduction()
w, err := wal.Create(lg, dirpath, metadata)
w, err := wal.Create(nil, dirpath, metadata)
if err != nil {
return nil, err
}
Expand All @@ -117,8 +115,7 @@ func (wc walCryptor) Create(dirpath string, metadata []byte) (WAL, error) {

// Open returns a new WAL object with the given encrypters and decrypters.
func (wc walCryptor) Open(dirpath string, snap walpb.Snapshot) (WAL, error) {
lg, _ := zap.NewProduction()
w, err := wal.Open(lg, dirpath, snap)
w, err := wal.Open(nil, dirpath, snap)
if err != nil {
return nil, err
}
Expand All @@ -132,12 +129,10 @@ func (wc walCryptor) Open(dirpath string, snap walpb.Snapshot) (WAL, error) {
type originalWAL struct{}

func (o originalWAL) Create(dirpath string, metadata []byte) (WAL, error) {
lg, _ := zap.NewProduction()
return wal.Create(lg, dirpath, metadata)
return wal.Create(nil, dirpath, metadata)
}
func (o originalWAL) Open(dirpath string, walsnap walpb.Snapshot) (WAL, error) {
lg, _ := zap.NewProduction()
return wal.Open(lg, dirpath, walsnap)
return wal.Open(nil, dirpath, walsnap)
}

// OriginalWAL is the original `wal` package as an implementation of the WALFactory interface
Expand Down Expand Up @@ -167,7 +162,6 @@ func ReadRepairWAL(
err error
)
repaired := false
lg, _ := zap.NewProduction()
for {
if reader, err = factory.Open(walDir, walsnap); err != nil {
return nil, WALData{}, errors.Wrap(err, "failed to open WAL")
Expand All @@ -183,7 +177,7 @@ func ReadRepairWAL(
if repaired || err != io.ErrUnexpectedEOF {
return nil, WALData{}, errors.Wrap(err, "irreparable WAL error")
}
if !wal.Repair(lg, walDir) {
if !wal.Repair(nil, walDir) {
return nil, WALData{}, errors.Wrap(err, "WAL error cannot be repaired")
}
log.G(ctx).WithError(err).Info("repaired WAL error")
Expand Down
30 changes: 19 additions & 11 deletions manager/state/raft/storage/walwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ import (

var _ WALFactory = walCryptor{}

var (
confState = raftpb.ConfState{
Voters: []uint64{0x00ffca74},
AutoLeave: false,
}
)

// Generates a bunch of WAL test data
func makeWALData(index uint64, term uint64) ([]byte, []raftpb.Entry, walpb.Snapshot) {
func makeWALData(index uint64, term uint64, state *raftpb.ConfState) ([]byte, []raftpb.Entry, walpb.Snapshot) {
wsn := walpb.Snapshot{
Index: index,
Term: term,
Index: index,
Term: term,
ConfState: state,
}

var entries []raftpb.Entry
Expand Down Expand Up @@ -53,7 +61,7 @@ func createWithWAL(t *testing.T, w WALFactory, metadata []byte, startSnap walpb.

// WAL can read entries are not wrapped, but not encrypted
func TestReadAllWrappedNoEncryption(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)
wrappedEntries := make([]raftpb.Entry, len(entries))
for i, entry := range entries {
r := api.MaybeEncryptedRecord{Data: entry.Data}
Expand Down Expand Up @@ -81,7 +89,7 @@ func TestReadAllWrappedNoEncryption(t *testing.T) {

// When reading WAL, if the decrypter can't read the encryption type, errors
func TestReadAllNoSupportedDecrypter(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)
for i, entry := range entries {
r := api.MaybeEncryptedRecord{Data: entry.Data, Algorithm: api.MaybeEncryptedRecord_Algorithm(-3)}
data, err := r.Marshal()
Expand All @@ -106,7 +114,7 @@ func TestReadAllNoSupportedDecrypter(t *testing.T) {
// entry is incorrectly encryptd, an error is returned
func TestReadAllEntryIncorrectlyEncrypted(t *testing.T) {
crypter := &meowCrypter{}
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)

// metadata is correctly encryptd, but entries are not meow-encryptd
for i, entry := range entries {
Expand All @@ -132,7 +140,7 @@ func TestReadAllEntryIncorrectlyEncrypted(t *testing.T) {
// The entry data and metadata are encryptd with the given encrypter, and a regular
// WAL will see them as such.
func TestSave(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)

crypter := &meowCrypter{}
c := NewWALFactory(crypter, encryption.NoopCrypter)
Expand All @@ -158,7 +166,7 @@ func TestSave(t *testing.T) {

// If encryption fails, saving will fail
func TestSaveEncryptionFails(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)

tempdir, err := os.MkdirTemp("", "waltests")
require.NoError(t, err)
Expand Down Expand Up @@ -207,7 +215,7 @@ func TestCreateOpenInvalidDirFails(t *testing.T) {
// A WAL can read what it wrote so long as it has a corresponding decrypter
func TestSaveAndRead(t *testing.T) {
crypter := &meowCrypter{}
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)

c := NewWALFactory(crypter, crypter)
tempdir := createWithWAL(t, c, metadata, snapshot, entries)
Expand All @@ -224,7 +232,7 @@ func TestSaveAndRead(t *testing.T) {
}

func TestReadRepairWAL(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)
tempdir := createWithWAL(t, OriginalWAL, metadata, snapshot, entries)
defer os.RemoveAll(tempdir)

Expand Down Expand Up @@ -253,7 +261,7 @@ func TestReadRepairWAL(t *testing.T) {
}

func TestMigrateWALs(t *testing.T) {
metadata, entries, snapshot := makeWALData(1, 1)
metadata, entries, snapshot := makeWALData(1, 1, &confState)
coder := &meowCrypter{}
c := NewWALFactory(coder, coder)

Expand Down
2 changes: 1 addition & 1 deletion vendor.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
go.etcd.io/etcd/pkg/v3 v3.5.1
go.etcd.io/etcd/raft/v3 v3.5.1
go.etcd.io/etcd/server/v3 v3.5.1
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
Expand Down Expand Up @@ -124,6 +123,7 @@ require (
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a // indirect
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
Expand Down

0 comments on commit 4f4bdf9

Please sign in to comment.