Skip to content

Commit

Permalink
Adds context support for nas, oss and s3
Browse files Browse the repository at this point in the history
  • Loading branch information
ebozduman authored and kannappanr committed Mar 15, 2018
1 parent bdb1a90 commit 33fe42d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 77 deletions.
41 changes: 21 additions & 20 deletions cmd/gateway-unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cmd

import (
"context"
"time"

"github.com/minio/minio-go/pkg/policy"
Expand All @@ -29,105 +30,105 @@ import (
type GatewayUnsupported struct{}

// ListMultipartUploads lists all multipart uploads.
func (a GatewayUnsupported) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi ListMultipartsInfo, err error) {
func (a GatewayUnsupported) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi ListMultipartsInfo, err error) {
return lmi, errors.Trace(NotImplemented{})
}

// NewMultipartUpload upload object in multiple parts
func (a GatewayUnsupported) NewMultipartUpload(bucket string, object string, metadata map[string]string) (uploadID string, err error) {
func (a GatewayUnsupported) NewMultipartUpload(ctx context.Context, bucket string, object string, metadata map[string]string) (uploadID string, err error) {
return "", errors.Trace(NotImplemented{})
}

// CopyObjectPart copy part of object to uploadID for another object
func (a GatewayUnsupported) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, srcInfo ObjectInfo) (pi PartInfo, err error) {
func (a GatewayUnsupported) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, srcInfo ObjectInfo) (pi PartInfo, err error) {
return pi, errors.Trace(NotImplemented{})
}

// PutObjectPart puts a part of object in bucket
func (a GatewayUnsupported) PutObjectPart(bucket string, object string, uploadID string, partID int, data *hash.Reader) (pi PartInfo, err error) {
func (a GatewayUnsupported) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *hash.Reader) (pi PartInfo, err error) {
return pi, errors.Trace(NotImplemented{})
}

// ListObjectParts returns all object parts for specified object in specified bucket
func (a GatewayUnsupported) ListObjectParts(bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (lpi ListPartsInfo, err error) {
func (a GatewayUnsupported) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (lpi ListPartsInfo, err error) {
return lpi, errors.Trace(NotImplemented{})
}

// AbortMultipartUpload aborts a ongoing multipart upload
func (a GatewayUnsupported) AbortMultipartUpload(bucket string, object string, uploadID string) error {
func (a GatewayUnsupported) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
return errors.Trace(NotImplemented{})
}

// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (a GatewayUnsupported) CompleteMultipartUpload(bucket string, object string, uploadID string, uploadedParts []CompletePart) (oi ObjectInfo, err error) {
func (a GatewayUnsupported) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []CompletePart) (oi ObjectInfo, err error) {
return oi, errors.Trace(NotImplemented{})
}

// SetBucketPolicy sets policy on bucket
func (a GatewayUnsupported) SetBucketPolicy(bucket string, policyInfo policy.BucketAccessPolicy) error {
func (a GatewayUnsupported) SetBucketPolicy(ctx context.Context, bucket string, policyInfo policy.BucketAccessPolicy) error {
return errors.Trace(NotImplemented{})
}

// GetBucketPolicy will get policy on bucket
func (a GatewayUnsupported) GetBucketPolicy(bucket string) (bal policy.BucketAccessPolicy, err error) {
func (a GatewayUnsupported) GetBucketPolicy(ctx context.Context, bucket string) (bal policy.BucketAccessPolicy, err error) {
return bal, errors.Trace(NotImplemented{})
}

// DeleteBucketPolicy deletes all policies on bucket
func (a GatewayUnsupported) DeleteBucketPolicy(bucket string) error {
func (a GatewayUnsupported) DeleteBucketPolicy(ctx context.Context, bucket string) error {
return errors.Trace(NotImplemented{})
}

// HealFormat - Not implemented stub
func (a GatewayUnsupported) HealFormat(dryRun bool) (madmin.HealResultItem, error) {
func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
return madmin.HealResultItem{}, errors.Trace(NotImplemented{})
}

// HealBucket - Not implemented stub
func (a GatewayUnsupported) HealBucket(bucket string, dryRun bool) ([]madmin.HealResultItem, error) {
func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) {
return nil, errors.Trace(NotImplemented{})
}

// ListBucketsHeal - Not implemented stub
func (a GatewayUnsupported) ListBucketsHeal() (buckets []BucketInfo, err error) {
func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) {
return nil, errors.Trace(NotImplemented{})
}

// HealObject - Not implemented stub
func (a GatewayUnsupported) HealObject(bucket, object string, dryRun bool) (h madmin.HealResultItem, e error) {
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun bool) (h madmin.HealResultItem, e error) {
return h, errors.Trace(NotImplemented{})
}

// ListObjectsV2 - Not implemented stub
func (a GatewayUnsupported) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
return result, errors.Trace(NotImplemented{})
}

// ListObjectsHeal - Not implemented stub
func (a GatewayUnsupported) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
return loi, errors.Trace(NotImplemented{})
}

// CopyObject copies a blob from source container to destination container.
func (a GatewayUnsupported) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string,
func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, srcObject string, destBucket string, destObject string,
srcInfo ObjectInfo) (objInfo ObjectInfo, err error) {
return objInfo, errors.Trace(NotImplemented{})
}

// Locking operations

// ListLocks lists namespace locks held in object layer
func (a GatewayUnsupported) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
func (a GatewayUnsupported) ListLocks(ctx context.Context, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
return []VolumeLockInfo{}, errors.Trace(NotImplemented{})
}

// ClearLocks clears namespace locks held in object layer
func (a GatewayUnsupported) ClearLocks([]VolumeLockInfo) error {
func (a GatewayUnsupported) ClearLocks(ctx context.Context, info []VolumeLockInfo) error {
return errors.Trace(NotImplemented{})
}

// RefreshBucketPolicy refreshes cache policy with what's on disk.
func (a GatewayUnsupported) RefreshBucketPolicy(bucket string) error {
func (a GatewayUnsupported) RefreshBucketPolicy(ctx context.Context, bucket string) error {
return errors.Trace(NotImplemented{})
}

Expand Down
8 changes: 5 additions & 3 deletions cmd/gateway/nas/gateway-nas.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package nas

import (
"context"

"github.com/minio/cli"
"github.com/minio/minio-go/pkg/policy"
minio "github.com/minio/minio/cmd"
Expand Down Expand Up @@ -71,8 +73,8 @@ EXAMPLES:
func nasGatewayMain(ctx *cli.Context) {
// Validate gateway arguments.
host := ctx.Args().First()
if host == "" {
cli.ShowCommandHelpAndExit(ctx, "nas", 1)
if host == "help" {
cli.ShowCommandHelpAndExit(ctx, nasBackend, 1)
}
// Validate gateway arguments.
minio.StartGateway(ctx, &NAS{host})
Expand Down Expand Up @@ -114,6 +116,6 @@ func (l *nasObjects) IsNotificationSupported() bool {
}

// GetBucketPolicy will get policy on bucket
func (l *nasObjects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error) {
func (l *nasObjects) GetBucketPolicy(ctx context.Context, bucket string) (policy.BucketAccessPolicy, error) {
return minio.ReadBucketPolicy(bucket, l)
}
51 changes: 26 additions & 25 deletions cmd/gateway/oss/gateway-oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package oss

import (
"context"
"encoding/xml"
"fmt"
"io"
Expand Down Expand Up @@ -323,12 +324,12 @@ type ossObjects struct {

// Shutdown saves any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *ossObjects) Shutdown() error {
func (l *ossObjects) Shutdown(ctx context.Context) error {
return nil
}

// StorageInfo is not relevant to OSS backend.
func (l *ossObjects) StorageInfo() (si minio.StorageInfo) {
func (l *ossObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
return
}

Expand All @@ -345,7 +346,7 @@ func ossIsValidBucketName(bucket string) bool {
}

// MakeBucketWithLocation creates a new container on OSS backend.
func (l *ossObjects) MakeBucketWithLocation(bucket, location string) error {
func (l *ossObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
if !ossIsValidBucketName(bucket) {
return errors.Trace(minio.BucketNameInvalid{Bucket: bucket})
}
Expand All @@ -372,12 +373,12 @@ func ossGeBucketInfo(client *oss.Client, bucket string) (bi minio.BucketInfo, er
}

// GetBucketInfo gets bucket metadata.
func (l *ossObjects) GetBucketInfo(bucket string) (bi minio.BucketInfo, err error) {
func (l *ossObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
return ossGeBucketInfo(l.Client, bucket)
}

// ListBuckets lists all OSS buckets.
func (l *ossObjects) ListBuckets() (buckets []minio.BucketInfo, err error) {
func (l *ossObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
marker := oss.Marker("")
for {
lbr, err := l.Client.ListBuckets(marker)
Expand All @@ -402,7 +403,7 @@ func (l *ossObjects) ListBuckets() (buckets []minio.BucketInfo, err error) {
}

// DeleteBucket deletes a bucket on OSS.
func (l *ossObjects) DeleteBucket(bucket string) error {
func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string) error {
err := l.Client.DeleteBucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket)
Expand Down Expand Up @@ -483,12 +484,12 @@ func ossListObjectsV2(client *oss.Client, bucket, prefix, continuationToken, del
}

// ListObjects lists all blobs in OSS bucket filtered by prefix.
func (l *ossObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
func (l *ossObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
return ossListObjects(l.Client, bucket, prefix, marker, delimiter, maxKeys)
}

// ListObjectsV2 lists all blobs in OSS bucket filtered by prefix
func (l *ossObjects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int,
func (l *ossObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
return ossListObjectsV2(l.Client, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
}
Expand Down Expand Up @@ -532,7 +533,7 @@ func ossGetObject(client *oss.Client, bucket, key string, startOffset, length in
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *ossObjects) GetObject(bucket, key string, startOffset, length int64, writer io.Writer, etag string) error {
func (l *ossObjects) GetObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string) error {
return ossGetObject(l.Client, bucket, key, startOffset, length, writer, etag)
}

Expand Down Expand Up @@ -580,7 +581,7 @@ func ossGetObjectInfo(client *oss.Client, bucket, object string) (objInfo minio.
}

// GetObjectInfo reads object info and replies back ObjectInfo.
func (l *ossObjects) GetObjectInfo(bucket, object string) (objInfo minio.ObjectInfo, err error) {
func (l *ossObjects) GetObjectInfo(ctx context.Context, bucket, object string) (objInfo minio.ObjectInfo, err error) {
return ossGetObjectInfo(l.Client, bucket, object)
}

Expand All @@ -606,12 +607,12 @@ func ossPutObject(client *oss.Client, bucket, object string, data *hash.Reader,
}

// PutObject creates a new object with the incoming data.
func (l *ossObjects) PutObject(bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
func (l *ossObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) {
return ossPutObject(l.Client, bucket, object, data, metadata)
}

// CopyObject copies an object from source bucket to a destination bucket.
func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
func (l *ossObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
bkt, err := l.Client.Bucket(srcBucket)
if err != nil {
return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
Expand All @@ -633,11 +634,11 @@ func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
if _, err = bkt.CopyObjectTo(dstBucket, dstObject, srcObject, opts...); err != nil {
return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
}
return l.GetObjectInfo(dstBucket, dstObject)
return l.GetObjectInfo(ctx, dstBucket, dstObject)
}

// DeleteObject deletes a blob in bucket.
func (l *ossObjects) DeleteObject(bucket, object string) error {
func (l *ossObjects) DeleteObject(ctx context.Context, bucket, object string) error {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
Expand Down Expand Up @@ -679,7 +680,7 @@ func fromOSSClientListMultipartsInfo(lmur oss.ListMultipartUploadResult) minio.L
}

// ListMultipartUploads lists all multipart uploads.
func (l *ossObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
func (l *ossObjects) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return lmi, ossToObjectError(errors.Trace(err), bucket)
Expand All @@ -695,7 +696,7 @@ func (l *ossObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMar
}

// NewMultipartUpload upload object in multiple parts.
func (l *ossObjects) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) {
func (l *ossObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return uploadID, ossToObjectError(errors.Trace(err), bucket, object)
Expand All @@ -716,7 +717,7 @@ func (l *ossObjects) NewMultipartUpload(bucket, object string, metadata map[stri
}

// PutObjectPart puts a part of object in bucket.
func (l *ossObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (pi minio.PartInfo, err error) {
func (l *ossObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (pi minio.PartInfo, err error) {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return pi, ossToObjectError(errors.Trace(err), bucket, object)
Expand Down Expand Up @@ -796,7 +797,7 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par

// CopyObjectPart creates a part in a multipart upload by copying
// existing object or a part of it.
func (l *ossObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string,
func (l *ossObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
partID int, startOffset, length int64, srcInfo minio.ObjectInfo) (p minio.PartInfo, err error) {

bkt, err := l.Client.Bucket(destBucket)
Expand Down Expand Up @@ -825,7 +826,7 @@ func (l *ossObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject
}

// ListObjectParts returns all object parts for specified object in specified bucket
func (l *ossObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi minio.ListPartsInfo, err error) {
func (l *ossObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi minio.ListPartsInfo, err error) {
lupr, err := ossListObjectParts(l.Client, bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return lpi, ossToObjectError(errors.Trace(err), bucket, object, uploadID)
Expand All @@ -835,7 +836,7 @@ func (l *ossObjects) ListObjectParts(bucket, object, uploadID string, partNumber
}

// AbortMultipartUpload aborts a ongoing multipart upload.
func (l *ossObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
func (l *ossObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
bkt, err := l.Client.Bucket(bucket)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket, object)
Expand All @@ -853,7 +854,7 @@ func (l *ossObjects) AbortMultipartUpload(bucket, object, uploadID string) error
}

// CompleteMultipartUpload completes ongoing multipart upload and finalizes object.
func (l *ossObjects) CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []minio.CompletePart) (oi minio.ObjectInfo, err error) {
func (l *ossObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart) (oi minio.ObjectInfo, err error) {
client := l.Client
bkt, err := client.Bucket(bucket)
if err != nil {
Expand Down Expand Up @@ -910,15 +911,15 @@ func (l *ossObjects) CompleteMultipartUpload(bucket, object, uploadID string, up
return oi, ossToObjectError(errors.Trace(err), bucket, object)
}

return l.GetObjectInfo(bucket, object)
return l.GetObjectInfo(ctx, bucket, object)
}

// SetBucketPolicy sets policy on bucket.
// OSS supports three types of bucket policies:
// oss.ACLPublicReadWrite: readwrite in minio terminology
// oss.ACLPublicRead: readonly in minio terminology
// oss.ACLPrivate: none in minio terminology
func (l *ossObjects) SetBucketPolicy(bucket string, policyInfo policy.BucketAccessPolicy) error {
func (l *ossObjects) SetBucketPolicy(ctx context.Context, bucket string, policyInfo policy.BucketAccessPolicy) error {
bucketPolicies := policy.GetPolicies(policyInfo.Statements, bucket, "")
if len(bucketPolicies) != 1 {
return errors.Trace(minio.NotImplemented{})
Expand Down Expand Up @@ -952,7 +953,7 @@ func (l *ossObjects) SetBucketPolicy(bucket string, policyInfo policy.BucketAcce
}

// GetBucketPolicy will get policy on bucket.
func (l *ossObjects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error) {
func (l *ossObjects) GetBucketPolicy(ctx context.Context, bucket string) (policy.BucketAccessPolicy, error) {
result, err := l.Client.GetBucketACL(bucket)
if err != nil {
return policy.BucketAccessPolicy{}, ossToObjectError(errors.Trace(err))
Expand All @@ -975,7 +976,7 @@ func (l *ossObjects) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy,
}

// DeleteBucketPolicy deletes all policies on bucket.
func (l *ossObjects) DeleteBucketPolicy(bucket string) error {
func (l *ossObjects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
err := l.Client.SetBucketACL(bucket, oss.ACLPrivate)
if err != nil {
return ossToObjectError(errors.Trace(err), bucket)
Expand Down
Loading

0 comments on commit 33fe42d

Please sign in to comment.