Skip to content

Commit

Permalink
lib/model: Add support for different puller block ordering (syncthing…
Browse files Browse the repository at this point in the history
…#6587)

* WIP

* Tests

* Header and format

* WIP

* Fix tests

* Goland you disappoint me

* Remove CC storage

* Update blockpullreorderer.go
  • Loading branch information
AudriusButkevicius authored May 11, 2020
1 parent decb967 commit 6201eeb
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 11 deletions.
46 changes: 46 additions & 0 deletions lib/config/blockpullorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package config

type BlockPullOrder int

const (
BlockPullOrderStandard BlockPullOrder = iota // default is standard
BlockPullOrderRandom
BlockPullOrderInOrder
)

func (o BlockPullOrder) String() string {
switch o {
case BlockPullOrderStandard:
return "standard"
case BlockPullOrderRandom:
return "random"
case BlockPullOrderInOrder:
return "inOrder"
default:
return "unknown"
}
}

func (o BlockPullOrder) MarshalText() ([]byte, error) {
return []byte(o.String()), nil
}

func (o *BlockPullOrder) UnmarshalText(bs []byte) error {
switch string(bs) {
case "standard":
*o = BlockPullOrderStandard
case "random":
*o = BlockPullOrderRandom
case "inOrder":
*o = BlockPullOrderInOrder
default:
*o = BlockPullOrderStandard
}
return nil
}
1 change: 1 addition & 0 deletions lib/config/folderconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type FolderConfiguration struct {
RawModTimeWindowS int `xml:"modTimeWindowS" json:"modTimeWindowS"`
MaxConcurrentWrites int `xml:"maxConcurrentWrites" json:"maxConcurrentWrites" default:"2"`
DisableFsync bool `xml:"disableFsync" json:"disableFsync"`
BlockPullOrder BlockPullOrder `xml:"blockPullOrder" json:"blockPullOrder"`

cachedFilesystem fs.Filesystem
cachedModTimeWindow time.Duration
Expand Down
125 changes: 125 additions & 0 deletions lib/model/blockpullreorderer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package model

import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
"sort"
)

type blockPullReorderer interface {
Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo
}

func newBlockPullReorderer(order config.BlockPullOrder, id protocol.DeviceID, otherDevices []protocol.DeviceID) blockPullReorderer {
switch order {
case config.BlockPullOrderRandom:
return randomOrderBlockPullReorderer{}
case config.BlockPullOrderInOrder:
return inOrderBlockPullReorderer{}
case config.BlockPullOrderStandard:
fallthrough
default:
return newStandardBlockPullReorderer(id, otherDevices)
}
}

type inOrderBlockPullReorderer struct{}

func (inOrderBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo {
return blocks
}

type randomOrderBlockPullReorderer struct{}

func (randomOrderBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo {
rand.Shuffle(blocks)
return blocks
}

type standardBlockPullReorderer struct {
myIndex int
count int
shuffle func(interface{}) // Used for test
}

func newStandardBlockPullReorderer(id protocol.DeviceID, otherDevices []protocol.DeviceID) *standardBlockPullReorderer {
allDevices := append(otherDevices, id)
sort.Slice(allDevices, func(i, j int) bool {
return allDevices[i].Compare(allDevices[j]) == -1
})
// Find our index
myIndex := -1
for i, dev := range allDevices {
if dev == id {
myIndex = i
break
}
}
if myIndex < 0 {
panic("bug: could not find my own index")
}
return &standardBlockPullReorderer{
myIndex: myIndex,
count: len(allDevices),
shuffle: rand.Shuffle,
}
}

func (p *standardBlockPullReorderer) Reorder(blocks []protocol.BlockInfo) []protocol.BlockInfo {
if len(blocks) == 0 {
return blocks
}

// Split the blocks into len(allDevices) chunks. Chunk count might be less than device count, if there are more
// devices than blocks.
chunks := chunk(blocks, p.count)

newBlocks := make([]protocol.BlockInfo, 0, len(blocks))

// First add our own chunk. We might fall off the list if there are more devices than chunks...
if p.myIndex < len(chunks) {
newBlocks = append(newBlocks, chunks[p.myIndex]...)
}

// The rest of the chunks we fetch in a random order in whole chunks.
// Generate chunk index slice and shuffle it
indexes := make([]int, 0, len(chunks)-1)
for i := 0; i < len(chunks); i++ {
if i != p.myIndex {
indexes = append(indexes, i)
}
}

p.shuffle(indexes)

// Append the chunks in the order of the index slices.
for _, idx := range indexes {
newBlocks = append(newBlocks, chunks[idx]...)
}

return newBlocks
}

func chunk(blocks []protocol.BlockInfo, partCount int) [][]protocol.BlockInfo {
if partCount == 0 {
return [][]protocol.BlockInfo{blocks}
}
count := len(blocks)
chunkSize := (count + partCount - 1) / partCount
parts := make([][]protocol.BlockInfo, 0, partCount)
for i := 0; i < count; i += chunkSize {
end := i + chunkSize
if end > count {
end = count
}
parts = append(parts, blocks[i:end])
}
return parts
}
105 changes: 105 additions & 0 deletions lib/model/blockpullreorderer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package model

import (
"github.com/syncthing/syncthing/lib/protocol"
"reflect"
"sort"
"testing"
)

var (
someBlocks = []protocol.BlockInfo{{Offset: 1}, {Offset: 2}, {Offset: 3}}
)

func Test_chunk(t *testing.T) {
type args struct {
blocks []protocol.BlockInfo
partCount int
}
tests := []struct {
name string
args args
want [][]protocol.BlockInfo
}{
{"one", args{someBlocks, 1}, [][]protocol.BlockInfo{someBlocks}},
{"two", args{someBlocks, 2}, [][]protocol.BlockInfo{someBlocks[:2], someBlocks[2:]}},
{"three", args{someBlocks, 3}, [][]protocol.BlockInfo{someBlocks[:1], someBlocks[1:2], someBlocks[2:]}},
{"four", args{someBlocks, 4}, [][]protocol.BlockInfo{someBlocks[:1], someBlocks[1:2], someBlocks[2:]}},
// Never happens as myIdx would be -1, so we'd return in order.
{"zero", args{someBlocks, 0}, [][]protocol.BlockInfo{someBlocks}},
{"empty-one", args{nil, 1}, [][]protocol.BlockInfo{}},
{"empty-zero", args{nil, 0}, [][]protocol.BlockInfo{nil}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := chunk(tt.args.blocks, tt.args.partCount); !reflect.DeepEqual(got, tt.want) {
t.Errorf("chunk() = %v, want %v", got, tt.want)
}
})
}
}

func Test_inOrderBlockPullReorderer_Reorder(t *testing.T) {
type args struct {
blocks []protocol.BlockInfo
}
tests := []struct {
name string
blocks []protocol.BlockInfo
want []protocol.BlockInfo
}{
{"basic", someBlocks, someBlocks},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
in := inOrderBlockPullReorderer{}
if got := in.Reorder(tt.blocks); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Reorder() = %v, want %v", got, tt.want)
}
})
}
}

func Test_standardBlockPullReorderer_Reorder(t *testing.T) {
// Order the devices, so we know their ordering ahead of time.
devices := []protocol.DeviceID{myID, device1, device2}
sort.Slice(devices, func(i, j int) bool {
return devices[i].Compare(devices[j]) == -1
})

blocks := func(i ...int) []protocol.BlockInfo {
b := make([]protocol.BlockInfo, 0, len(i))
for _, v := range i {
b = append(b, protocol.BlockInfo{Offset: int64(v)})
}
return b
}
tests := []struct {
name string
myId protocol.DeviceID
devices []protocol.DeviceID
blocks []protocol.BlockInfo
want []protocol.BlockInfo
}{
{"front", devices[0], []protocol.DeviceID{devices[1], devices[2]}, blocks(1, 2, 3), blocks(1, 2, 3)},
{"back", devices[2], []protocol.DeviceID{devices[0], devices[1]}, blocks(1, 2, 3), blocks(3, 1, 2)},
{"few-blocks", devices[2], []protocol.DeviceID{devices[0], devices[1]}, blocks(1), blocks(1)},
{"more-than-one-block", devices[1], []protocol.DeviceID{devices[0]}, blocks(1, 2, 3, 4), blocks(3, 4, 1, 2)},
{"empty-blocks", devices[0], []protocol.DeviceID{devices[1]}, blocks(), blocks()},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newStandardBlockPullReorderer(tt.myId, tt.devices)
p.shuffle = func(i interface{}) {} // Noop shuffle
if got := p.Reorder(tt.blocks); !reflect.DeepEqual(got, tt.want) {
t.Errorf("reorderBlocksForDevices() = %v, want %v (my idx: %d, count %d)", got, tt.want, p.myIndex, p.count)
}
})
}
}
23 changes: 12 additions & 11 deletions lib/model/folder_sendrecv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/sync"
Expand Down Expand Up @@ -104,8 +103,9 @@ type sendReceiveFolder struct {
fs fs.Filesystem
versioner versioner.Versioner

queue *jobQueue
writeLimiter *byteSemaphore
queue *jobQueue
blockPullReorderer blockPullReorderer
writeLimiter *byteSemaphore

pullErrors map[string]string // errors for most recent/current iteration
oldPullErrors map[string]string // errors from previous iterations for log filtering only
Expand All @@ -114,12 +114,13 @@ type sendReceiveFolder struct {

func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service {
f := &sendReceiveFolder{
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
fs: fs,
versioner: ver,
queue: newJobQueue(),
writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
pullErrorsMut: sync.NewMutex(),
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter),
fs: fs,
versioner: ver,
queue: newJobQueue(),
blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
pullErrorsMut: sync.NewMutex(),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve, f.String())
Expand Down Expand Up @@ -1071,8 +1072,8 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot
blocks = append(blocks, file.Blocks...)
}

// Shuffle the blocks
rand.Shuffle(blocks)
// Reorder blocks
blocks = f.blockPullReorderer.Reorder(blocks)

f.evLogger.Log(events.ItemStarted, map[string]string{
"folder": f.folderID,
Expand Down

0 comments on commit 6201eeb

Please sign in to comment.