swarm/storage: pyramid chunker re-write (ethereum#14382)

jmozah authored and fjl committed Sep 21, 2017
1 parent 3c86563 commit d558a59

Showing 12 changed files with 1,010 additions and 235 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -30,3 +30,6 @@ build/_vendor/pkg
 # travis
 profile.tmp
 profile.cov
+
+# IdeaIDE
+.idea
4 changes: 2 additions & 2 deletions swarm/network/depo.go
@@ -29,12 +29,12 @@ import (
 // Handler for storage/retrieval related protocol requests
 // implements the StorageHandler interface used by the bzz protocol
 type Depo struct {
-	hashfunc   storage.Hasher
+	hashfunc   storage.SwarmHasher
 	localStore storage.ChunkStore
 	netStore   storage.ChunkStore
 }
 
-func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo {
+func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
 	return &Depo{
 		hashfunc:   hash,
 		localStore: localStore,
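The Hasher to SwarmHasher switch points at a new swarm/storage/swarmhasher.go file introduced by this commit but not shown in this excerpt. A minimal sketch of what it plausibly contains, reconstructed from how the type is used in chunker.go below (hashFunc().Size(), ResetWithLength, Write, Sum); the real file may differ in detail:

package storage

import "hash"

// SwarmHash extends the standard hash.Hash with a reset variant that
// also feeds an 8-byte length prefix into the hash state, so the
// subtree size is authenticated along with the chunk payload.
type SwarmHash interface {
	hash.Hash
	ResetWithLength([]byte)
}

// SwarmHasher replaces the old Hasher (func() hash.Hash) as the
// constructor type passed around by Depo and the chunkers.
type SwarmHasher func() SwarmHash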
93 changes: 58 additions & 35 deletions swarm/storage/chunker.go
Expand Up @@ -20,9 +20,9 @@ import (
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
"sync"
"time"
)

/*
Expand Down Expand Up @@ -50,14 +50,6 @@ data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1}
The underlying hash function is configurable
*/
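To make the layout above concrete: an intermediate chunk's payload is the 8-byte subtree size followed by the concatenated child keys. A hypothetical helper, not part of the commit; the little-endian encoding and 32-byte key size are assumptions consistent with the 8-byte length slice handed to ResetWithLength further down:

// packIntermediate shows the chunk layout described above:
// data := size(subtree) || key_j || key_{j+1} || ... || key_{j+n-1}
func packIntermediate(subtreeSize uint64, childKeys []Key) []byte {
	data := make([]byte, 8, 8+len(childKeys)*32)     // 32 = assumed hash size
	binary.LittleEndian.PutUint64(data, subtreeSize) // assumed little-endian
	for _, key := range childKeys {
		data = append(data, key...) // Key is a []byte alias in this package
	}
	return data
}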

-const (
-	defaultHash = "SHA3"
-	// defaultHash = "BMTSHA3" // http://golang.org/pkg/hash/#Hash
-	// defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash
-	defaultBranches int64 = 128
-	// hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes
-	// chunksize int64 = branches * hashSize // chunk is defined as this
-)

 /*
 Tree chunker is a concrete implementation of data chunking.
@@ -67,25 +59,19 @@ If all is well it is possible to implement this by simply composing readers so that no extra allocation or buffering is necessary for the data processing.
 The hashing itself does use extra copies and allocation though, since it does need it.
 */
 
-type ChunkerParams struct {
-	Branches int64
-	Hash     string
-}
-
-func NewChunkerParams() *ChunkerParams {
-	return &ChunkerParams{
-		Branches: defaultBranches,
-		Hash:     defaultHash,
-	}
-}
+var (
+	errAppendOppNotSuported = errors.New("Append operation not supported")
+	errOperationTimedOut    = errors.New("operation timed out")
+)
 
 type TreeChunker struct {
 	branches int64
-	hashFunc Hasher
+	hashFunc SwarmHasher
 	// calculated
 	hashSize    int64 // self.hashFunc.New().Size()
 	chunkSize   int64 // hashSize* branches
-	workerCount int
+	workerCount int64        // the number of worker routines used
+	workerLock  sync.RWMutex // lock for the worker count
 }
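With the defaults this file previously declared (SHA3, whose digest is 32 bytes, and 128 branches), hashSize is 32 and chunkSize works out to 32 × 128 = 4096 bytes, so one intermediate chunk carries up to 128 child keys after its 8-byte size prefix.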

@@ -94,7 +80,8 @@ func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
 	self.branches = params.Branches
 	self.hashSize = int64(self.hashFunc().Size())
 	self.chunkSize = self.hashSize * self.branches
-	self.workerCount = 1
+	self.workerCount = 0
+
 	return
 }

@@ -114,13 +101,31 @@ type hashJob struct {
 	parentWg *sync.WaitGroup
 }
 
-func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+func (self *TreeChunker) incrementWorkerCount() {
+	self.workerLock.Lock()
+	defer self.workerLock.Unlock()
+	self.workerCount += 1
+}
+
+func (self *TreeChunker) getWorkerCount() int64 {
+	self.workerLock.RLock()
+	defer self.workerLock.RUnlock()
+	return self.workerCount
+}
+
+func (self *TreeChunker) decrementWorkerCount() {
+	self.workerLock.Lock()
+	defer self.workerLock.Unlock()
+	self.workerCount -= 1
+}
+
+func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
 	if self.chunkSize <= 0 {
 		panic("chunker must be initialised")
 	}
 
-	jobC := make(chan *hashJob, 2*processors)
+	jobC := make(chan *hashJob, 2*ChunkProcessors)
 	wg := &sync.WaitGroup{}
 	errC := make(chan error)
 	quitC := make(chan bool)
Expand All @@ -129,6 +134,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
if wwg != nil {
wwg.Add(1)
}

self.incrementWorkerCount()
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)

depth := 0
@@ -157,17 +164,24 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
 		close(errC)
 	}()
 
-	//TODO: add a timeout
-	if err := <-errC; err != nil {
-		close(quitC)
-		return nil, err
+	defer close(quitC)
+	select {
+	case err := <-errC:
+		if err != nil {
+			return nil, err
+		}
+	case <-time.NewTimer(splitTimeout).C:
+		return nil, errOperationTimedOut
 	}
+
 	return key, nil
 }
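For orientation, a hypothetical caller of the reworked Split; this is not from the commit, reader and size stand in for real inputs, and NewChunkerParams now lives outside this file:

chunker := NewTreeChunker(NewChunkerParams())
chunkC := make(chan *Chunk)
swg := &sync.WaitGroup{} // hashChunk does swg.Add(1) per chunk it emits

// Consumer: drain chunkC and persist each chunk.
go func() {
	for chunk := range chunkC {
		// store chunk.SData under chunk.Key, then:
		swg.Done()
	}
}()

key, err := chunker.Split(reader, size, chunkC, swg, nil)
if err != nil {
	return nil, err // may be errOperationTimedOut once splitTimeout elapses
}
swg.Wait() // all emitted chunks have been stored
return key, nil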

 func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
+
+	//
 
 	for depth > 0 && size < treeSize {
 		treeSize /= self.branches
 		depth--
@@ -223,12 +237,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
 	// parentWg.Add(1)
 	// go func() {
 	childrenWg.Wait()
-	if len(jobC) > self.workerCount && self.workerCount < processors {
+
+	worker := self.getWorkerCount()
+	if int64(len(jobC)) > worker && worker < ChunkProcessors {
 		if wwg != nil {
 			wwg.Add(1)
 		}
-		self.workerCount++
+		self.incrementWorkerCount()
 		go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+
 	}
 	select {
 	case jobC <- &hashJob{key, chunk, size, parentWg}:
@@ -237,6 +254,8 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
 }
 
 func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
+	defer self.decrementWorkerCount()
+
 	hasher := self.hashFunc()
 	if wwg != nil {
 		defer wwg.Done()
@@ -249,7 +268,6 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
 			return
 		}
 		// now we got the hashes in the chunk, then hash the chunks
-		hasher.Reset()
 		self.hashChunk(hasher, job, chunkC, swg)
 	case <-quitC:
 		return
@@ -260,9 +278,11 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
 // The treeChunkers own Hash hashes together
 // - the size (of the subtree encoded in the Chunk)
 // - the Chunk, ie. the contents read from the input reader
-func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
-	hasher.Write(job.chunk)
+func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+	hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
+	hasher.Write(job.chunk[8:])           // minus 8 []byte length
 	h := hasher.Sum(nil)
+
 	newChunk := &Chunk{
 		Key:   h,
 		SData: job.chunk,
@@ -285,6 +305,10 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
 	}
 }
 
+func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+	return nil, errAppendOppNotSuported
+}
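The Append stub keeps TreeChunker conforming to the package's Splitter interface, which this commit widens with an Append method; the real implementation belongs to the new pyramid chunker (swarm/storage/pyramid.go, one of the 12 changed files, not shown here). An assumed sketch of the widened interface, reconstructed from the signatures used in this file; the actual definition may differ in detail:

type Splitter interface {
	Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
	Append(Key, io.Reader, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
}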

 // LazyChunkReader implements LazySectionReader
 type LazyChunkReader struct {
 	key Key // root key
@@ -298,7 +322,6 @@ type LazyChunkReader struct {
 
 // implements the Joiner interface
 func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
-
 	return &LazyChunkReader{
 		key:    key,
 		chunkC: chunkC,