Skip to content

Commit

Permalink
Allow sequential pulling with break in the middle (oras-project#83)
Browse files Browse the repository at this point in the history
- allow pulling in sequence
- allow base handlers on push and pull
  • Loading branch information
shizhMSFT authored Apr 18, 2019
1 parent d1e0198 commit 40d2096
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 17 deletions.
21 changes: 14 additions & 7 deletions pkg/oras/errors.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package oras

import "errors"
import (
"errors"
"fmt"
)

// Common errors
var (
ErrResolverUndefined = errors.New("resolver_undefined")
ErrEmptyDescriptors = errors.New("empty_descriptors")
ErrResolverUndefined = errors.New("resolver undefined")
ErrEmptyDescriptors = errors.New("empty descriptors")
)

// Path validation related errors
var (
ErrDirtyPath = errors.New("dirty_path")
ErrPathNotSlashSeparated = errors.New("path_not_slash_separated")
ErrAbsolutePathDisallowed = errors.New("absolute_path_disallowed")
ErrPathTraversalDisallowed = errors.New("path_traversal_disallowed")
ErrDirtyPath = errors.New("dirty path")
ErrPathNotSlashSeparated = errors.New("path not slash separated")
ErrAbsolutePathDisallowed = errors.New("absolute path disallowed")
ErrPathTraversalDisallowed = errors.New("path traversal disallowed")
)

// ErrStopProcessing is used to stop processing an oras operation.
// This error only makes sense in sequential pulling operation.
var ErrStopProcessing = fmt.Errorf("stop processing")
78 changes: 78 additions & 0 deletions pkg/oras/oras_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

orascontent "github.com/deislabs/oras/pkg/content"

"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/configuration"
Expand Down Expand Up @@ -235,6 +236,83 @@ func (suite *ORASTestSuite) Test_2_MediaType() {
suite.Equal(0, len(descriptors), "number of contents matches on pull")
}

// Pull with condition
func (suite *ORASTestSuite) Test_3_Conditional_Pull() {
var (
testData = [][]string{
{"version.txt", "edge"},
{"content.txt", "hello world"},
}
err error
ref string
descriptors []ocispec.Descriptor
store *orascontent.Memorystore
stop bool
)

// Push test content
store = orascontent.NewMemoryStore()
descriptors = nil
for _, data := range testData {
desc := store.Add(data[0], "", []byte(data[1]))
descriptors = append(descriptors, desc)
}
ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost)
_, err = Push(newContext(), newResolver(), ref, store, descriptors)
suite.Nil(err, "no error pushing test data")

// Pull all contents in sequence
store = orascontent.NewMemoryStore()
ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost)
_, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS)
suite.Nil(err, "no error pulling ref")
suite.Equal(2, len(descriptors), "number of contents matches on pull")
for i, data := range testData {
_, actualContent, ok := store.GetByName(data[0])
suite.True(ok, "find in memory")
content := []byte(data[1])
suite.Equal(content, actualContent, "test content matches on pull")
name, _ := orascontent.ResolveName(descriptors[i])
suite.Equal(data[0], name, "content sequence matches on pull")
}

// Selective pull contents: stop at the very beginning
store = orascontent.NewMemoryStore()
ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost)
_, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS,
WithPullBaseHandler(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
if name, ok := orascontent.ResolveName(desc); ok && name == testData[0][0] {
return nil, ErrStopProcessing
}
return nil, nil
})))
suite.Nil(err, "no error pulling ref")
suite.Equal(0, len(descriptors), "number of contents matches on pull")

// Selective pull contents: stop in the middle
store = orascontent.NewMemoryStore()
ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost)
stop = false
_, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS,
WithPullBaseHandler(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
if stop {
return nil, ErrStopProcessing
}
if name, ok := orascontent.ResolveName(desc); ok && name == testData[0][0] {
stop = true
}
return nil, nil
})))
suite.Nil(err, "no error pulling ref")
suite.Equal(1, len(descriptors), "number of contents matches on pull")
for _, data := range testData[:1] {
_, actualContent, ok := store.GetByName(data[0])
suite.True(ok, "find in memory")
content := []byte(data[1])
suite.Equal(content, actualContent, "test content matches on pull")
}
}

func TestORASTestSuite(t *testing.T) {
suite.Run(t, new(ORASTestSuite))
}
40 changes: 32 additions & 8 deletions pkg/oras/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/remotes"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

// Pull pull files from the remote
Expand All @@ -35,18 +36,18 @@ func Pull(ctx context.Context, resolver remotes.Resolver, ref string, ingester c
return ocispec.Descriptor{}, nil, err
}

layers, err := fetchContent(ctx, fetcher, desc, ingester, opt.allowedMediaTypes...)
layers, err := fetchContent(ctx, fetcher, desc, ingester, opt)
if err != nil {
return ocispec.Descriptor{}, nil, err
}
return desc, layers, nil
}

func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Descriptor, ingester content.Ingester, allowedMediaTypes ...string) ([]ocispec.Descriptor, error) {
func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Descriptor, ingester content.Ingester, opts *pullOpts) ([]ocispec.Descriptor, error) {
var descriptors []ocispec.Descriptor
lock := &sync.Mutex{}
picker := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
if isAllowedMediaType(desc.MediaType, allowedMediaTypes...) {
if isAllowedMediaType(desc.MediaType, opts.allowedMediaTypes...) {
if name, ok := orascontent.ResolveName(desc); ok && len(name) > 0 {
lock.Lock()
defer lock.Unlock()
Expand All @@ -57,13 +58,17 @@ func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Des
return nil, nil
})
store := newHybridStoreFromIngester(ingester)
handlers := images.Handlers(
filterHandler(allowedMediaTypes...),
handlers := []images.Handler{
filterHandler(opts.allowedMediaTypes...),
}
handlers = append(handlers, opts.baseHandlers...)
handlers = append(handlers,
remotes.FetchHandler(store, fetcher),
picker,
images.ChildrenHandler(store),
)
if err := images.Dispatch(ctx, handlers, desc); err != nil {

if err := opts.dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
return nil, err
}

Expand All @@ -79,9 +84,9 @@ func filterHandler(allowedMediaTypes ...string) images.HandlerFunc {
if name, ok := orascontent.ResolveName(desc); ok && len(name) > 0 {
return nil, nil
}
log.G(ctx).Warnf("blob_no_name: %v", desc.Digest)
log.G(ctx).Warnf("blob no name: %v", desc.Digest)
default:
log.G(ctx).Warnf("unknown_type: %v", desc.MediaType)
log.G(ctx).Warnf("unknown type: %v", desc.MediaType)
}
return nil, images.ErrStopHandler
}
Expand All @@ -98,3 +103,22 @@ func isAllowedMediaType(mediaType string, allowedMediaTypes ...string) bool {
}
return false
}

// dispatchBFS behaves the same as images.Dispatch() but in sequence with breath-first search.
func dispatchBFS(ctx context.Context, handler images.Handler, descs ...ocispec.Descriptor) error {
for i := 0; i < len(descs); i++ {
desc := descs[i]
children, err := handler.Handle(ctx, desc)
if err != nil {
switch err := errors.Cause(err); err {
case images.ErrSkipDesc:
continue // don't traverse the children.
case ErrStopProcessing:
return nil
}
return err
}
descs = append(descs, children...)
}
return nil
}
28 changes: 27 additions & 1 deletion pkg/oras/pull_opts.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package oras

import (
"context"

"github.com/containerd/containerd/images"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

type pullOpts struct {
allowedMediaTypes []string
dispatch func(context.Context, images.Handler, ...ocispec.Descriptor) error
baseHandlers []images.Handler
}

// PullOpt allows callers to set options on the oras pull
type PullOpt func(o *pullOpts) error

func pullOptsDefaults() *pullOpts {
return &pullOpts{}
return &pullOpts{
dispatch: images.Dispatch,
}
}

// WithAllowedMediaType sets the allowed media types
Expand All @@ -26,3 +37,18 @@ func WithAllowedMediaTypes(allowedMediaTypes []string) PullOpt {
return nil
}
}

// WithPullByBFS opt to pull in sequence with breath-first search
func WithPullByBFS(o *pullOpts) error {
o.dispatch = dispatchBFS
return nil
}

// WithPullBaseHandler provides base handlers, which will be called before
// any pull specific handlers.
func WithPullBaseHandler(handlers ...images.Handler) PullOpt {
return func(o *pullOpts) error {
o.baseHandlers = append(o.baseHandlers, handlers...)
return nil
}
}
2 changes: 1 addition & 1 deletion pkg/oras/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Push(ctx context.Context, resolver remotes.Resolver, ref string, provider c
return ocispec.Descriptor{}, err
}

if err := remotes.PushContent(ctx, pusher, desc, provider, nil); err != nil {
if err := remotes.PushContent(ctx, pusher, desc, provider, nil, opt.baseHandlers...); err != nil {
return ocispec.Descriptor{}, err
}
return desc, nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/oras/push_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"path/filepath"
"strings"

"github.com/containerd/containerd/images"
orascontent "github.com/deislabs/oras/pkg/content"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -15,6 +16,7 @@ type pushOpts struct {
configAnnotations map[string]string
manifestAnnotations map[string]string
validateName func(desc ocispec.Descriptor) error
baseHandlers []images.Handler
}

func pushOptsDefaults() *pushOpts {
Expand Down Expand Up @@ -98,3 +100,12 @@ func ValidateNameAsPath(desc ocispec.Descriptor) error {

return nil
}

// WithPushBaseHandler provides base handlers, which will be called before
// any push specific handlers.
func WithPushBaseHandler(handlers ...images.Handler) PushOpt {
return func(o *pushOpts) error {
o.baseHandlers = append(o.baseHandlers, handlers...)
return nil
}
}

0 comments on commit 40d2096

Please sign in to comment.