Skip to content

Commit

Permalink
Archival: Delete blob activity (cadence-workflow#1931)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 4, 2019
1 parent b9075be commit b596c3f
Show file tree
Hide file tree
Showing 16 changed files with 504 additions and 45 deletions.
3 changes: 3 additions & 0 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (c *client) Delete(_ context.Context, bucket string, key blob.Key) (bool, e
blobPath := bucketItemPath(c.storeDirectory, bucket, key.String())
deleted, err := deleteFile(blobPath)
if err != nil {
if os.IsNotExist(err) {
return false, blobstore.ErrBlobNotExists
}
return false, ErrDeleteFile
}
return deleted, nil
Expand Down
2 changes: 1 addition & 1 deletion common/blobstore/filestore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *ClientSuite) TestDelete_Success() {
key, err := blob.NewKeyFromString("blob.blob")
s.NoError(err)
deleted, err := client.Delete(context.Background(), defaultBucketName, key)
s.NoError(err)
s.Equal(blobstore.ErrBlobNotExists, err)
s.False(deleted)

b := blob.NewBlob([]byte("body"), map[string]string{})
Expand Down
10 changes: 4 additions & 6 deletions common/blobstore/filestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"bytes"
"encoding/gob"
"errors"
"github.com/uber/cadence/common/blobstore/blob"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"

"github.com/uber/cadence/common/blobstore/blob"
"gopkg.in/yaml.v2"
)

const (
Expand Down Expand Up @@ -92,10 +93,7 @@ func readFile(filepath string) ([]byte, error) {

func deleteFile(filepath string) (bool, error) {
if err := os.Remove(filepath); err != nil {
if !os.IsNotExist(err) {
return false, err
}
return false, nil
return false, err
}
return true, nil
}
Expand Down
9 changes: 5 additions & 4 deletions common/blobstore/filestore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
package filestore

import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/blobstore/blob"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/blobstore/blob"
)

type UtilSuite struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (s *UtilSuite) TestDeleteFile() {
filename := "test-file-name"
fpath := filepath.Join(dir, filename)
deleted, err := deleteFile(fpath)
s.NoError(err)
s.True(os.IsNotExist(err))
s.False(deleted)

err = writeFile(fpath, []byte("file contents"))
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ const (
ArchiverUploadHistoryActivityScope
// ArchiverDeleteHistoryActivityScope is scope used by all metrics emitted by archiver.DeleteHistoryActivity
ArchiverDeleteHistoryActivityScope
// ArchiverDeleteBlobActivityScope is scope used by all metrics emitted by archiver.DeleteBlobActivity
ArchiverDeleteBlobActivityScope
// ArchiverScope is scope used by all metrics emitted by archiver.Archiver
ArchiverScope
// ArchiverPumpScope is scope used by all metrics emitted by archiver.Pump
Expand Down Expand Up @@ -1099,6 +1101,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
IndexProcessorScope: {operation: "IndexProcessor"},
ArchiverUploadHistoryActivityScope: {operation: "ArchiverUploadHistoryActivity"},
ArchiverDeleteHistoryActivityScope: {operation: "ArchiverDeleteHistoryActivity"},
ArchiverDeleteBlobActivityScope: {operation: "ArchiverDeleteBlobActivity"},
ArchiverScope: {operation: "Archiver"},
ArchiverPumpScope: {operation: "ArchiverPump"},
ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
Expand Down Expand Up @@ -1339,9 +1342,12 @@ const (
ArchiverCoroutineStoppedCount
ArchiverHandleRequestLatency
ArchiverUploadWithRetriesLatency
ArchiverDeleteBlobWithRetriesLatency
ArchiverDeleteWithRetriesLatency
ArchiverUploadFailedAllRetriesCount
ArchiverUploadSuccessCount
ArchiverDeleteBlobFailedAllRetriesCount
ArchiverDeleteBlobSuccessCount
ArchiverDeleteLocalFailedAllRetriesCount
ArchiverDeleteLocalSuccessCount
ArchiverDeleteFailedAllRetriesCount
Expand Down Expand Up @@ -1576,9 +1582,12 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ArchiverCoroutineStoppedCount: {metricName: "archiver_coroutine_stopped"},
ArchiverHandleRequestLatency: {metricName: "archiver_handle_request_latency"},
ArchiverUploadWithRetriesLatency: {metricName: "archiver_upload_with_retries_latency"},
ArchiverDeleteBlobWithRetriesLatency: {metricName: "archiver_delete_blob_with_retries_latency"},
ArchiverDeleteWithRetriesLatency: {metricName: "archiver_delete_with_retries_latency"},
ArchiverUploadFailedAllRetriesCount: {metricName: "archiver_upload_failed_all_retries"},
ArchiverUploadSuccessCount: {metricName: "archiver_upload_success"},
ArchiverDeleteBlobFailedAllRetriesCount: {metricName: "archiver_delete_blob_failed_all_retries"},
ArchiverDeleteBlobSuccessCount: {metricName: "archiver_delete_blob_success"},
ArchiverDeleteLocalFailedAllRetriesCount: {metricName: "archiver_delete_local_failed_all_retries"},
ArchiverDeleteLocalSuccessCount: {metricName: "archiver_delete_local_success"},
ArchiverDeleteFailedAllRetriesCount: {metricName: "archiver_delete_failed_all_retries"},
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ func (t *timerQueueProcessorBase) archiveWorkflow(task *persistence.TimerTaskInf
BranchToken: msBuilder.GetCurrentBranch(),
NextEventID: msBuilder.GetNextEventID(),
CloseFailoverVersion: msBuilder.GetLastWriteVersion(),
BucketName: domainCacheEntry.GetConfig().ArchivalBucket,
}

// send signal before deleting mutable state to make sure archival is idempotent
Expand Down
Loading

0 comments on commit b596c3f

Please sign in to comment.