From 40d20966e9eac43731e558e1b2c7f02ef33a3a81 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Thu, 18 Apr 2019 14:16:09 +0800 Subject: [PATCH] Allow sequential pulling with break in the middle (#83) - allow pulling in sequence - allow base handlers on push and pull --- pkg/oras/errors.go | 21 ++++++++---- pkg/oras/oras_test.go | 78 +++++++++++++++++++++++++++++++++++++++++++ pkg/oras/pull.go | 40 +++++++++++++++++----- pkg/oras/pull_opts.go | 28 +++++++++++++++- pkg/oras/push.go | 2 +- pkg/oras/push_opts.go | 11 ++++++ 6 files changed, 163 insertions(+), 17 deletions(-) diff --git a/pkg/oras/errors.go b/pkg/oras/errors.go index a045adc46..1812a060e 100644 --- a/pkg/oras/errors.go +++ b/pkg/oras/errors.go @@ -1,17 +1,24 @@ package oras -import "errors" +import ( + "errors" + "fmt" +) // Common errors var ( - ErrResolverUndefined = errors.New("resolver_undefined") - ErrEmptyDescriptors = errors.New("empty_descriptors") + ErrResolverUndefined = errors.New("resolver undefined") + ErrEmptyDescriptors = errors.New("empty descriptors") ) // Path validation related errors var ( - ErrDirtyPath = errors.New("dirty_path") - ErrPathNotSlashSeparated = errors.New("path_not_slash_separated") - ErrAbsolutePathDisallowed = errors.New("absolute_path_disallowed") - ErrPathTraversalDisallowed = errors.New("path_traversal_disallowed") + ErrDirtyPath = errors.New("dirty path") + ErrPathNotSlashSeparated = errors.New("path not slash separated") + ErrAbsolutePathDisallowed = errors.New("absolute path disallowed") + ErrPathTraversalDisallowed = errors.New("path traversal disallowed") ) + +// ErrStopProcessing is used to stop processing an oras operation. +// This error only makes sense in sequential pulling operation. +var ErrStopProcessing = fmt.Errorf("stop processing") diff --git a/pkg/oras/oras_test.go b/pkg/oras/oras_test.go index 3ec811587..6a59e4609 100644 --- a/pkg/oras/oras_test.go +++ b/pkg/oras/oras_test.go @@ -11,6 +11,7 @@ import ( orascontent "github.com/deislabs/oras/pkg/content" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/configuration" @@ -235,6 +236,83 @@ func (suite *ORASTestSuite) Test_2_MediaType() { suite.Equal(0, len(descriptors), "number of contents matches on pull") } +// Pull with condition +func (suite *ORASTestSuite) Test_3_Conditional_Pull() { + var ( + testData = [][]string{ + {"version.txt", "edge"}, + {"content.txt", "hello world"}, + } + err error + ref string + descriptors []ocispec.Descriptor + store *orascontent.Memorystore + stop bool + ) + + // Push test content + store = orascontent.NewMemoryStore() + descriptors = nil + for _, data := range testData { + desc := store.Add(data[0], "", []byte(data[1])) + descriptors = append(descriptors, desc) + } + ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost) + _, err = Push(newContext(), newResolver(), ref, store, descriptors) + suite.Nil(err, "no error pushing test data") + + // Pull all contents in sequence + store = orascontent.NewMemoryStore() + ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost) + _, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS) + suite.Nil(err, "no error pulling ref") + suite.Equal(2, len(descriptors), "number of contents matches on pull") + for i, data := range testData { + _, actualContent, ok := store.GetByName(data[0]) + suite.True(ok, "find in memory") + content := []byte(data[1]) + suite.Equal(content, actualContent, "test content matches on pull") + name, _ := orascontent.ResolveName(descriptors[i]) + suite.Equal(data[0], name, "content sequence matches on pull") + } + + // Selective pull contents: stop at the very beginning + store = orascontent.NewMemoryStore() + ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost) + _, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS, + WithPullBaseHandler(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + if name, ok := orascontent.ResolveName(desc); ok && name == testData[0][0] { + return nil, ErrStopProcessing + } + return nil, nil + }))) + suite.Nil(err, "no error pulling ref") + suite.Equal(0, len(descriptors), "number of contents matches on pull") + + // Selective pull contents: stop in the middle + store = orascontent.NewMemoryStore() + ref = fmt.Sprintf("%s/conditional-pull:test", suite.DockerRegistryHost) + stop = false + _, descriptors, err = Pull(newContext(), newResolver(), ref, store, WithPullByBFS, + WithPullBaseHandler(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + if stop { + return nil, ErrStopProcessing + } + if name, ok := orascontent.ResolveName(desc); ok && name == testData[0][0] { + stop = true + } + return nil, nil + }))) + suite.Nil(err, "no error pulling ref") + suite.Equal(1, len(descriptors), "number of contents matches on pull") + for _, data := range testData[:1] { + _, actualContent, ok := store.GetByName(data[0]) + suite.True(ok, "find in memory") + content := []byte(data[1]) + suite.Equal(content, actualContent, "test content matches on pull") + } +} + func TestORASTestSuite(t *testing.T) { suite.Run(t, new(ORASTestSuite)) } diff --git a/pkg/oras/pull.go b/pkg/oras/pull.go index f122cf741..7ed2bf814 100644 --- a/pkg/oras/pull.go +++ b/pkg/oras/pull.go @@ -11,6 +11,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/remotes" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" ) // Pull pull files from the remote @@ -35,18 +36,18 @@ func Pull(ctx context.Context, resolver remotes.Resolver, ref string, ingester c return ocispec.Descriptor{}, nil, err } - layers, err := fetchContent(ctx, fetcher, desc, ingester, opt.allowedMediaTypes...) + layers, err := fetchContent(ctx, fetcher, desc, ingester, opt) if err != nil { return ocispec.Descriptor{}, nil, err } return desc, layers, nil } -func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Descriptor, ingester content.Ingester, allowedMediaTypes ...string) ([]ocispec.Descriptor, error) { +func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Descriptor, ingester content.Ingester, opts *pullOpts) ([]ocispec.Descriptor, error) { var descriptors []ocispec.Descriptor lock := &sync.Mutex{} picker := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - if isAllowedMediaType(desc.MediaType, allowedMediaTypes...) { + if isAllowedMediaType(desc.MediaType, opts.allowedMediaTypes...) { if name, ok := orascontent.ResolveName(desc); ok && len(name) > 0 { lock.Lock() defer lock.Unlock() @@ -57,13 +58,17 @@ func fetchContent(ctx context.Context, fetcher remotes.Fetcher, desc ocispec.Des return nil, nil }) store := newHybridStoreFromIngester(ingester) - handlers := images.Handlers( - filterHandler(allowedMediaTypes...), + handlers := []images.Handler{ + filterHandler(opts.allowedMediaTypes...), + } + handlers = append(handlers, opts.baseHandlers...) + handlers = append(handlers, remotes.FetchHandler(store, fetcher), picker, images.ChildrenHandler(store), ) - if err := images.Dispatch(ctx, handlers, desc); err != nil { + + if err := opts.dispatch(ctx, images.Handlers(handlers...), desc); err != nil { return nil, err } @@ -79,9 +84,9 @@ func filterHandler(allowedMediaTypes ...string) images.HandlerFunc { if name, ok := orascontent.ResolveName(desc); ok && len(name) > 0 { return nil, nil } - log.G(ctx).Warnf("blob_no_name: %v", desc.Digest) + log.G(ctx).Warnf("blob no name: %v", desc.Digest) default: - log.G(ctx).Warnf("unknown_type: %v", desc.MediaType) + log.G(ctx).Warnf("unknown type: %v", desc.MediaType) } return nil, images.ErrStopHandler } @@ -98,3 +103,22 @@ func isAllowedMediaType(mediaType string, allowedMediaTypes ...string) bool { } return false } + +// dispatchBFS behaves the same as images.Dispatch() but in sequence with breath-first search. +func dispatchBFS(ctx context.Context, handler images.Handler, descs ...ocispec.Descriptor) error { + for i := 0; i < len(descs); i++ { + desc := descs[i] + children, err := handler.Handle(ctx, desc) + if err != nil { + switch err := errors.Cause(err); err { + case images.ErrSkipDesc: + continue // don't traverse the children. + case ErrStopProcessing: + return nil + } + return err + } + descs = append(descs, children...) + } + return nil +} diff --git a/pkg/oras/pull_opts.go b/pkg/oras/pull_opts.go index c729bfeac..28b90a088 100644 --- a/pkg/oras/pull_opts.go +++ b/pkg/oras/pull_opts.go @@ -1,14 +1,25 @@ package oras +import ( + "context" + + "github.com/containerd/containerd/images" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + type pullOpts struct { allowedMediaTypes []string + dispatch func(context.Context, images.Handler, ...ocispec.Descriptor) error + baseHandlers []images.Handler } // PullOpt allows callers to set options on the oras pull type PullOpt func(o *pullOpts) error func pullOptsDefaults() *pullOpts { - return &pullOpts{} + return &pullOpts{ + dispatch: images.Dispatch, + } } // WithAllowedMediaType sets the allowed media types @@ -26,3 +37,18 @@ func WithAllowedMediaTypes(allowedMediaTypes []string) PullOpt { return nil } } + +// WithPullByBFS opt to pull in sequence with breath-first search +func WithPullByBFS(o *pullOpts) error { + o.dispatch = dispatchBFS + return nil +} + +// WithPullBaseHandler provides base handlers, which will be called before +// any pull specific handlers. +func WithPullBaseHandler(handlers ...images.Handler) PullOpt { + return func(o *pullOpts) error { + o.baseHandlers = append(o.baseHandlers, handlers...) + return nil + } +} diff --git a/pkg/oras/push.go b/pkg/oras/push.go index 86d51d937..e80142ca4 100644 --- a/pkg/oras/push.go +++ b/pkg/oras/push.go @@ -43,7 +43,7 @@ func Push(ctx context.Context, resolver remotes.Resolver, ref string, provider c return ocispec.Descriptor{}, err } - if err := remotes.PushContent(ctx, pusher, desc, provider, nil); err != nil { + if err := remotes.PushContent(ctx, pusher, desc, provider, nil, opt.baseHandlers...); err != nil { return ocispec.Descriptor{}, err } return desc, nil diff --git a/pkg/oras/push_opts.go b/pkg/oras/push_opts.go index b4d75daf5..fd60d666a 100644 --- a/pkg/oras/push_opts.go +++ b/pkg/oras/push_opts.go @@ -4,6 +4,7 @@ import ( "path/filepath" "strings" + "github.com/containerd/containerd/images" orascontent "github.com/deislabs/oras/pkg/content" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -15,6 +16,7 @@ type pushOpts struct { configAnnotations map[string]string manifestAnnotations map[string]string validateName func(desc ocispec.Descriptor) error + baseHandlers []images.Handler } func pushOptsDefaults() *pushOpts { @@ -98,3 +100,12 @@ func ValidateNameAsPath(desc ocispec.Descriptor) error { return nil } + +// WithPushBaseHandler provides base handlers, which will be called before +// any push specific handlers. +func WithPushBaseHandler(handlers ...images.Handler) PushOpt { + return func(o *pushOpts) error { + o.baseHandlers = append(o.baseHandlers, handlers...) + return nil + } +}