Skip to content

Commit

Permalink
feat(objectnode): support post object
Browse files Browse the repository at this point in the history
Signed-off-by: yhjiango <[email protected]>
  • Loading branch information
yhjiango authored and yhjiango committed Oct 31, 2023
1 parent 78e0bc2 commit 26f6923
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 13 deletions.
223 changes: 220 additions & 3 deletions objectnode/api_handler_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"syscall"
"time"
"unicode/utf8"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/log"
Expand Down Expand Up @@ -1398,10 +1399,226 @@ func (o *ObjectNode) postObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

// TODO: will be implemented
errorCode = UnsupportedOperation
var vol *Volume
if vol, err = o.getVol(param.Bucket()); err != nil {
log.LogErrorf("postObjectHandler: load volume fail: requestID(%v) volume(%v) err(%v)",
GetRequestID(r), param.Bucket(), err)
return
}

return
var userInfo *proto.UserInfo
if userInfo, err = o.getUserInfoByAccessKeyV2(param.AccessKey()); err != nil {
log.LogErrorf("postObjectHandler: get user info fail: requestID(%v) volume(%v) accessKey(%v) err(%v)",
GetRequestID(r), param.Bucket(), param.AccessKey(), err)
return
}

// qps and concurrency limit
rateLimit := o.AcquireRateLimiter()
if err = rateLimit.AcquireLimitResource(vol.owner, param.apiName); err != nil {
return
}
defer rateLimit.ReleaseLimitResource(vol.owner, param.apiName)

// content-md5 check if specified in the request
requestMD5 := r.Header.Get(ContentMD5)
if requestMD5 != "" {
decoded, err := base64.StdEncoding.DecodeString(requestMD5)
if err != nil {
errorCode = InvalidDigest
return
}
requestMD5 = hex.EncodeToString(decoded)
}

// object lock check
objetLock, err := vol.metaLoader.loadObjectLock()
if err != nil {
log.LogErrorf("postObjectHandler: load volume objetLock fail: requestID(%v) volume(%v) err(%v)",
GetRequestID(r), param.Bucket(), err)
return
}
if objetLock != nil && objetLock.ToRetention() != nil && requestMD5 == "" {
errorCode = NoContentMd5HeaderErr
return
}

// parse the request form and check
formReq := NewFormRequest(r)
if err = formReq.ParseMultipartForm(); err != nil {
log.LogErrorf("postObjectHandler: parse form fail: requestID(%v) volume(%v) err(%v)",
GetRequestID(r), param.Bucket(), err)
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%v)", errorCode.ErrorMessage, err)
return
}

key := formReq.MultipartFormValue("key")
if key == "" {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Missing key")
return
}
key = strings.Replace(key, "${filename}", formReq.FileName(), -1)
if !utf8.ValidString(key) || len(key) > MaxKeyLength {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Invalid utf8 string or the key is too long")
return
}

var aclInfo *AccessControlPolicy
if acl := formReq.MultipartFormValue("acl"); acl != "" {
if aclInfo, err = ParseCannedAcl(acl, userInfo.UserID); err != nil {
log.LogErrorf("postObjectHandler: parse canned acl fail: requestID(%v) volume(%v) acl(%v) err(%v)",
GetRequestID(r), param.Bucket(), acl, err)
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%v)", errorCode.ErrorMessage, err)
return
}
}

var tagging *Tagging
if taggingRaw := formReq.MultipartFormValue("tagging"); taggingRaw != "" {
if tagging, err = ParseTagging(taggingRaw); err != nil {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Invalid tagging")
return
}
if _, erc := tagging.Validate(); erc != nil {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%v)", errorCode.ErrorMessage, erc)
return
}
}

successStatus := formReq.MultipartFormValue("success_action_status")
successRedirect := formReq.MultipartFormValue("success_action_redirect")
if successRedirect != "" {
if _, err = url.Parse(successRedirect); err != nil {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Invalid success_action_redirect")
return
}
}

contentType := formReq.MultipartFormValue("content-type")
contentDisposition := formReq.MultipartFormValue("content-disposition")
cacheControl := formReq.MultipartFormValue("cache-control")
if cacheControl != "" && !ValidateCacheControl(cacheControl) {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Invalid cache-control")
return
}

expires := formReq.MultipartFormValue("expires")
if expires != "" && !ValidateCacheExpires(expires) {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Invalid expires")
return
}

policy := formReq.MultipartFormValue("policy")
if policy == "" {
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%s)", errorCode.ErrorMessage, "Missing policy")
return
}

// read the file, the rest will be written to a temporary file if exceed
f, size, err := formReq.FormFile(10 << 20)
if err != nil {
log.LogErrorf("postObjectHandler: form file fail: requestID(%v) volume(%v) err(%v)",
GetRequestID(r), param.Bucket(), err)
errorCode = MalformedPOSTRequest
errorCode.ErrorMessage = fmt.Sprintf("%s (%v)", errorCode.ErrorMessage, err)
return
}
defer f.Close()
if size > SinglePutLimit {
errorCode = EntityTooLarge
return
}

metadata := make(map[string]string)
// policy match forms
forms := make(map[string]string)
for name, values := range formReq.MultipartForm.Value {
name = strings.ToLower(name)
if strings.HasPrefix(name, XAmzMetaPrefix) {
forms[name] = strings.Join(values, ",")
metadata[name[len(XAmzMetaPrefix):]] = strings.Join(values, ",")
} else if len(values) > 0 {
forms[name] = values[0]
}
}
forms["bucket"] = param.Bucket()
forms["key"] = key
forms["content-length"] = strconv.FormatInt(size, 10)

// policy condition check
if err = PolicyConditionMatch(policy, forms); err != nil {
log.LogErrorf("postObjectHandler: policy match fail: requestID(%v) volume(%v) forms(%v) err(%v)",
GetRequestID(r), param.Bucket(), forms, err)
return
}

// flow control
var reader io.Reader
if size > DefaultFlowLimitSize {
reader = rateLimit.GetReader(vol.owner, param.apiName, f)
} else {
reader = f
}

// put object
putOpt := &PutFileOption{
MIMEType: contentType,
Disposition: contentDisposition,
Tagging: tagging,
Metadata: metadata,
CacheControl: cacheControl,
Expires: expires,
ACL: aclInfo,
ObjectLock: objetLock,
}
var fsFileInfo *FSFileInfo
if fsFileInfo, err = vol.PutObject(key, reader, putOpt); err != nil {
log.LogErrorf("postObjectHandler: put object fail: requestId(%v) volume(%v) path(%v) err(%v)",
GetRequestID(r), vol.Name(), key, err)
err = handlePutObjectErr(err)
return
}

// check content-md5 of actual data if specified in the request
if requestMD5 != "" && requestMD5 != fsFileInfo.ETag {
log.LogErrorf("postObjectHandler: MD5 validate fail: requestID(%v) volume(%v) path(%v) requestMD5(%v) serverMD5(%v)",
GetRequestID(r), vol.Name(), key, requestMD5, fsFileInfo.ETag)
errorCode = BadDigest
return
}

// set response header
etag := wrapUnescapedQuot(fsFileInfo.ETag)
w.Header()[ETag] = []string{etag}

// return response depending on success_action_xxx parameter
if successRedirect != "" {
http.Redirect(w, r, successRedirect, http.StatusMovedPermanently)
return
}
switch successStatus {
case "200":
w.WriteHeader(http.StatusOK)
case "201":
response := NewS3UploadObject()
response.Bucket = param.Bucket()
response.Key = key
response.ETag = etag
response.Location = "/" + response.Bucket + "/" + response.Key
writeResponse(w, http.StatusCreated, []byte(response.String()), ValueContentTypeXML)
default:
w.WriteHeader(http.StatusNoContent)
}
}

func handlePutObjectErr(err error) error {
Expand Down
73 changes: 68 additions & 5 deletions objectnode/post_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ var startsWithSupported = map[string]bool{
"$x-amz-date": false,
}

var ignoredPostFormKeys = map[string]bool{
"awsaccesskeyid": true,
"file": true,
"policy": true,
"signature": true,
"x-amz-signature": true,
"x-amz-checksum-algorithm": true,
"x-amz-checksum-crc32": true,
"x-amz-checksum-crc32c": true,
"x-amz-checksum-sha1": true,
"x-amz-checksum-sha256": true,
"x-amz-checksum-mode": true,
}

const (
condEqual = "eq"
condStartsWith = "starts-with"
Expand Down Expand Up @@ -223,18 +237,42 @@ func (p *PostPolicy) parseSlice(cond []interface{}) error {
return nil
}

func (p *PostPolicy) Match(toMatch map[string]string) error {
func (p *PostPolicy) Match(forms map[string]string) error {
if !p.Expiration.After(time.Now().UTC()) {
return ErrPostPolicyExpired
}

if forms == nil {
forms = make(map[string]string)
}

allNeedKeys := make(map[string]bool)
ignoredKeys := make(map[string]bool)
for key := range forms {
key = strings.ToLower(key)
switch {
case ignoredPostFormKeys[key], ignoredKeys[key], strings.HasPrefix(key, "x-amz-server-side-encryption"):
continue
case strings.HasPrefix(key, "x-amz-ignore-"):
key = strings.Replace(key, "x-amz-ignore-", "", 1)
ignoredKeys[key] = true
delete(allNeedKeys, key)
case strings.HasPrefix(key, "x-ignore-"):
key = strings.Replace(key, "x-ignore-", "", 1)
ignoredKeys[key] = true
delete(allNeedKeys, key)
default:
allNeedKeys[key] = true
}
}

xAmzMeta := make(map[string]bool)
for _, cond := range p.Conditions.Matches {
if strings.HasPrefix(cond.Key, "$x-amz-meta-") {
xAmzMeta[strings.TrimPrefix(cond.Key, "$")] = true
}
}
for key := range toMatch {
for key := range forms {
key = strings.ToLower(key)
if strings.HasPrefix(key, "x-amz-meta-") && !xAmzMeta[key] {
return &ErrorCode{
Expand All @@ -247,24 +285,40 @@ func (p *PostPolicy) Match(toMatch map[string]string) error {

for _, cond := range p.Conditions.Matches {
key := strings.TrimPrefix(cond.Key, "$")
if !policyCondMatch(cond.Operator, toMatch[key], cond.Value) {
if !policyCondMatch(cond.Operator, forms[key], cond.Value) {
return &ErrorCode{
ErrorCode: "PolicyConditionNotMatch",
ErrorMessage: fmt.Sprintf("The %v not match with the policy condition.", key),
StatusCode: http.StatusForbidden,
}
}
delete(allNeedKeys, key)
}

if size := toMatch["content-length"]; size != "" {
if size := forms["content-length"]; size != "" {
fsize, _ := strconv.ParseInt(size, 10, 64)
if fsize < p.Conditions.ContentLengthRange.Min || fsize > p.Conditions.ContentLengthRange.Max {
lengthMin := p.Conditions.ContentLengthRange.Min
lengthMax := p.Conditions.ContentLengthRange.Max
if fsize < lengthMin || (lengthMax > 0 && fsize > lengthMax) {
return &ErrorCode{
ErrorCode: "PolicyConditionNotMatch",
ErrorMessage: "The content-length does not match the policy's content-length-range.",
StatusCode: http.StatusForbidden,
}
}
delete(allNeedKeys, "content-length")
}

var keys []string
for key := range allNeedKeys {
keys = append(keys, key)
}
if len(keys) > 0 {
return &ErrorCode{
ErrorCode: "PolicyConditionNotMatch",
ErrorMessage: fmt.Sprintf("Every field in the form must be in policy conditions, still need %s.", strings.Join(keys, ", ")),
StatusCode: http.StatusForbidden,
}
}

return nil
Expand Down Expand Up @@ -295,3 +349,12 @@ func parseToInt64(val interface{}) (int64, error) {
return 0, errors.New("invalid number format")
}
}

func PolicyConditionMatch(policy string, forms map[string]string) error {
p, err := NewPostPolicy(policy)
if err != nil {
return err
}

return p.Match(forms)
}
Loading

0 comments on commit 26f6923

Please sign in to comment.