feat(ipld): concurrent GetSharesByNamespace (celestiaorg#970)
* feat(ipld): concurrent SharesByNamespace

* fix: incrementing wg counter before adding new jobs

* moving to own file, adding WrappedWaitGroup, changing loop condition

* removing result struct

* overallocating with row length, adding comments

* local imports

* changing buffer size

* fixes from review

* fix: atomic counter in wrappedWaitGroup and fetchedBounds

* fix: changing loop syntax and bounds update api

* removing wrappedWaitGroup constructor

* Update ipld/get_namespaced_shares.go

Co-authored-by: Hlib Kanunnikov <[email protected]>

* improvement: loop optimization from Wondertan

* feat: errgroup wrapping retrieval errors

* linter: goimports error

* test: Adding test for partial retrieval, opening discussion to return data

* feat: adding tracing

* fix: fixing race between counter read and channel close

* fix: not passing context from errgroup to avoid unwanted cancellation

* linter: goimports makes me cry

* refactor: removing errgroup, removing unnecessary sync.Once, simplifying logic

* tracing: removing traces in get_shares.go for separate pr

* lint: goimports

* Apply suggestions from code review

Co-authored-by: Hlib Kanunnikov <[email protected]>

* Apply suggestions from code review

Co-authored-by: rene <[email protected]>

* refactor: fixes from reviews

* fix: comment punctuation and import formatting

* docs: adding issue number for pool worker limit

Co-authored-by: Hlib Kanunnikov <[email protected]>
Co-authored-by: rene <[email protected]>
3 people authored Aug 12, 2022
1 parent e46aea4 commit cdbc28a
Showing 6 changed files with 290 additions and 77 deletions.
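
In short, this change replaces the recursive, sequential namespace walk in ipld/get.go with a concurrent, worker-pool-backed implementation in a new file, and extends the GetSharesByNamespace signature with a maxShares argument. A minimal sketch of the call-site change, based only on the signatures in the diffs below (rowRoot, nID, and rowLen are illustrative caller-side names, and using the row length for maxShares is an assumption drawn from the code comments):

    // before this commit (old signature in ipld/get.go):
    //   shares, err := ipld.GetSharesByNamespace(ctx, bGetter, rowRoot, nID)
    //
    // after this commit the caller also passes maxShares, which bounds the
    // result slice and sizes the internal job buffer:
    shares, err := ipld.GetSharesByNamespace(ctx, bGetter, rowRoot, nID, rowLen)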
63 changes: 0 additions & 63 deletions ipld/get.go
@@ -8,9 +8,6 @@ import (
    ipld "github.com/ipfs/go-ipld-format"

    "github.com/celestiaorg/celestia-node/ipld/plugin"
    "github.com/celestiaorg/nmt"

    "github.com/celestiaorg/nmt/namespace"
)

// GetShare fetches and returns the data for leaf `leafIndex` of root `rootCid`.
@@ -126,66 +123,6 @@ func GetProof(
    return GetProof(ctx, bGetter, root, proof, leaf, total)
}

// GetSharesByNamespace returns all the shares from the given root
// with the given namespace.ID.
func GetSharesByNamespace(
    ctx context.Context,
    bGetter blockservice.BlockGetter,
    root cid.Cid,
    nID namespace.ID,
) ([]Share, error) {
    leaves, err := GetLeavesByNamespace(ctx, bGetter, root, nID)
    if err != nil {
        return nil, err
    }

    shares := make([]Share, len(leaves))
    for i, leaf := range leaves {
        shares[i] = leafToShare(leaf)
    }

    return shares, nil
}

// GetLeavesByNamespace returns all the leaves from the given root with the given namespace.ID.
// If nothing is found it returns both data and err as nil.
func GetLeavesByNamespace(
    ctx context.Context,
    bGetter blockservice.BlockGetter,
    root cid.Cid,
    nID namespace.ID,
) ([]ipld.Node, error) {
    err := SanityCheckNID(nID)
    if err != nil {
        return nil, err
    }
    rootH := plugin.NamespacedSha256FromCID(root)
    if nID.Less(nmt.MinNamespace(rootH, nID.Size())) || !nID.LessOrEqual(nmt.MaxNamespace(rootH, nID.Size())) {
        return nil, nil
    }
    // request the node
    nd, err := plugin.GetNode(ctx, bGetter, root)
    if err != nil {
        return nil, err
    }
    // check links
    lnks := nd.Links()
    if len(lnks) == 1 {
        // if there is one link, then this is a leaf node, so just return it
        return []ipld.Node{nd}, nil
    }
    // if there are some links, then traverse them
    var out []ipld.Node
    for _, lnk := range nd.Links() {
        nds, err := GetLeavesByNamespace(ctx, bGetter, lnk.Cid, nID)
        if err != nil {
            return out, err
        }
        out = append(out, nds...)
    }
    return out, nil
}

// leafToShare converts an NMT leaf into a Share.
func leafToShare(nd ipld.Node) Share {
    // * First byte represents the type of the node, and is unrelated to the actual share data
222 changes: 222 additions & 0 deletions ipld/get_namespaced_shares.go
@@ -0,0 +1,222 @@
package ipld

import (
    "context"
    "sync"
    "sync/atomic"

    "github.com/gammazero/workerpool"
    "github.com/ipfs/go-blockservice"
    "github.com/ipfs/go-cid"
    ipld "github.com/ipfs/go-ipld-format"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"

    "github.com/celestiaorg/celestia-node/ipld/plugin"
    "github.com/celestiaorg/nmt"
    "github.com/celestiaorg/nmt/namespace"
)

// TODO(@distractedm1nd) Find a better figure than NumWorkersLimit for this pool. Issue #970
// namespacePool is a worker pool responsible for the goroutines spawned by getLeavesByNamespace
var namespacePool = workerpool.New(NumWorkersLimit)

// GetSharesByNamespace walks the tree of a given root and returns its shares within the given namespace.ID.
// If a share could not be retrieved, err is not nil, and the returned array
// contains nil shares in place of the shares it was unable to retrieve.
func GetSharesByNamespace(
    ctx context.Context,
    bGetter blockservice.BlockGetter,
    root cid.Cid,
    nID namespace.ID,
    maxShares int,
) ([]Share, error) {
    ctx, span := tracer.Start(ctx, "get-shares-by-namespace")
    defer span.End()

    leaves, err := getLeavesByNamespace(ctx, bGetter, root, nID, maxShares)
    if err != nil && leaves == nil {
        return nil, err
    }

    shares := make([]Share, len(leaves))
    for i, leaf := range leaves {
        if leaf != nil {
            shares[i] = leafToShare(leaf)
        }
    }

    return shares, err
}

// wrappedWaitGroup is needed because waitgroups do not expose their internal counter,
// and we don't know in advance how many jobs we will have to wait for.
type wrappedWaitGroup struct {
    wg      sync.WaitGroup
    jobs    chan *job
    counter int64
}

func (w *wrappedWaitGroup) add(count int64) {
    w.wg.Add(int(count))
    atomic.AddInt64(&w.counter, count)
}

func (w *wrappedWaitGroup) done() {
    w.wg.Done()
    numRemaining := atomic.AddInt64(&w.counter, -1)

    // close the channel if this job was the last one
    if numRemaining == 0 {
        close(w.jobs)
    }
}

type fetchedBounds struct {
    lowest  int64
    highest int64
}

// update checks if the passed index is outside the current bounds,
// and updates the bounds atomically if it extends them.
func (b *fetchedBounds) update(index int64) {
    lowest := atomic.LoadInt64(&b.lowest)
    // try to write index to the lower bound if appropriate, and retry until the atomic op is successful
    // CAS ensures that we don't overwrite if the bound has been updated in another goroutine after the comparison here
    for index < lowest && !atomic.CompareAndSwapInt64(&b.lowest, lowest, index) {
        lowest = atomic.LoadInt64(&b.lowest)
    }
    // we always run both checks because an element can be both the lower and the higher bound,
    // for example, if there is only one share in the namespace
    highest := atomic.LoadInt64(&b.highest)
    for index > highest && !atomic.CompareAndSwapInt64(&b.highest, highest, index) {
        highest = atomic.LoadInt64(&b.highest)
    }
}

// getLeavesByNamespace returns as many leaves from the given root with the given namespace.ID as it can retrieve.
// If no shares are found, it returns both data and error as nil.
// A non-nil error means that only partial data is returned, because at least one share retrieval failed.
// The following implementation is based on `GetShares`.
func getLeavesByNamespace(
    ctx context.Context,
    bGetter blockservice.BlockGetter,
    root cid.Cid,
    nID namespace.ID,
    maxShares int,
) ([]ipld.Node, error) {
    err := SanityCheckNID(nID)
    if err != nil {
        return nil, err
    }

    ctx, span := tracer.Start(ctx, "get-leaves-by-namespace")
    defer span.End()

    span.SetAttributes(
        attribute.String("namespace", nID.String()),
        attribute.String("root", root.String()),
    )

    // we don't know where in the tree the leaves in the namespace are,
    // so we keep track of the bounds to return the correct slice;
    // maxShares acts as a sentinel to know if we find any leaves
    bounds := fetchedBounds{int64(maxShares), 0}

    // buffer the jobs to avoid blocking; we only need as many
    // queued as the number of shares in the second-to-last layer
    jobs := make(chan *job, (maxShares+1)/2)
    jobs <- &job{id: root}

    var wg wrappedWaitGroup
    wg.jobs = jobs
    wg.add(1)

    var (
        singleErr    sync.Once
        retrievalErr error
    )

    // we overallocate space for leaves since we do not know how many we will find;
    // on the level above, the length of the Row is passed in as maxShares
    leaves := make([]ipld.Node, maxShares)

    for {
        select {
        case j, ok := <-jobs:
            if !ok {
                // if no leaves were found under the given root in the given namespace,
                // both return values are nil; otherwise, a non-nil error means the returned data is partial
                if bounds.lowest == int64(maxShares) {
                    return nil, retrievalErr
                }

                return leaves[bounds.lowest : bounds.highest+1], retrievalErr
            }
            namespacePool.Submit(func() {
                ctx, span := tracer.Start(ctx, "process-job")
                defer span.End()
                defer wg.done()

                // whether an error is returned depends on the underlying impl of the
                // blockservice; currently it is not a realistic possibility
                nd, err := plugin.GetNode(ctx, bGetter, j.id)
                if err != nil {
                    singleErr.Do(func() {
                        retrievalErr = err
                    })
                    log.Errorw("getSharesByNamespace: could not retrieve node", "nID", nID, "pos", j.pos, "err", err)
                    span.RecordError(err, trace.WithAttributes(
                        attribute.Int("pos", j.pos),
                    ))
                    // we still need to update the bounds
                    bounds.update(int64(j.pos))
                    return
                }

                links := nd.Links()
                linkCount := uint64(len(links))
                if linkCount == 1 {
                    // successfully fetched a leaf belonging to the namespace
                    span.AddEvent("found-leaf")
                    leaves[j.pos] = nd
                    // we found a leaf, so we update the bounds;
                    // the update routine is repeated until the atomic swap is successful
                    bounds.update(int64(j.pos))
                    return
                }

                // this node has links in the namespace, so keep walking
                for i, lnk := range links {
                    newJob := &job{
                        id: lnk.Cid,
                        // position represents the index in a flattened binary tree,
                        // so we can return a slice of leaves in order
                        pos: j.pos*2 + i,
                    }

                    // if the link's nID isn't in range, we don't need to create a new job for it
                    jobNid := plugin.NamespacedSha256FromCID(newJob.id)
                    if nID.Less(nmt.MinNamespace(jobNid, nID.Size())) || !nID.LessOrEqual(nmt.MaxNamespace(jobNid, nID.Size())) {
                        continue
                    }

                    // by passing the previous check, we know we will have one more node to process
                    // note: it is important to increase the counter before sending to the channel
                    wg.add(1)
                    select {
                    case jobs <- newJob:
                        span.AddEvent("added-job", trace.WithAttributes(
                            attribute.String("cid", newJob.id.String()),
                            attribute.Int("pos", newJob.pos),
                        ))
                    case <-ctx.Done():
                        return
                    }
                }
            })
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
}
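
For context on how the new API is meant to be consumed, here is a small, self-contained sketch of a caller that tolerates partial results. It is illustrative only: the rowShares wrapper, its parameters, and the error-wrapping policy are assumptions rather than part of this commit, while the import paths and the GetSharesByNamespace signature are taken from the file above.

    package example

    import (
        "context"
        "fmt"

        "github.com/ipfs/go-blockservice"
        "github.com/ipfs/go-cid"

        "github.com/celestiaorg/celestia-node/ipld"
        "github.com/celestiaorg/nmt/namespace"
    )

    // rowShares fetches the shares of a single row root that fall under nID.
    // rowLen is the number of shares in the row, passed through as maxShares.
    func rowShares(
        ctx context.Context,
        bGetter blockservice.BlockGetter,
        rowRoot cid.Cid,
        nID namespace.ID,
        rowLen int,
    ) ([]ipld.Share, error) {
        shares, err := ipld.GetSharesByNamespace(ctx, bGetter, rowRoot, nID, rowLen)
        if err != nil {
            // a non-nil error signals partial retrieval: shares still holds the
            // leaves that were fetched, with nil in place of the ones that failed
            return shares, fmt.Errorf("partial namespace retrieval: %w", err)
        }
        // len(shares) == 0 with a nil error simply means the namespace is not
        // present under this root
        return shares, nil
    }

This partial-result behavior is what the "test for partial retrieval" mentioned in the commit message exercises.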
7 changes: 1 addition & 6 deletions ipld/get_shares.go
@@ -52,11 +52,6 @@ var pool = workerpool.New(NumWorkersLimit)
// implementation that rely on this property are explicitly tagged with
// (bin-tree-feat).
func GetShares(ctx context.Context, bGetter blockservice.BlockGetter, root cid.Cid, shares int, put func(int, Share)) {
    // job is not used anywhere else, so can be kept here
    type job struct {
        id  cid.Cid
        pos int
    }
    // this buffer ensures writes to 'jobs' are never blocking (bin-tree-feat)
    jobs := make(chan *job, (shares+1)/2) // +1 for the case where 'shares' is 1
    jobs <- &job{id: root}
@@ -91,7 +86,7 @@ func GetShares(ctx context.Context, bGetter blockservice.BlockGetter, root cid.C
                    return
                }
                // successfully fetched a share/leaf
                // ladies and gentlemen, we got em!
                // ladies and gentlemen, we got em
                put(j.pos, leafToShare(nd))
                return
            }
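
The job type removed from GetShares' local scope above is also what the new getLeavesByNamespace queues on its jobs channel, so it presumably now lives at package level in one of the other changed files not shown here. A minimal sketch of what that shared definition would look like, an assumption based on the removed local type rather than the commit's exact code:

    // job describes a single node to fetch: its CID and its position in the
    // flattened binary tree, so retrieved leaves can be placed in order.
    type job struct {
        id  cid.Cid
        pos int
    }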