Skip to content

Commit

Permalink
Adding support for parallel file uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
ashikjm committed Jul 8, 2020
1 parent d530038 commit 07455bf
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func doUpload(cmd *cobra.Command, args []string) {
store := createStore(config.S3)
remoteIndex := readRemoteIndex(config, store)
localIndex := createLocalIndex()
err := s3backup.UploadDifferences(localIndex, remoteIndex, 10, store, getFile)
err := s3backup.UploadDifferences(localIndex, remoteIndex, 5, store, getFile)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/text v0.3.2 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
73 changes: 52 additions & 21 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"

"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -208,11 +209,33 @@ type IndexStore interface {
Save(key string, data io.Reader) error
}

// Limiter object for limiting concurrency of go routines
type Limiter struct {
done chan bool
jobs chan int
}

//ParallelLimiter helps you create a channel used to limit parallel file uploads
func ParallelLimiter(ParallelLimit int) *Limiter {
limiter := &Limiter{
done: make(chan bool),
jobs: make(chan int, ParallelLimit),
}

for i := 0; i < ParallelLimit; i++ {
limiter.jobs <- i
}

return limiter
}

// UploadDifferences will upload the files that are missing from the remote index
func UploadDifferences(localIndex, remoteIndex *Index, interval int, store IndexStore, getFile FileGetter) error {
func UploadDifferences(localIndex, remoteIndex *Index, ParallelLimit int, store IndexStore, getFile FileGetter) error {
diff := localIndex.Diff(remoteIndex)
toUpload := CopyIndex(remoteIndex)
count := 0
routineGroup := new(errgroup.Group)
limiter := ParallelLimiter(ParallelLimit)
totalFiles := len(diff.Files)

uploadIndex := func() error {
r, _ := toUpload.Encode()
Expand All @@ -224,32 +247,40 @@ func UploadDifferences(localIndex, remoteIndex *Index, interval int, store Index
return nil
}

for p, srcFile := range diff.Files {
r := getFile(p)
defer func() {
_ = r.Close()
}()

doLog("Uploading %s as %s\n", p, srcFile.Key)
err := store.Save(srcFile.Key, r)
if err != nil {
return err
go func() {
for i := 0; i < totalFiles; i++ {
<-limiter.done
limiter.jobs <- i
}
}()

count++
toUpload.Add(p, srcFile)
if count == interval {
err := uploadIndex()
for p, srcFile := range diff.Files {
p, srcFile := p, srcFile // https://golang.org/doc/faq#closures_and_goroutines

<-limiter.jobs
routineGroup.Go(func() error {
r := getFile(p)
defer func() {
_ = r.Close()
limiter.done <- true
}()

doLog("Uploading %s as %s\n", p, srcFile.Key)
err := store.Save(srcFile.Key, r)
if err != nil {
return err
}

count = 0
}
toUpload.Add(p, srcFile)
return nil
})
}

err := uploadIndex()
if err != nil {
if err := routineGroup.Wait(); err == nil {
err := uploadIndex()
if err != nil {
return err
}
} else {
return err
}

Expand Down
11 changes: 2 additions & 9 deletions index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ func TestUploadDifferences(t *testing.T) {
"2": Sourcefile{Key: "b", Hash: "123"},
"3": Sourcefile{Key: "c", Hash: "123"},
"4": Sourcefile{Key: "d", Hash: "123"},
// index
"5": Sourcefile{Key: "e", Hash: "123"},
"6": Sourcefile{Key: "f", Hash: "123"},
"7": Sourcefile{Key: "g", Hash: "123"},
"8": Sourcefile{Key: "h", Hash: "123"},
// index
"9": Sourcefile{Key: "i", Hash: "123"},
// index
},
Expand All @@ -178,12 +176,8 @@ func TestUploadDifferences(t *testing.T) {
}
err := UploadDifferences(index, &Index{}, 4, mock, getter)

assert.Equal(t, 12, len(mock.Keys))
assert.Equal(t, ".index.yaml", mock.Keys[4])
assert.Equal(t, 10, len(mock.Keys))
assert.Equal(t, ".index.yaml", mock.Keys[9])
assert.True(t, len(mock.Values[4]) < len(mock.Values[9]))
assert.Equal(t, ".index.yaml", mock.Keys[11])
assert.True(t, len(mock.Values[9]) < len(mock.Values[11]))
assert.NoError(t, err)
}

Expand Down Expand Up @@ -243,8 +237,7 @@ func TestUploadDifferences_IndexSaveFails(t *testing.T) {
Keys: []string{},
FailAfter: 9,
}
err := UploadDifferences(index, &Index{}, 12, mock, getter)

err := UploadDifferences(index, &Index{}, 4, mock, getter)
assert.Equal(t, 9, len(mock.Keys))
assert.NotContains(t, ".index.yaml", mock.Keys)
assert.Error(t, err)
Expand Down

0 comments on commit 07455bf

Please sign in to comment.