Skip to content

Commit

Permalink
Handle concurrent archivals more gracefully (cadence-workflow#1614)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Mar 29, 2019
1 parent 3a5d639 commit fa5e9e9
Show file tree
Hide file tree
Showing 27 changed files with 874 additions and 402 deletions.
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *server) startService() common.Daemon {
}

if params.ClusterMetadata.ArchivalConfig().ConfiguredForArchival() {
params.BlobstoreClient, err = filestore.NewClient(&s.cfg.Archival.Filestore, params.Logger)
params.BlobstoreClient, err = filestore.NewClient(&s.cfg.Archival.Filestore)
if err != nil {
log.Fatalf("error creating blobstore: %v", err)
}
Expand Down
39 changes: 32 additions & 7 deletions common/blobstore/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,46 @@ func NewBlob(body []byte, tags map[string]string) *Blob {
}
}

// DeepCopy returns a deep copy of input blob
func DeepCopy(blob *Blob) *Blob {
if blob == nil {
// DeepCopy returns a deep copy of blob
func (b *Blob) DeepCopy() *Blob {
if b == nil {
return nil
}
tagsCopy := make(map[string]string, len(blob.Tags))
for k, v := range blob.Tags {
tagsCopy := make(map[string]string, len(b.Tags))
for k, v := range b.Tags {
tagsCopy[k] = v
}
bodyCopy := make([]byte, len(blob.Body), len(blob.Body))
for i, b := range blob.Body {
bodyCopy := make([]byte, len(b.Body), len(b.Body))
for i, b := range b.Body {
bodyCopy[i] = b
}
return &Blob{
Body: bodyCopy,
Tags: tagsCopy,
}
}

// Equal returns true if input blob is equal, false otherwise
func (b *Blob) Equal(other *Blob) bool {
if b == nil && other == nil {
return true
}
if b == nil || other == nil {
return false
}
if len(b.Body) != len(other.Body) || len(b.Tags) != len(other.Tags) {
return false
}
for k, v := range b.Tags {
otherVal, ok := other.Tags[k]
if !ok || otherVal != v {
return false
}
}
for i := 0; i < len(b.Body); i++ {
if b.Body[i] != other.Body[i] {
return false
}
}
return true
}
97 changes: 97 additions & 0 deletions common/blobstore/blob/blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package blob

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type BlobSuite struct {
*require.Assertions
suite.Suite
}

func TestBlobSuite(t *testing.T) {
suite.Run(t, new(BlobSuite))
}

func (s *BlobSuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *BlobSuite) TestEquals() {
testCases := []struct {
blobA *Blob
blobB *Blob
equal bool
}{
{
blobA: nil,
blobB: nil,
equal: true,
},
{
blobA: NewBlob(nil, nil),
blobB: nil,
equal: false,
},
{
blobA: NewBlob([]byte{1, 2, 3}, map[string]string{}),
blobB: NewBlob([]byte{1, 2, 3}, map[string]string{"k1": "v1"}),
equal: false,
},
{
blobA: NewBlob(nil, map[string]string{"k1": "v1"}),
blobB: NewBlob([]byte{1}, map[string]string{"k1": "v1"}),
equal: false,
},
{
blobA: NewBlob([]byte{1, 2, 3}, map[string]string{"k1": "v1"}),
blobB: NewBlob([]byte{1, 2, 3, 4}, map[string]string{"k1": "v1"}),
equal: false,
},
{
blobA: NewBlob([]byte{1, 2, 3}, map[string]string{"k1": "v1"}),
blobB: NewBlob([]byte{1, 2, 3}, map[string]string{"k1_x": "v1_x"}),
equal: false,
},
{
blobA: NewBlob([]byte{1, 2}, map[string]string{"k1": "v1", "k2": "v2"}),
blobB: NewBlob([]byte{1, 2}, map[string]string{"k1": "v1"}),
equal: false,
},
{
blobA: NewBlob([]byte{1, 2}, map[string]string{"k1": "v1", "k2": "v2"}),
blobB: NewBlob([]byte{1, 2}, map[string]string{"k1": "v1", "k2": "v2"}),
equal: true,
},
}
for _, tc := range testCases {
s.Equal(tc.equal, tc.blobA.Equal(tc.blobB))
s.Equal(tc.equal, tc.blobA.Equal(tc.blobB))
aDeepCopy := tc.blobA.DeepCopy()
bDeepCopy := tc.blobB.DeepCopy()
s.Equal(tc.equal, aDeepCopy.Equal(bDeepCopy))
}
}
4 changes: 2 additions & 2 deletions common/blobstore/blob/blob_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func Wrap(blob *Blob, functions ...WrapFn) (*Blob, error) {
if blob == nil {
return nil, nil
}
wrappedBlob := DeepCopy(blob)
wrappedBlob := blob.DeepCopy()
for _, f := range functions {
if err := f(wrappedBlob); err != nil {
return nil, err
Expand All @@ -116,7 +116,7 @@ func Unwrap(blob *Blob) (*Blob, *WrappingLayers, error) {
if blob == nil {
return nil, wrappingLayers, nil
}
unwrappedBlob := DeepCopy(blob)
unwrappedBlob := blob.DeepCopy()
wrappers, ok := blob.Tags[wrappersTag]
if !ok {
return unwrappedBlob, wrappingLayers, nil
Expand Down
Loading

0 comments on commit fa5e9e9

Please sign in to comment.