dragonboat: renamed pb.SnapshotChunk to pb.Chunk
lni committed Feb 21, 2020
1 parent 5561bba commit 0987daa
Showing 19 changed files with 215 additions and 126 deletions.
benchmark_test.go (12 changes: 6 additions & 6 deletions)
@@ -618,12 +618,12 @@ func BenchmarkStateMachineStep1024(b *testing.B) {

type noopSink struct{}

-func (n *noopSink) Receive(pb.SnapshotChunk) (bool, bool) { return true, false }
-func (n *noopSink) Stop() {}
-func (n *noopSink) ClusterID() uint64 { return 1 }
-func (n *noopSink) ToNodeID() uint64 { return 1 }
+func (n *noopSink) Receive(pb.Chunk) (bool, bool) { return true, false }
+func (n *noopSink) Stop() {}
+func (n *noopSink) ClusterID() uint64 { return 1 }
+func (n *noopSink) ToNodeID() uint64 { return 1 }

-func BenchmarkSnapshotChunkWriter(b *testing.B) {
+func BenchmarkChunkWriter(b *testing.B) {
sink := &noopSink{}
meta := &rsm.SSMeta{}
cw := rsm.NewChunkWriter(sink, meta)
@@ -640,7 +640,7 @@ func BenchmarkSnapshotChunkWriter(b *testing.B) {
}
}

-func BenchmarkSnappyCompressedSnapshotChunkWriter(b *testing.B) {
+func BenchmarkSnappyCompressedChunkWriter(b *testing.B) {
sink := &noopSink{}
meta := &rsm.SSMeta{}
cw := rsm.NewChunkWriter(sink, meta)
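
The benchmark above wires a sink straight into rsm.NewChunkWriter. As a rough sketch of the same wiring written as it would appear in this package (so the pb and rsm imports already present in benchmark_test.go apply), assuming ChunkWriter exposes an io.Writer-style Write method that the elided benchmark body presumably uses:

    // Illustrative only: push snapshot bytes through a ChunkWriter into a sink.
    func streamThroughSink() error {
        sink := &noopSink{} // satisfies Receive(pb.Chunk) / Stop / ClusterID / ToNodeID
        cw := rsm.NewChunkWriter(sink, &rsm.SSMeta{})
        // Write is an assumption; this commit only shows the constructor and the
        // sink-facing methods of ChunkWriter.
        _, err := cw.Write(make([]byte, 1024))
        return err
    }
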
internal/rsm/chunkwriter.go (8 changes: 4 additions & 4 deletions)
@@ -99,7 +99,7 @@ func (cw *ChunkWriter) onNewBlock(data []byte, crc []byte) error {
return cw.onNewChunk(chunk)
}

-func (cw *ChunkWriter) onNewChunk(chunk pb.SnapshotChunk) error {
+func (cw *ChunkWriter) onNewChunk(chunk pb.Chunk) error {
sent, stopped := cw.sink.Receive(chunk)
if stopped {
cw.stopped = true
@@ -138,8 +138,8 @@ func (cw *ChunkWriter) getHeader() []byte {
return result
}

-func (cw *ChunkWriter) getChunk() pb.SnapshotChunk {
-return pb.SnapshotChunk{
+func (cw *ChunkWriter) getChunk() pb.Chunk {
+return pb.Chunk{
ClusterId: cw.sink.ClusterID(),
NodeId: cw.sink.ToNodeID(),
From: cw.meta.From,
@@ -154,7 +154,7 @@ func (cw *ChunkWriter) getChunk() pb.SnapshotChunk {
}
}

-func (cw *ChunkWriter) getTailChunk() pb.SnapshotChunk {
+func (cw *ChunkWriter) getTailChunk() pb.Chunk {
tailChunk := cw.getChunk()
tailChunk.ChunkCount = pb.LastChunkCount
tailChunk.FileChunkCount = pb.LastChunkCount
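
getChunk stamps every outgoing chunk with the destination identity taken from the sink (ClusterID, ToNodeID) and the snapshot metadata, while getTailChunk reuses that chunk with ChunkCount and FileChunkCount overwritten by pb.LastChunkCount to mark the end of the stream. A receiver can therefore classify chunks by the ChunkCount marker alone; a minimal sketch, assuming pb.LastChunkCount and pb.PoisonChunkCount can be compared against the ChunkCount field directly, as their uses in this commit suggest:

    // Sketch: classify a chunk by its ChunkCount marker.
    func classify(c pb.Chunk) string {
        switch c.ChunkCount {
        case pb.LastChunkCount:
            return "tail" // end of a streamed snapshot, set by getTailChunk
        case pb.PoisonChunkCount:
            return "poison" // sent by Sink.Stop to abort the stream
        default:
            return "data"
        }
    }
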
internal/rsm/chunkwriter_test.go (6 changes: 3 additions & 3 deletions)
@@ -24,12 +24,12 @@ import (
)

type testSink struct {
-chunks []pb.SnapshotChunk
+chunks []pb.Chunk
sendFailed bool
stopped bool
}

-func (s *testSink) Receive(chunk pb.SnapshotChunk) (bool, bool) {
+func (s *testSink) Receive(chunk pb.Chunk) (bool, bool) {
if s.sendFailed || s.stopped {
return !s.sendFailed, s.stopped
}
@@ -38,7 +38,7 @@ func (s *testSink) Receive(chunk pb.SnapshotChunk) (bool, bool) {
}

func (s *testSink) Stop() {
-s.Receive(pb.SnapshotChunk{ChunkCount: pb.PoisonChunkCount})
+s.Receive(pb.Chunk{ChunkCount: pb.PoisonChunkCount})
}

func (s *testSink) ClusterID() uint64 {
internal/rsm/statemachine_test.go (2 changes: 1 addition & 1 deletion)
@@ -2195,7 +2195,7 @@ func TestStreamSnapshot(t *testing.T) {
}
sm.members.members.Addresses[1] = "a1"
ts := &testSink{
-chunks: make([]pb.SnapshotChunk, 0),
+chunks: make([]pb.Chunk, 0),
}
if err := sm.StreamSnapshot(ts); err != nil {
t.Errorf("stream snapshot failed %v", err)
internal/transport/chunks.go (24 changes: 12 additions & 12 deletions)
@@ -39,12 +39,12 @@ var (
maxConcurrentSlot = settings.Soft.MaxConcurrentStreamingSnapshot
)

-func chunkKey(c pb.SnapshotChunk) string {
+func chunkKey(c pb.Chunk) string {
return fmt.Sprintf("%d:%d:%d", c.ClusterId, c.NodeId, c.Index)
}

type tracked struct {
-firstChunk pb.SnapshotChunk
+firstChunk pb.Chunk
extraFiles []*pb.SnapshotFile
validator *rsm.SnapshotValidator
nextChunk uint64
@@ -98,7 +98,7 @@ func NewChunks(onReceive func(pb.MessageBatch),
}

// AddChunk adds an received trunk to chunks.
-func (c *Chunks) AddChunk(chunk pb.SnapshotChunk) bool {
+func (c *Chunks) AddChunk(chunk pb.Chunk) bool {
did := c.getDeploymentID()
if chunk.DeploymentId != did ||
chunk.BinVer != raftio.RPCBinVersion {
@@ -190,7 +190,7 @@ func (c *Chunks) full() bool {
return uint64(len(c.tracked)) >= maxConcurrentSlot
}

-func (c *Chunks) record(chunk pb.SnapshotChunk) *tracked {
+func (c *Chunks) record(chunk pb.Chunk) *tracked {
c.mu.Lock()
defer c.mu.Unlock()
key := chunkKey(chunk)
@@ -246,11 +246,11 @@ func (c *Chunks) record(chunk pb.SnapshotChunk) *tracked {
return td
}

-func (c *Chunks) shouldValidate(chunk pb.SnapshotChunk) bool {
+func (c *Chunks) shouldValidate(chunk pb.Chunk) bool {
return c.validate && !chunk.HasFileInfo && chunk.ChunkId != 0
}

-func (c *Chunks) addLocked(chunk pb.SnapshotChunk) bool {
+func (c *Chunks) addLocked(chunk pb.Chunk) bool {
key := chunkKey(chunk)
td := c.record(chunk)
if td == nil {
@@ -303,13 +303,13 @@ func (c *Chunks) addLocked(chunk pb.SnapshotChunk) bool {
return true
}

-func (c *Chunks) nodeRemoved(chunk pb.SnapshotChunk) (bool, error) {
+func (c *Chunks) nodeRemoved(chunk pb.Chunk) (bool, error) {
env := c.getSSEnv(chunk)
dir := env.GetRootDir()
return fileutil.IsDirMarkedAsDeleted(dir, c.fs)
}

-func (c *Chunks) save(chunk pb.SnapshotChunk) (err error) {
+func (c *Chunks) save(chunk pb.Chunk) (err error) {
env := c.getSSEnv(chunk)
if chunk.ChunkId == 0 {
if err := env.CreateTempDir(); err != nil {
@@ -347,12 +347,12 @@ func (c *Chunks) save(chunk pb.SnapshotChunk) (err error) {
return nil
}

-func (c *Chunks) getSSEnv(chunk pb.SnapshotChunk) *server.SSEnv {
+func (c *Chunks) getSSEnv(chunk pb.Chunk) *server.SSEnv {
return server.NewSSEnv(c.folder, chunk.ClusterId, chunk.NodeId,
chunk.Index, chunk.From, server.ReceivingMode, c.fs)
}

-func (c *Chunks) finalize(chunk pb.SnapshotChunk, td *tracked) error {
+func (c *Chunks) finalize(chunk pb.Chunk, td *tracked) error {
env := c.getSSEnv(chunk)
msg := c.toMessage(td.firstChunk, td.extraFiles)
if len(msg.Requests) != 1 || msg.Requests[0].Type != pb.InstallSnapshot {
@@ -366,12 +366,12 @@ func (c *Chunks) finalize(chunk pb.SnapshotChunk, td *tracked) error {
return err
}

-func (c *Chunks) removeTempDir(chunk pb.SnapshotChunk) {
+func (c *Chunks) removeTempDir(chunk pb.Chunk) {
env := c.getSSEnv(chunk)
env.MustRemoveTempDir()
}

-func (c *Chunks) toMessage(chunk pb.SnapshotChunk,
+func (c *Chunks) toMessage(chunk pb.Chunk,
files []*pb.SnapshotFile) pb.MessageBatch {
if chunk.ChunkId != 0 {
panic("not first chunk")
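
On the receiving side, each chunk that arrives over the wire is handed to AddChunk, which rejects it when the DeploymentId or BinVer does not match (the guard at the top of AddChunk above) and otherwise records it, saves it into the snapshot environment, and finalizes the completed snapshot into a MessageBatch. A minimal sketch of that handoff, assuming the caller already holds a *Chunks value (the full NewChunks signature is not visible in this diff):

    // Sketch: feed a received chunk into the collector and log rejections.
    func onChunkReceived(c *Chunks, chunk pb.Chunk) {
        if !c.AddChunk(chunk) {
            // Rejected; the guards visible in this commit include a
            // DeploymentId / BinVer mismatch.
            plog.Infof("dropped chunk %d for %d:%d",
                chunk.ChunkId, chunk.ClusterId, chunk.NodeId)
        }
    }
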
internal/transport/chunks_test.go (18 changes: 9 additions & 9 deletions)
@@ -32,10 +32,10 @@ const (
testDeploymentID uint64 = 0
)

-func getTestChunks() []pb.SnapshotChunk {
-result := make([]pb.SnapshotChunk, 0)
+func getTestChunks() []pb.Chunk {
+result := make([]pb.Chunk, 0)
for chunkID := uint64(0); chunkID < 10; chunkID++ {
-c := pb.SnapshotChunk{
+c := pb.Chunk{
BinVer: raftio.RPCBinVersion,
ClusterId: 100,
NodeId: 2,
@@ -58,7 +58,7 @@ func getTestChunks() []pb.SnapshotChunk {
return result
}

-func hasSnapshotTempFile(cs *Chunks, c pb.SnapshotChunk) bool {
+func hasSnapshotTempFile(cs *Chunks, c pb.Chunk) bool {
env := cs.getSSEnv(c)
fp := env.GetTempFilepath()
if _, err := cs.fs.Stat(fp); vfs.IsNotExist(err) {
@@ -68,7 +68,7 @@ func hasSnapshotTempFile(cs *Chunks, c pb.SnapshotChunk) bool {
}

func hasExternalFile(cs *Chunks,
-c pb.SnapshotChunk, fn string, sz uint64, ifs vfs.IFS) bool {
+c pb.Chunk, fn string, sz uint64, ifs vfs.IFS) bool {
env := cs.getSSEnv(c)
efp := ifs.PathJoin(env.GetFinalDir(), fn)
fs, err := cs.fs.Stat(efp)
@@ -233,7 +233,7 @@ func TestShouldUpdateValidator(t *testing.T) {
}
for idx, tt := range tests {
c := &Chunks{validate: tt.validate}
-input := pb.SnapshotChunk{ChunkId: tt.chunkID, HasFileInfo: tt.hasFileInfo}
+input := pb.Chunk{ChunkId: tt.chunkID, HasFileInfo: tt.hasFileInfo}
if result := c.shouldValidate(input); result != tt.result {
t.Errorf("%d, result %t, want %t", idx, result, tt.result)
}
@@ -353,7 +353,7 @@ func TestChunksAreIgnoredWhenNodeIsRemoved(t *testing.T) {
}

// when there is no flag file
-func TestOutOfDateSnapshotChunksCanBeHandled(t *testing.T) {
+func TestOutOfDateChunksCanBeHandled(t *testing.T) {
fn := func(t *testing.T, chunks *Chunks, handler *testMessageHandler) {
inputs := getTestChunks()
env := chunks.getSSEnv(inputs[0])
@@ -421,7 +421,7 @@ func TestSignificantlyDelayedNonFirstChunksAreIgnored(t *testing.T) {
}

func checkTestSnapshotFile(chunks *Chunks,
-chunk pb.SnapshotChunk, size uint64) bool {
+chunk pb.Chunk, size uint64) bool {
env := chunks.getSSEnv(chunk)
finalFp := env.GetFilepath()
f, err := chunks.fs.Open(finalFp)
@@ -731,7 +731,7 @@ func TestGetMessageFromChunk(t *testing.T) {
Metadata: make([]byte, 32),
}
files := []*pb.SnapshotFile{sf1, sf2}
-chunk := pb.SnapshotChunk{
+chunk := pb.Chunk{
ClusterId: 123,
NodeId: 3,
From: 2,
internal/transport/job.go (24 changes: 12 additions & 12 deletions)
@@ -43,13 +43,13 @@ type Sink struct {
}

// Receive receives a snapshot chunk.
-func (s *Sink) Receive(chunk pb.SnapshotChunk) (bool, bool) {
+func (s *Sink) Receive(chunk pb.Chunk) (bool, bool) {
return s.j.SendChunk(chunk)
}

// Stop stops the sink processing.
func (s *Sink) Stop() {
-s.Receive(pb.SnapshotChunk{ChunkCount: pb.PoisonChunkCount})
+s.Receive(pb.Chunk{ChunkCount: pb.PoisonChunkCount})
}

// ClusterID returns the cluster ID of the source node.
@@ -71,7 +71,7 @@ type job struct {
ctx context.Context
rpc raftio.IRaftRPC
conn raftio.ISnapshotConnection
-ch chan pb.SnapshotChunk
+ch chan pb.Chunk
stopc chan struct{}
failed chan struct{}
streamChunkSent atomic.Value
@@ -100,7 +100,7 @@ func newJob(ctx context.Context,
} else {
chsz = sz
}
-j.ch = make(chan pb.SnapshotChunk, chsz)
+j.ch = make(chan pb.Chunk, chsz)
return j
}

@@ -130,7 +130,7 @@ func (j *job) sendSavedSnapshot(m pb.Message) {
}
}

-func (j *job) SendChunk(chunk pb.SnapshotChunk) (bool, bool) {
+func (j *job) SendChunk(chunk pb.Chunk) (bool, bool) {
if !chunk.IsPoisonChunk() {
plog.Infof("node %d is sending chunk %d to %s",
chunk.From, chunk.ChunkId, dn(chunk.ClusterId, chunk.NodeId))
@@ -189,7 +189,7 @@ func (j *job) streamSnapshot() error {
}

func (j *job) processSavedSnapshot() error {
-chunks := make([]pb.SnapshotChunk, 0)
+chunks := make([]pb.Chunk, 0)
for {
select {
case <-j.stopc:
@@ -206,12 +206,12 @@ func (j *job) processSavedSnapshot() error {
}
}

-func (j *job) sendSavedChunks(chunks []pb.SnapshotChunk) error {
+func (j *job) sendSavedChunks(chunks []pb.Chunk) error {
for _, chunk := range chunks {
chunkData := make([]byte, snapshotChunkSize)
chunk.DeploymentId = j.deploymentID
if !chunk.Witness {
-data, err := loadSnapshotChunkData(chunk, chunkData, j.fs)
+data, err := loadChunkData(chunk, chunkData, j.fs)
if err != nil {
plog.Errorf("failed to read the snapshot chunk, %v", err)
return err
@@ -223,13 +223,13 @@ func (j *job) sendSavedChunks(chunks []pb.SnapshotChunk) error {
return err
}
if v := j.streamChunkSent.Load(); v != nil {
-v.(func(pb.SnapshotChunk))(chunk)
+v.(func(pb.Chunk))(chunk)
}
}
return nil
}

-func (j *job) sendChunk(c pb.SnapshotChunk,
+func (j *job) sendChunk(c pb.Chunk,
conn raftio.ISnapshotConnection) error {
if v := j.preStreamChunkSend.Load(); v != nil {
plog.Infof("pre stream chunk send set")
@@ -239,7 +239,7 @@ func (j *job) sendChunk(c pb.SnapshotChunk,
plog.Infof("not sending the chunk!")
return errChunkSendSkipped
}
-return conn.SendSnapshotChunk(updated)
+return conn.SendChunk(updated)
}
-return conn.SendSnapshotChunk(c)
+return conn.SendChunk(c)
}
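
One knock-on effect of the rename inside this package: the type assertion in sendSavedChunks now expects a func(pb.Chunk), so anything stored in the job's streamChunkSent hook has to use the new type as well. A sketch of the hook shape after this commit (the field is unexported, so this only matters for code inside the transport package, such as its tests):

    // Sketch: the callback shape expected by the renamed type assertion.
    func setSentHook(j *job) {
        j.streamChunkSent.Store(func(c pb.Chunk) {
            plog.Infof("chunk %d sent to %d:%d", c.ChunkId, c.ClusterId, c.NodeId)
        })
    }
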
internal/transport/job_test.go (6 changes: 3 additions & 3 deletions)
@@ -85,7 +85,7 @@ func TestKeepSendingChunksUsingFailedJobWillNotBlock(t *testing.T) {
t.Fatalf("failed to get noopConn")
}
noopConn.req.SetToFail(true)
-sent, stopped := c.SendChunk(pb.SnapshotChunk{})
+sent, stopped := c.SendChunk(pb.Chunk{})
if !sent {
t.Fatalf("failed to send")
}
@@ -97,7 +97,7 @@ func TestKeepSendingChunksUsingFailedJobWillNotBlock(t *testing.T) {
t.Fatalf("error didn't return from process()")
}
for i := 0; i < streamingChanLength*10; i++ {
-c.SendChunk(pb.SnapshotChunk{})
+c.SendChunk(pb.Chunk{})
}
select {
case <-c.failed:
@@ -120,7 +120,7 @@ func testSpecialChunkCanStopTheProcessLoop(t *testing.T,
stopper.RunWorker(func() {
perr = c.process()
})
-poison := pb.SnapshotChunk{
+poison := pb.Chunk{
ChunkCount: tt,
}
sent, stopped := c.SendChunk(poison)
internal/transport/noop.go (4 changes: 2 additions & 2 deletions)
@@ -111,8 +111,8 @@ type NOOPSnapshotConnection struct {
func (c *NOOPSnapshotConnection) Close() {
}

-// SendSnapshotChunk returns ErrRequestedToFail when requested.
-func (c *NOOPSnapshotConnection) SendSnapshotChunk(chunk raftpb.SnapshotChunk) error {
+// SendChunk returns ErrRequestedToFail when requested.
+func (c *NOOPSnapshotConnection) SendChunk(chunk raftpb.Chunk) error {
if c.req.Fail() {
return ErrRequestedToFail
}
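
Custom transports are affected the same way: the snapshot connection's chunk-sending method is now SendChunk and takes a raftpb.Chunk. A skeleton mirroring the NOOPSnapshotConnection above, with everything beyond the two methods shown here treated as an assumption:

    // Sketch: a snapshot connection after the rename.
    type mySnapshotConnection struct{}

    func (c *mySnapshotConnection) Close() {}

    func (c *mySnapshotConnection) SendChunk(chunk raftpb.Chunk) error {
        // marshal and transmit the chunk; return a non-nil error on failure
        return nil
    }
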